Summary of the most complete implementation methods of delayed tasks in history! Code attached (strongly recommended)
The birth of this article is thanks to a reader who gave this excellent article the opportunity to meet you. The focus is on the excellent article, ha ha.
The story goes like this
Don't thank me. Give me roses. I have fragrance in my hand. I believe that the following content will not disappoint you, because it will be the best article on "delayed task" on the market, which has always been my goal of writing, so that every article of mine is a little better than that on the market.
Well, without much to say, let's go directly to today's topic. The main content of this article is shown in the figure below:
What is a deferred task?
Gu Mingsi suggested that we call tasks that need to be delayed as delayed tasks.
The usage scenarios of delayed tasks are as follows:
Such events require the use of deferred tasks.
Analysis of delayed task implementation ideas
The key to delayed task implementation is to execute a task at a certain time node. Based on this information, we can think of the following two means to realize delayed tasks:
The key words we can think of when implementing delayed tasks through JDK are: delayqueue and scheduled executorservice, and there are many delayed task execution methods provided by third parties, such as redis, netty, MQ and so on.
Delayed task implementation
Next, we will explain the specific implementation of each delayed task in combination with the code.
1. Infinite loop to achieve delayed tasks
In this way, we need to start an infinite loop to scan the task all the time, and then use a map collection to store the task and delay execution time. The implementation code is as follows:
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* 延迟任务执行方法汇总
*/
public class DelayTaskExample {
// 存放定时任务
private static Map<String,Long> _TaskMap = new HashMap<>();
public static void main(String[] args) {
System.out.println("程序启动时间:" + LocalDateTime.Now());
// 添加定时任务
_TaskMap.put("task-1",Instant.Now().plusSeconds(3).toEpochMilli()); // 延迟 3s
// 调用无限循环实现延迟任务
loopTask();
}
/**
* 无限循环实现延迟任务
*/
public static void loopTask() {
Long itemLong = 0L;
while (true) {
Iterator it = _TaskMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry entry = (Map.Entry) it.next();
itemLong = (Long) entry.getValue();
// 有任务需要执行
if (Instant.Now().toEpochMilli() >= itemLong) {
// 延迟任务,业务逻辑执行
System.out.println("执行任务:" + entry.getKey() +
" ,执行时间:" + LocalDateTime.Now());
// 删除任务
_TaskMap.remove(entry.getKey());
}
}
}
}
}
The results of the above procedures are:
It can be seen that the task is delayed for 3S, which is in line with our expectations.
2. Java API implements deferred tasks
The Java API provides two ways to implement deferred tasks: delayqueue and scheduledexecutorservice.
① The scheduledexecutorservice implements deferred tasks
We can use scheduledexecutorservice to execute tasks at a fixed frequency. The implementation code is as follows:
public class DelayTaskExample {
public static void main(String[] args) {
System.out.println("程序启动时间:" + LocalDateTime.Now());
scheduledexecutorserviceTask();
}
/**
* scheduledexecutorservice 实现固定频率一直循环执行任务
*/
public static void scheduledexecutorserviceTask() {
scheduledexecutorservice executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
// 执行任务的业务代码
System.out.println("执行任务" +
" ,执行时间:" + LocalDateTime.Now());
}
},2,// 初次执行间隔
2,// 2s 执行一次
TimeUnit.SECONDS);
}
}
The results of the above procedures are:
As you can see, using scheduleexecutorservice #schedulewithfixeddelay (...) Method, the deferred task is cycled at a certain frequency.
② Delayqueue implements delayed tasks
Delayqueue is an unbounded blocking queue that supports delayed acquisition of elements. The elements in the queue must implement the delayed interface and rewrite getdelay (timeunit) and CompareTo (delayed) methods. The complete code for delayqueue to implement delay queue is as follows:
public class DelayTest {
public static void main(String[] args) throws InterruptedException {
DelayQueue delayQueue = new DelayQueue();
// 添加延迟任务
delayQueue.put(new DelayElement(1000));
delayQueue.put(new DelayElement(3000));
delayQueue.put(new DelayElement(5000));
System.out.println("开始时间:" + DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()){
// 执行延迟任务
System.out.println(delayQueue.take());
}
System.out.println("结束时间:" + DateFormat.getDateTimeInstance().format(new Date()));
}
static class DelayElement implements Delayed {
// 延迟截止时间(单面:毫秒)
long delayTime = System.currentTimeMillis();
public DelayElement(long delayTime) {
this.delayTime = (this.delayTime + delayTime);
}
@Override
// 获取剩余时间
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
// 队列里元素的排序依据
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return DateFormat.getDateTimeInstance().format(new Date(delayTime));
}
}
}
The results of the above procedures are:
3. Redis implements delayed tasks
The methods of using redis to implement delayed tasks can be roughly divided into two categories: judging through Zset data and notifying through key space.
① Judging by data
With the help of Zset data type, we store delayed tasks in this data set, and then start a wireless loop to query all tasks of the current time for consumption. The implementation code is as follows (with the help of jedis framework):
import redis.clients.jedis.Jedis;
import utils.JedisUtils;
import java.time.Instant;
import java.util.Set;
public class DelayQueueExample {
// zset key
private static final String _KEY = "myDelayQueue";
public static void main(String[] args) throws InterruptedException {
Jedis jedis = JedisUtils.getJedis();
// 延迟 30s 执行(30s 后的时间)
long delayTime = Instant.Now().plusSeconds(30).getEpochSecond();
jedis.zadd(_KEY,delayTime,"order_1");
// 继续添加测试数据
jedis.zadd(_KEY,Instant.Now().plusSeconds(2).getEpochSecond(),"order_2");
jedis.zadd(_KEY,"order_3");
jedis.zadd(_KEY,Instant.Now().plusSeconds(7).getEpochSecond(),"order_4");
jedis.zadd(_KEY,Instant.Now().plusSeconds(10).getEpochSecond(),"order_5");
// 开启延迟队列
doDelayQueue(jedis);
}
/**
* 延迟队列消费
* @param jedis Redis 客户端
*/
public static void doDelayQueue(Jedis jedis) throws InterruptedException {
while (true) {
// 当前时间
Instant NowInstant = Instant.Now();
long lastSecond = NowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒时间
long NowSecond = NowInstant.getEpochSecond();
// 查询当前时间的所有任务
Set<String> data = jedis.zrangeByscore(_KEY,lastSecond,NowSecond);
for (String item : data) {
// 消费任务
System.out.println("消费:" + item);
}
// 删除已经执行的任务
jedis.zremrangeByscore(_KEY,NowSecond);
Thread.sleep(1000); // 每秒轮询一次
}
}
}
② Notification via key space
By default, the redis server does not turn on the keyspace notification. We need to turn it on manually through the command of config set notify keyspace events ex. after turning on the keyspace notification, we can get the event of each key value expiration. We use this mechanism to start a scheduled task for everyone. The implementation code is as follows:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;
public class TaskExample {
public static final String _TOPIC = "__keyevent@0__:expired"; // 订阅频道名称
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedis();
// 执行定时任务
doTask(jedis);
}
/**
* 订阅过期消息,执行定时任务
* @param jedis Redis 客户端
*/
public static void doTask(Jedis jedis) {
// 订阅过期消息
jedis.psubscribe(new JedisPubSub() {
@Override
public void onPMessage(String pattern,String channel,String message) {
// 接收到消息,执行定时任务
System.out.println("收到消息:" + message);
}
},_TOPIC);
}
}
4. Netty implements delayed tasks
Netty is a Java open source framework provided by JBoss. It is a client-side and server-side programming framework based on NiO. Using netty can ensure that you can quickly and simply develop a network application, such as a client-side and server-side application that implements a certain protocol. Netty is equivalent to simplifying and streamlining the programming and development process of network applications, such as the development of socket services based on TCP and UDP.
You can use the tool class HashedWheelTimer provided by netty to implement the delayed task. The implementation code is as follows.
First, add a netty reference to the project. The configuration is as follows:
<!-- https://mvnrepository.com/artifact/io.netty/netty-common -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.48.Final</version>
</dependency>
The complete code of netty implementation is as follows:
public class DelayTaskExample {
public static void main(String[] args) {
System.out.println("程序启动时间:" + LocalDateTime.Now());
NettyTask();
}
/**
* 基于 Netty 的延迟任务
*/
private static void NettyTask() {
// 创建延迟任务实例
HashedWheelTimer timer = new HashedWheelTimer(3,// 时间间隔
TimeUnit.SECONDS,100); // 时间轮中的槽数
// 创建一个任务
TimerTask task = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("执行任务" +
" ,执行时间:" + LocalDateTime.Now());
}
};
// 将任务添加到延迟队列中
timer.newTimeout(task,TimeUnit.SECONDS);
}
}
The results of the above procedures are:
HashedWheelTimer is implemented by using a timing wheel. The timing wheel is actually a ring data structure. It can be imagined as a clock, which is divided into many grids. Each grid represents a certain time. On this grid, a linked list is used to save the timeout tasks to be executed, and a pointer goes grid by grid, When you get to that grid, execute the delay task corresponding to the grid, as shown in the following figure:
The above picture can be understood as that the size of the time wheel is 8. Turn one grid (for example, 1s) at a certain time, and each grid points to a linked list to save the tasks to be executed.
5. MQ implementation delay task
If an MQ middleware is specially opened to perform deferred tasks, it will be a bit of a luxury. However, if there is an MQ environment, it is still desirable to use it to implement deferred tasks.
Almost all MQ middleware can implement deferred tasks, which should be more accurately called deferred queue. This article takes rabbitmq as an example to see how it implements deferred tasks.
Rabbitmq implements delay queues in two ways:
Since it is troublesome to use the dead letter exchange, it is recommended to use the second implementation method rabbitmq delayed message exchange plug-in to realize the function of delay queue.
First, we need to download and install rabbitmq delayed message exchange plug-in. Download address: http://www.rabbitmq.com/community-plugins.html
Select the corresponding version to download, then copy it to the rabbitmq server directory, and use the command rabbitmq plugins enable rabbitmq_ delayed_ message_ Open the plug-ins in exchange. Query all the plug-ins installed by using the command rabbitmq plugins list. The installation is successful, as shown in the following figure:
Finally, restart the rabbitmq service to make the plug-in take effect.
First, we need to configure the message queue. The implementation code is as follows:
import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedConfig {
final static String QUEUE_NAME = "delayed.goods.order";
final static String EXCHANGE_NAME = "delayedec";
@Bean
public Queue queue() {
return new Queue(DelayedConfig.QUEUE_NAME);
}
// 配置默认的交换机
@Bean
CustomExchange customExchange() {
Map<String,Object> args = new HashMap<>();
args.put("x-delayed-type","direct");
//参数二为类型:必须是x-delayed-message
return new CustomExchange(DelayedConfig.EXCHANGE_NAME,"x-delayed-message",true,false,args);
}
// 绑定队列到交换器
@Bean
Binding binding(Queue queue,CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
}
}
Then add the code to add the message. The specific implementation is as follows:
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class DelayedSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("发送时间:" + sf.format(new Date()));
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,DelayedConfig.QUEUE_NAME,msg,new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay",3000);
return message;
}
});
}
}
Add the code of the consumption message:
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
@RabbitHandler
public void process(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("接收时间:" + sdf.format(new Date()));
System.out.println("消息内容:" + msg);
}
}
Finally, let's use the code to test:
import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {
@Autowired
private DelayedSender sender;
@Test
public void test() throws InterruptedException {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
sender.send("Hi Admin.");
Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
}
}
The results of the above procedures are as follows:
It can be seen from the results that the execution of the above program meets the implementation expectation of the delayed task.
6. Use spring to schedule tasks
If you use a spring or springboot project, you can implement it with the help of scheduled. This article will use the springboot project to demonstrate the implementation of scheduled. We need to declare that scheduled is enabled. The implementation code is as follows:
@SpringBootApplication
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
Then add the delayed task, and the implementation code is as follows:
@Component
public class ScheduleJobs {
@Scheduled(fixedDelay = 2 * 1000)
public void fixedDelayJob() throws InterruptedException {
System.out.println("任务执行,时间:" + LocalDateTime.Now());
}
}
At this time, after we start the project, we can see that the task has been executed circularly in the form of 2S delay. The results are as follows:
We can also use the corn expression to define the frequency of task execution, such as @ scheduled (cron = "0 / 4 * *?").
7. Quartz implements delayed tasks
Quartz is a powerful task scheduler, which can realize more complex scheduling functions. It also supports distributed task scheduling.
We use quartz to implement a delayed task. First, we define an execution task code as follows:
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.time.LocalDateTime;
public class SampleJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext)
throws JobExecutionException {
System.out.println("任务执行,时间:" + LocalDateTime.Now());
}
}
After defining a JobDetail and trigger, the implementation code is as follows:
import org.quartz.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SampleScheduler {
@Bean
public JobDetail sampleJobDetail() {
return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob")
.storeDurably().build();
}
@Bean
public Trigger sampleJobTrigger() {
// 3s 后执行
SimpleScheduleBuilder scheduleBuilder =
SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);
return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger")
.withSchedule(scheduleBuilder).build();
}
}
Finally, start the delay task after the springboot project is started. The implementation code is as follows:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.quartz.Schedulerfactorybean;
/**
* SpringBoot 项目启动后执行
*/
public class MyStartupRunner implements CommandLineRunner {
@Autowired
private Schedulerfactorybean schedulerfactorybean;
@Autowired
private SampleScheduler sampleScheduler;
@Override
public void run(String... args) throws Exception {
// 启动定时任务
schedulerfactorybean.getScheduler().scheduleJob(
sampleScheduler.sampleJobTrigger());
}
}
The results of the above procedures are as follows:
It can be seen from the results that the delayed task was executed 3S after the project was started.