Use of delayqueue in Java
Use of delayqueue in Java
brief introduction
Today, I'd like to introduce delayqueue to you. Delayqueue is a kind of BlockingQueue, so it is thread safe. The characteristic of delayqueue is that the data inserted into the queue can be sorted according to the user-defined delay time. Only elements whose delay time is less than 0 can be fetched.
DelayQueue
Let's take a look at the definition of delayqueue:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
As can be seen from the definition, all objects stored in delayqueue must be subclasses of delayed.
Delayed inherits from comparable and needs to implement a getdelay method.
Why is it designed like this?
Because the underlying storage of delayqueue is a PriorityQueue, as we mentioned in the previous article, PriorityQueue is a sortable queue, and its elements must implement the comparable method. The getdelay method is used to determine whether the sorted elements can be taken out of the queue.
Application of delayqueue
Delayqueue is generally used in producer consumer mode. Let's take a specific example below.
First, to use delayqueue, you must customize a delayed object:
@Data
public class DelayedUser implements Delayed {
private String name;
private long avaibleTime;
public DelayedUser(String name,long delayTime){
this.name=name;
//avaibleTime = 当前时间+ delayTime
this.avaibleTime=delayTime + System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
//判断avaibleTime是否大于当前系统时间,并将结果转换成MILLISECONDS
long diffTime= avaibleTime- System.currentTimeMillis();
return unit.convert(diffTime,TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
//compareTo用在DelayedUser的排序
return (int)(this.avaibleTime - ((DelayedUser) o).getAvaibleTime());
}
}
In the above object, we need to implement getdelay and CompareTo methods.
Next, we create a producer:
@Slf4j
@Data
@AllArgsConstructor
class DelayedQueueProducer implements Runnable {
private DelayQueue<DelayedUser> delayQueue;
private Integer messageCount;
private long delayedTime;
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
DelayedUser delayedUser = new DelayedUser(
new Random().nextInt(1000)+"",delayedTime);
log.info("put delayedUser {}",delayedUser);
delayQueue.put(delayedUser);
Thread.sleep(500);
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
}
}
In the producer, we create a new delayeduser object every 0.5 seconds and merge it into the queue.
Create another consumer:
@Slf4j
@Data
@AllArgsConstructor
public class DelayedQueueConsumer implements Runnable {
private DelayQueue<DelayedUser> delayQueue;
private int messageCount;
@Override
public void run() {
for (int i = 0; i < messageCount; i++) {
try {
DelayedUser element = delayQueue.take();
log.info("take {}",element );
} catch (InterruptedException e) {
log.error(e.getMessage(),e);
}
}
}
}
In the consumer, we loop to get objects from the queue.
Finally, let's take a call example:
@Test
public void useDelayedQueue() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
DelayQueue<DelayedUser> queue = new DelayQueue<>();
int messageCount = 2;
long delayTime = 500;
DelayedQueueConsumer consumer = new DelayedQueueConsumer(
queue,messageCount);
DelayedQueueProducer producer = new DelayedQueueProducer(
queue,messageCount,delayTime);
// when
executor.submit(producer);
executor.submit(consumer);
// then
executor.awaitTermination(5,TimeUnit.SECONDS);
executor.shutdown();
}
In the above test example, we defined a thread pool of two threads. The producer generates two messages. The delaytime is set to 0.5 seconds, that is, after 0.5 seconds, the inserted object can be obtained.
The thread pool will be closed after 5 seconds.
Run and see the following results:
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=917,avaibleTime=1587623188389)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=917,avaibleTime=1587623188389)
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=487,avaibleTime=1587623188899)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=487,avaibleTime=1587623188899)
We see that the put and take of the message are alternating, which is in line with our expectations.
If we modify the delaytime to 50000, the elements inserted before the process pool is closed will not expire, that is, the consumer cannot obtain the results.