Multithreaded programming learning 6 (blocking queue in Java)
introduce
Blocking queue means that when the queue is full, the queue will block the thread inserting elements until the queue is full; when the queue is empty, the queue will block the thread obtaining elements until the queue becomes non empty. Blocking queue is the container used by producers to store elements and consumers to obtain elements.
When the thread insert / get action is blocked due to the queue full / empty, the queue also provides some mechanisms to handle, throw exceptions, return special values, or the thread has been waiting
Tips: if it is an unbounded blocking queue, the put method will never be blocked; The offer method always returns true.
Blocking queue in Java:
ArrayBlockingQueue
Arrayblockingqueue is a bounded blocking queue implemented with an array. This queue sorts elements according to the first in first out (FIFO) principle. By default, fair access of threads is not guaranteed.
The concurrency is controlled by reentrant lock and the blocking is realized by condition.
public class ArrayBlockingQueueTest {
/**
* 1. 由于是有界阻塞队列,需要设置初始大小
* 2. 默认不保证阻塞线程的公平访问,可设置公平性
*/
private static ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(2,true);
public static void main(String[] args) throws InterruptedException {
Thread put = new Thread(() -> {
// 3. 尝试插入元素
try {
QUEUE.put("java");
QUEUE.put("javaScript");
// 4. 元素已满,会阻塞线程
QUEUE.put("c++");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
put.start();
Thread take = new Thread(() -> {
try {
// 5. 获取一个元素
System.out.println(QUEUE.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
});
take.start();
// 6 javaScript、c++
System.out.println(QUEUE.take());
System.out.println(QUEUE.take());
}
}
LinkedBlockingQueue
Linkedblockingqueue is a bounded blocking queue implemented with a one-way linked list. The default maximum length for this queue is integer MAX_ VALUE。 This queue sorts elements according to the first in first out principle, and the throughput is usually higher than that of arrayblockingqueue. Executors. Newfixedthreadpool () uses this queue.
Like the arrayblockingqueue, reentrantlock is used to control concurrency. The difference is that it uses two exclusive locks to control consumption and production. It uses takelock and putlock to control production and consumption without interference. As long as the queue is not full, the production thread can always produce; As long as the queue is not empty, consuming threads can consume all the time without blocking each other because of exclusive locks.
Tips: because double locks are used to avoid inaccurate concurrent calculation, an atomicinteger variable is used to count the total number of elements.
LinkedBlockingDeque
Linkedblockingdeque is a bounded blocking queue composed of two-way linked list structure, which can insert and remove elements from both ends of the queue. It implements the blockingdeque interface and adds addfirst, addlast, offerfirst, offerlast, peekfirst and peeklast methods. The method ending with the word first represents inserting, obtaining or removing the first element of the double ended queue. A method ending with the last word that inserts, gets, or removes the last element of a double ended queue.
The node of linkedblockingdeque implements a variable prev pointing to the previous node, so as to realize a two-way queue. Concurrency control is similar to arrayblockingqueue in that a single reentrantlock is used to control concurrency. Because both ends of the double ended queue can be consumed and produced, a shared lock is used.
Bidirectional blocking queues can be used in "work stealing" mode.
public class LinkedBlockingDequeTest {
private static LinkedBlockingDeque<String> DEQUE = new LinkedBlockingDeque<>(2);
public static void main(String[] args) {
DEQUE.addFirst("java");
DEQUE.addFirst("c++");
// java
System.out.println(DEQUE.peekLast());
// java
System.out.println(DEQUE.pollLast());
DEQUE.addLast("PHP");
// c++
System.out.println(DEQUE.pollFirst());
}
}
Tips: the take() method calls takefirst(), which should be noted when using.
PriorityBlockingQueue
Priorityblockingqueue is an unbounded blocking queue implemented by an array at the bottom, with sorting function. Because it is an unbounded queue, the insert will never be blocked. By default, elements are arranged in natural ascending order. You can also customize the class to implement the CompareTo () method to specify the element sorting rule, or specify the construction parameter comparator to sort the elements when initializing priorityblockingqueue.
The bottom layer also uses reentrantlock to control concurrency. Since only acquisition will block, only one condition (only notification consumption) is used to realize it.
public class PriorityBlockingQueueTest {
private static PriorityBlockingQueue<String> QUEUE = new PriorityBlockingQueue<>();
public static void main(String[] args) {
QUEUE.add("java");
QUEUE.add("javaScript");
QUEUE.add("c++");
QUEUE.add("python");
QUEUE.add("PHP");
Iterator<String> it = QUEUE.iterator();
while (it.hasNext()) {
// c++ javaScript java python PHP
// 同优先级不保证排序顺序
System.out.print(it.next() + " ");
}
}
}
DelayQueue
Delayqueue is an unbounded blocking queue that supports delayed acquisition of elements. Queues are implemented using PriorityQueue. The elements in the queue must implement the delayed interface (the design of the delayed interface can refer to the scheduledfuturetask class). The elements are sorted according to the delay priority. The elements with short delay time are in the front. The elements can be extracted from the queue only when the delay expires.
The PriorityQueue in delayqueue will sort the tasks in the queue. When sorting, the task with the smaller time will be ranked first (the task with the earlier time will be executed first). If the time of the two tasks is the same, compare the sequencenumber, and the task with the smaller sequencenumber will be ranked first (that is, if the execution time of the two tasks is the same, the task submitted first will be executed first).
Similar to priorityblockingqueue, the underlying layer is also an array, and a reentrantlock is used to control concurrency.
Application scenario:
public class DelayElement implements Delayed,Runnable {
private static final AtomicLong SEQUENCER = new AtomicLong();
/**
* 标识元素先后顺序
*/
private final long sequenceNumber;
/**
* 延迟时间,单位纳秒
*/
private long time;
public DelayElement(long time) {
this.time = System.nanoTime() + time;
this.sequenceNumber = SEQUENCER.getAndIncrement();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(),NANOSECONDS);
}
@Override
public int compareTo(Delayed other) {
// compare zero if same object
if (other == this) {
return 0;
}
if (other instanceof DelayElement) {
DelayElement x = (DelayElement) other;
long diff = time - x.time;
if (diff < 0) {
return -1;
} else if (diff > 0) {
return 1;
} else if (sequenceNumber < x.sequenceNumber) {
return -1;
} else {
return 1;
}
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
@Override
public void run() {
System.out.println("sequenceNumber" + sequenceNumber);
}
@Override
public String toString() {
return "DelayElement{" + "sequenceNumber=" + sequenceNumber + ",time=" + time + '}';
}
}
public class DelayQueueTest {
private static DelayQueue<DelayElement> QUEUE = new DelayQueue<>();
public static void main(String[] args) {
// 1. 添加 10 个参数
for (int i = 1; i < 10; i++) {
// 2. 5 秒内随机延迟
int nextInt = new Random().nextInt(5);
long convert = TimeUnit.NANOSECONDS.convert(nextInt,TimeUnit.SECONDS);
QUEUE.offer(new DelayElement(convert));
}
// 3. 查询元素排序 —— 延迟短的排在前面
Iterator<DelayElement> iterator = QUEUE.iterator();
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
// 4. 可观察到元素延迟输出
while (!QUEUE.isEmpty()) {
Thread thread = new Thread(QUEUE.poll());
thread.start();
}
}
}
LinkedTransferQueue
Linkedtransferqueue is an unbounded blocked transfer queue composed of a linked list structure.
A large number of CAS operations are used in concurrency control, and locks are not used.
Compared with other blocking queues, linkedtransferqueue has more trytransfer and transfer methods.
SynchronousQueue
Synchronous queue is a blocking queue that does not store elements. Each put operation must wait for a take operation, otherwise it will be blocked to continue the put operation. Executors. Newcachedthreadpool uses this queue.
Synchronousqueue by default, the thread accesses the queue with an unfair policy without using a lock. It all implements concurrency through CAS operations. The throughput is very high, which is higher than linkedblockingqueue and arrayblockingqueue. It is very suitable for dealing with some efficient transitivity scenarios. Executors. Newcachedthreadpool() uses synchronousqueue for task delivery.
public class SynchronousQueueTest {
private static class SynchronousQueueProducer implements Runnable {
private BlockingQueue<String> blockingQueue;
private SynchronousQueueProducer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
String data = UUID.randomUUID().toString();
System.out.println(Thread.currentThread().getName() + " Put: " + data);
blockingQueue.put(data);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class SynchronousQueueConsumer implements Runnable {
private BlockingQueue<String> blockingQueue;
private SynchronousQueueConsumer(BlockingQueue<String> queue) {
this.blockingQueue = queue;
}
@Override
public void run() {
while (true) {
try {
System.out.println(Thread.currentThread().getName() + " take(): " + blockingQueue.take());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
final BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(synchronousQueue);
new Thread(queueProducer,"producer - 1").start();
SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(synchronousQueue);
new Thread(queueConsumer1,"consumer — 1").start();
SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(synchronousQueue);
new Thread(queueConsumer2,"consumer — 2").start();
}
}