Use of delayqueue in Java

Use of delayqueue in Java

brief introduction

Today, I'd like to introduce delayqueue to you. Delayqueue is a kind of BlockingQueue, so it is thread safe. The characteristic of delayqueue is that the data inserted into the queue can be sorted according to the user-defined delay time. Only elements whose delay time is less than 0 can be fetched.

DelayQueue

Let's take a look at the definition of delayqueue:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>

As can be seen from the definition, all objects stored in delayqueue must be subclasses of delayed.

Delayed inherits from comparable and needs to implement a getdelay method.

Why is it designed like this?

Because the underlying storage of delayqueue is a PriorityQueue, as we mentioned in the previous article, PriorityQueue is a sortable queue, and its elements must implement the comparable method. The getdelay method is used to determine whether the sorted elements can be taken out of the queue.

Application of delayqueue

Delayqueue is generally used in producer consumer mode. Let's take a specific example below.

First, to use delayqueue, you must customize a delayed object:

@Data
public class DelayedUser implements Delayed {
    private String name;
    private long avaibleTime;

    public DelayedUser(String name,long delayTime){
        this.name=name;
        //avaibleTime = 当前时间+ delayTime
        this.avaibleTime=delayTime + System.currentTimeMillis();

    }

    @Override
    public long getDelay(TimeUnit unit) {
        //判断avaibleTime是否大于当前系统时间,并将结果转换成MILLISECONDS
        long diffTime= avaibleTime- System.currentTimeMillis();
        return unit.convert(diffTime,TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        //compareTo用在DelayedUser的排序
        return (int)(this.avaibleTime - ((DelayedUser) o).getAvaibleTime());
    }
}

In the above object, we need to implement getdelay and CompareTo methods.

Next, we create a producer:

@Slf4j
@Data
@AllArgsConstructor
class DelayedQueueProducer implements Runnable {
    private DelayQueue<DelayedUser> delayQueue;

    private Integer messageCount;

    private long delayedTime;

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                DelayedUser delayedUser = new DelayedUser(
                        new Random().nextInt(1000)+"",delayedTime);
                log.info("put delayedUser {}",delayedUser);
                delayQueue.put(delayedUser);
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
    }
}

In the producer, we create a new delayeduser object every 0.5 seconds and merge it into the queue.

Create another consumer:

@Slf4j
@Data
@AllArgsConstructor
public class DelayedQueueConsumer implements Runnable {

    private DelayQueue<DelayedUser> delayQueue;

    private int messageCount;

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                DelayedUser element = delayQueue.take();
                log.info("take {}",element );
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
    }
}

In the consumer, we loop to get objects from the queue.

Finally, let's take a call example:

    @Test
    public void useDelayedQueue() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        DelayQueue<DelayedUser> queue = new DelayQueue<>();
        int messageCount = 2;
        long delayTime = 500;
        DelayedQueueConsumer consumer = new DelayedQueueConsumer(
                queue,messageCount);
        DelayedQueueProducer producer = new DelayedQueueProducer(
                queue,messageCount,delayTime);

        // when
        executor.submit(producer);
        executor.submit(consumer);

        // then
        executor.awaitTermination(5,TimeUnit.SECONDS);
        executor.shutdown();

    }

In the above test example, we defined a thread pool of two threads. The producer generates two messages. The delaytime is set to 0.5 seconds, that is, after 0.5 seconds, the inserted object can be obtained.

The thread pool will be closed after 5 seconds.

Run and see the following results:

[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=917,avaibleTime=1587623188389)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=917,avaibleTime=1587623188389)
[pool-1-thread-1] INFO com.flydean.DelayedQueueProducer - put delayedUser DelayedUser(name=487,avaibleTime=1587623188899)
[pool-1-thread-2] INFO com.flydean.DelayedQueueConsumer - take DelayedUser(name=487,avaibleTime=1587623188899)

We see that the put and take of the message are alternating, which is in line with our expectations.

If we modify the delaytime to 50000, the elements inserted before the process pool is closed will not expire, that is, the consumer cannot obtain the results.

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>