Understand the queue family in Java
Introduction to queue family in Java
brief introduction
Collection collection in Java has three families: list, set and queue. Of course, map is also a collection class, but map does not inherit the collection interface.
List and set are often used in our work and are usually used to store result data, while queue is usually used in producer consumer mode due to its particularity.
The popular message oriented middleware, such as rabbit MQ, is the expansion of the data structure queue.
Today's article will take you into the queue family.
Queue interface
Let's first look at the inheritance relationship of queue and the methods defined therein:
Queue inherits from collection, and collection inherits from Iterable.
Queue has three main methods. Let's use a table to see their differences:
Queue classification
Generally speaking, queues can be divided into BlockingQueue, deque and transferqueue.
BlockingQueue
BlockingQueue is an implementation of queue, which provides two additional functions:
BlockingQueue operations can be divided into the following four categories:
The first type is the operation that will throw an exception. When the insertion fails and the queue is empty, an exception is thrown.
The second type is operations that do not throw exceptions.
The third type is block operation. When the queue is empty or reaches the maximum capacity.
The fourth type is the operation of time out, which will block at a given time and return directly after timeout.
BlockingQueue is a thread safe queue that can be used in multithreading in producer consumer mode, as shown below:
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
Finally, the operations before inserting elements into blockqueue in one thread happen before the operations deleted or obtained from blockqueue in another thread.
Deque
Deque is a subclass of queue, which represents double ended queue, that is, elements can be inserted and deleted from the head or tail of the queue.
Similarly, deque's method can also be represented in the following table. Deque's method can be divided into head operation and tail operation:
It is basically consistent with the method description of queue, so I won't talk about it here.
When deque processes elements in FIFO (first in first out), deque is equivalent to a queue.
When deque processes elements in LIFO (last in first out), deque is equivalent to a stack.
TransferQueue
Transferqueue inherits from BlockingQueue. Why is it called transfer? Because the transferqueue provides a transfer method, the producer can call the transfer method to wait for the consumer to call the take or poll method to get data from the queue.
Non blocking and timeout versions of the trytransfer method are also provided for use.
Let's take a producer consumer problem implemented by transferqueue.
Define a producer first:
@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private Integer messageCount;
public static final AtomicInteger messageProduced = new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
boolean added = transferQueue.tryTransfer( "第"+i+"个",2000,TimeUnit.MILLISECONDS);
log.info("transfered {} 是否成功: {}","第"+i+"个",added);
if(added){
messageProduced.incrementAndGet();
}
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
log.info("total transfered {}",messageProduced.get());
}
}
In the producer's run method, we call the trytransfer method, wait for 2 seconds, and return directly if it fails.
Define another consumer:
@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {
private TransferQueue<String> transferQueue;
private String name;
private int messageCount;
public static final AtomicInteger messageConsumed = new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
String element = transferQueue.take();
log.info("take {}",element );
messageConsumed.incrementAndGet();
Thread.sleep(500);
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
log.info("total consumed {}",messageConsumed.get());
}
}
In the run method, transferQueue. is called. Take method to get the message.
Let's take a look at the situation of one producer and zero consumers:
@Test
public void testOneProduceZeroConsumer() throws InterruptedException {
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(10);
Producer producer = new Producer(transferQueue,"ProducerOne",5);
exService.execute(producer);
exService.awaitTermination(50000,TimeUnit.MILLISECONDS);
exService.shutdown();
}
Output results:
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4个 是否成功: false
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0
You can see that the message was not sent successfully because there were no consumers.
Let's look at the next situation with consumers:
@Test
public void testOneProduceOneConsumer() throws InterruptedException {
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(10);
Producer producer = new Producer(transferQueue,2);
Consumer consumer = new Consumer(transferQueue,"ConsumerOne",2);
exService.execute(producer);
exService.execute(consumer);
exService.awaitTermination(50000,TimeUnit.MILLISECONDS);
exService.shutdown();
}
Output results:
[pool-1-thread-2] INFO com.flydean.Consumer - take 第0个
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: true
[pool-1-thread-2] INFO com.flydean.Consumer - take 第1个
[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: true
[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2
[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2
You can see that producers and consumers produce and consume one by one.