Implementing delayed tasks using Redis (I)

Premise

Recently I ran into a delayed-task scenario in a production environment. I surveyed the current mainstream schemes, analyzed their advantages and disadvantages, and settled on a final scheme. This article records the research process and the implementation of the preliminary scheme.

Comparison of candidate schemes

Several schemes for implementing delayed tasks are listed below, with their advantages and disadvantages summarized.

If the application's data volume is small and the real-time requirements are relatively low, short-interval polling of MySQL with a scheduling framework is the best scheme. However, the data volume in the author's scenario is relatively large, even though the real-time requirement is not high, so a table-scanning scheme would certainly put great pressure on the MySQL instance. I remember seeing, a long time ago, a slide deck called "evolution of box technology aggregate payment system", which contained a figure that inspired the author:

The figure uses exactly this approach: a scheduling framework plus short-interval polling of Redis to implement delayed tasks. To spread the load across application instances, the scheme in the figure is also sharded. Given the urgency of the author's current business, the first phase does not consider sharding for now, and only a simplified version is implemented.

Scenario design

In the actual production scenario, a system the author is responsible for needs to integrate with an external funding party: 30 minutes after each funding order is placed, the corresponding attachments must be pushed. Here this is simplified to delayed processing of order data: one order message (called OrderMessage for now) is recorded per order, and the message is processed asynchronously after a delay of 5 to 15 seconds.

Implementation ideas of the rejected candidate schemes

The following describes the four candidate schemes that were not selected and walks through their implementation with some sample code and flow descriptions.

JDK built-in delay queue

DelayQueue is a BlockingQueue implementation whose elements must implement the Delayed interface. Here is a simple example:

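import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DelayQueueMain {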

    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class);

    public static void main(String[] args) throws Exception {
        DelayQueue<OrderMessage> queue = new DelayQueue<>();
        // Default delay: 5 seconds
        OrderMessage message = new OrderMessage("ORDER_ID_10086");
        queue.add(message);
        // Delay: 6 seconds
        message = new OrderMessage("ORDER_ID_10087",6);
        queue.add(message);
        // Delay: 10 seconds
        message = new OrderMessage("ORDER_ID_10088",10);
        queue.add(message);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setName("DelayWorker");
            thread.setDaemon(true);
            return thread;
        });
        LOGGER.info("开始执行调度线程...");
        executorService.execute(() -> {
            while (true) {
                try {
                    OrderMessage task = queue.take();
                    LOGGER.info("延迟处理订单消息,{}",task.getDescription());
                } catch (Exception e) {
                    LOGGER.error(e.getMessage(),e);
                }
            }
        });
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class OrderMessage implements Delayed {

        private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        /**
         * Default delay: 5000 milliseconds
         */
        private static final long DELAY_MS = 1000L * 5;

        /**
         * Order ID
         */
        private final String orderId;

        /**
         * Creation timestamp
         */
        private final long timestamp;

        /**
         * Expiration time
         */
        private final long expire;

        /**
         * Description
         */
        private final String description;

        public OrderMessage(String orderId,long expireSeconds) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + expireSeconds * 1000L;
            this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s",orderId,LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),ZoneId.systemDefault()).format(F),LocalDateTime.ofInstant(Instant.ofEpochMilli(expire),ZoneId.systemDefault()).format(F));
        }

        public OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.expire = this.timestamp + DELAY_MS;
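            this.description = String.format("订单[%s]-创建时间为:%s,超时时间为:%s",orderId,LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),ZoneId.systemDefault()).format(F),LocalDateTime.ofInstant(Instant.ofEpochMilli(expire),ZoneId.systemDefault()).format(F));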
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public long getExpire() {
            return expire;
        }

        public String getDescription() {
            return description;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            // Long.compare avoids the overflow that an int cast of the difference can cause
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}

Note that OrderMessage implements the Delayed interface; the key is to implement Delayed#getDelay() and Delayed#compareTo(). Run the main() method:

10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 开始执行调度线程...
10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10086]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:13
10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10087]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:14
10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10088]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:18
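
One detail of Delayed#compareTo() deserves care. Comparing two delays by subtracting them and narrowing the result to int is only reliable while the difference fits in 32 bits; once two delays differ by more than roughly 24.8 days in milliseconds, the sign can flip. The following minimal sketch contrasts that approach with Long.compare. The class and method names are made up for illustration and are not part of the example above.

```java
// Illustrative only: DelayCompareSketch and its methods are hypothetical names.
final class DelayCompareSketch {

    // Subtract-and-cast comparison: overflows when the difference exceeds the int range.
    static int compareBySubtraction(long delayA, long delayB) {
        return (int) (delayA - delayB);
    }

    // Overflow-safe comparison of the same two values.
    static int compareSafely(long delayA, long delayB) {
        return Long.compare(delayA, delayB);
    }

    public static void main(String[] args) {
        long shortDelay = 0L;
        long longDelay = 3_000_000_000L; // about 34.7 days in milliseconds
        // Negative result: the longer delay is wrongly ordered before the shorter one.
        System.out.println(compareBySubtraction(longDelay, shortDelay));
        // Positive result: the longer delay is ordered after the shorter one.
        System.out.println(compareSafely(longDelay, shortDelay));
    }
}
```

For the 5 to 15 second delays used in this article the two approaches behave the same; the difference only matters for very long delays.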

Scheduling framework + MySQL

Short-interval polling of a MySQL table with a scheduling framework is the scheme with the lowest implementation difficulty. It should usually be preferred when a service has just launched, the table holds little data, and the real-time requirement is not high. However, the following points should be noted:

Introduce Quartz, the MySQL Java driver, and spring-boot-starter-jdbc (this is just a convenient way to implement the example with a relatively lightweight stack; in production, choose a more suitable framework according to the scenario):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <version>2.1.7.RELEASE</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.1</version>
    <scope>test</scope>
</dependency>

Assume the table is designed as follows:

CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci;

USE `delayTask`;

CREATE TABLE `t_order_message`
(
    id           BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT,
    order_id     VARCHAR(50) NOT NULL COMMENT 'Order ID',
    create_time  DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time',
    edit_time    DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Last modified time',
    retry_times  TINYINT     NOT NULL DEFAULT 0 COMMENT 'Retry count',
    order_status TINYINT     NOT NULL DEFAULT 0 COMMENT 'Order status',
    INDEX idx_order_id (order_id),
    INDEX idx_create_time (create_time)
) COMMENT 'Order message table';

# Insert two test rows
INSERT INTO t_order_message(order_id) VALUES ('10086'),('10087');

Write code:

// Constants
public class OrderConstants {

    public static final int MAX_RETRY_TIMES = 5;

    public static final int PENDING = 0;

    public static final int SUCCESS = 1;

    public static final int FAIL = -1;

    public static final int LIMIT = 10;
}

// Entity
@Builder
@Data
public class OrderMessage {

    private Long id;
    private String orderId;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private Integer retryTimes;
    private Integer orderStatus;
}

// DAO
@RequiredArgsConstructor
public class OrderMessageDao {

    private final JdbcTemplate jdbcTemplate;

    private static final ResultSetExtractor<List<OrderMessage>> M = r -> {
        List<OrderMessage> list = Lists.newArrayList();
        while (r.next()) {
            list.add(OrderMessage.builder()
                    .id(r.getLong("id"))
                    .orderId(r.getString("order_id"))
                    .createTime(r.getTimestamp("create_time").toLocalDateTime())
                    .editTime(r.getTimestamp("edit_time").toLocalDateTime())
                    .retryTimes(r.getInt("retry_times"))
                    .orderStatus(r.getInt("order_status"))
                    .build());
        }
        return list;
    };

    public List<OrderMessage> selectPendingRecords(LocalDateTime start,LocalDateTime end,List<Integer> statusList,int maxRetryTimes,int limit) {
        StringJoiner joiner = new StringJoiner(",");
        statusList.forEach(s -> joiner.add(String.valueOf(s)));
        return jdbcTemplate.query("SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " +
                        "AND order_status IN (?) AND retry_times < ? LIMIT ?",p -> {
                    p.setTimestamp(1,Timestamp.valueOf(start));
                    p.setTimestamp(2,Timestamp.valueOf(end));
                    // Caution: the comma-joined list is bound as one string value; it is not expanded into IN (0,-1)
                    p.setString(3,joiner.toString());
                    p.setInt(4,maxRetryTimes);
                    p.setInt(5,limit);
                },M);
    }

    public int updateOrderStatus(Long id,int status) {
        return jdbcTemplate.update("UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?",p -> {
                    p.setInt(1,status);
                    p.setTimestamp(2,Timestamp.valueOf(LocalDateTime.now()));
                    p.setLong(3,id);
                });
    }
}

// Service
@RequiredArgsConstructor
public class OrderMessageService {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService.class);

    private final OrderMessageDao orderMessageDao;

    private static final List<Integer> STATUS = Lists.newArrayList();

    static {
        STATUS.add(OrderConstants.PENDING);
        STATUS.add(OrderConstants.FAIL);
    }

    public void executeDelayJob() {
        LOGGER.info("订单处理定时任务开始执行......");
        LocalDateTime end = LocalDateTime.now();
        // One day ago
        LocalDateTime start = end.minusDays(1);
        List<OrderMessage> list = orderMessageDao.selectPendingRecords(start,end,STATUS,OrderConstants.MAX_RETRY_TIMES,OrderConstants.LIMIT);
        if (!list.isEmpty()) {
            for (OrderMessage m : list) {
                LOGGER.info("处理订单[{}],状态由{}更新为{}",m.getOrderId(),m.getOrderStatus(),OrderConstants.SUCCESS);
                // This could be optimized into a batch update
                orderMessageDao.updateOrderStatus(m.getId(),OrderConstants.SUCCESS);
            }
        }
        LOGGER.info("订单处理定时任务开始完毕......");
    }
}

// Job
@DisallowConcurrentExecution
public class OrderMessageDelayJob implements Job {

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get("orderMessageService");
        service.executeDelayJob();
    }

    public static void main(String[] args) throws Exception {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8");
        config.setDriverClassName(Driver.class.getName());
        config.setUsername("root");
        config.setPassword("root");
        HikariDataSource dataSource = new HikariDataSource(config);
        OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource));
        OrderMessageService service = new OrderMessageService(orderMessageDao);
        // In-memory scheduler
        StdSchedulerFactory factory = new StdSchedulerFactory();
        Scheduler scheduler = factory.getScheduler();
        // No IoC container is used here; the service reference is passed through the Quartz job data map
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put("orderMessageService",service);
        // Create the job
        JobDetail job = JobBuilder.newJob(OrderMessageDelayJob.class)
                .withIdentity("orderMessageDelayJob","delayJob")
                .usingJobData(jobDataMap)
                .build();
        // Create the trigger; fires every 10 seconds
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("orderMessageDelayTrigger","delayJob")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(10).repeatForever())
                .build();
        scheduler.scheduleJob(job,trigger);
        // Start the scheduler
        scheduler.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

This example polls on create_time. In practice you can add a schedule_time column and poll on that instead, which makes it easier to customize the scheduling strategy for idle and busy periods. The example above runs as follows:

11:58:27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler Meta-data: Quartz Scheduler (v2.3.1) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
11:58:27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3.1
11:58:27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
11:58:27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob',class=club.throwable.jdbc.OrderMessageDelayJob
11:58:27.219 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@10eb8c53
11:58:27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
11:58:27.221 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob
11:58:34.440 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始执行......
11:58:34.451 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@3d27ece4
11:58:34.459 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@64e808af
11:58:34.470 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@79c8c2b7
11:58:34.477 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@19a62369
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Added connection com.mysql.jdbc.JDBC4Connection@1673d017
11:58:34.485 [HikariPool-1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - After adding stats (total=10,active=0,idle=10,waiting=0)
11:58:34.559 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared sql query
11:58:34.565 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared sql statement [SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? AND order_status IN (?) AND retry_times < ? LIMIT ?]
11:58:34.645 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.210 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - sqlWarning ignored: sql state '22007',error code '1292',message [Truncated incorrect DOUBLE value: '0,-1']
11:58:35.335 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10086],状态由0更新为1
11:58:35.342 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared sql update
11:58:35.346 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared sql statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.347 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.354 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 处理订单[10087],状态由0更新为1
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared sql update
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared sql statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?]
11:58:35.355 [DefaultQuartzScheduler_Worker-1] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource
11:58:35.361 [DefaultQuartzScheduler_Worker-1] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始完毕......
11:58:35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob',class=club.throwable.jdbc.OrderMessageDelayJob
11:58:37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
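
The warning "Truncated incorrect DOUBLE value: '0,-1'" in the log above suggests that the comma-joined status list, bound to a single ? placeholder, is treated by MySQL as one string value rather than as a list. One common way around this is to generate one placeholder per status and bind each value separately. The sketch below only builds the SQL text; InClauseSketch and its methods are hypothetical helpers, not part of the DAO above.

```java
import java.util.Collections;

// Illustrative only: InClauseSketch is a hypothetical helper, not part of the original DAO.
final class InClauseSketch {

    // Builds "?,?,...,?" with one placeholder per value in the IN list.
    static String placeholders(int count) {
        if (count < 1) {
            throw new IllegalArgumentException("count must be at least 1");
        }
        return String.join(",", Collections.nCopies(count, "?"));
    }

    // Builds the polling query with an IN list sized for statusCount values.
    static String buildSelect(int statusCount) {
        return "SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? "
                + "AND order_status IN (" + placeholders(statusCount) + ") AND retry_times < ? LIMIT ?";
    }

    public static void main(String[] args) {
        // For the two statuses PENDING and FAIL the IN list gets two placeholders.
        System.out.println(buildSelect(2));
    }
}
```

With this query text, each status is then bound with its own setInt call, and the indexes of the parameters after the IN list shift by the number of statuses.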

RabbitMQ dead letter queue

Using a RabbitMQ dead letter queue relies on two RabbitMQ features: TTL (time to live) and DLX (dead letter exchange).

A diagram describing these two features:

For simplicity, the TTL below is set at the queue level. Introduce the RabbitMQ Java driver:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
    <scope>test</scope>
</dependency>

The code is as follows:

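import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DlxMain {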

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class);

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        Channel producerChannel = connection.createChannel();
        Channel consumerChannel = connection.createChannel();
        // The DLX exchange is named dlx.exchange (type direct); the binding key is dlx.key and the queue is dlx.queue
        producerChannel.exchangeDeclare("dlx.exchange","direct");
        // durable, exclusive and autoDelete are all set to false here (assumed values; adjust as needed)
        producerChannel.queueDeclare("dlx.queue",false,false,false,null);
        producerChannel.queueBind("dlx.queue","dlx.exchange","dlx.key");
        Map<String,Object> queueArgs = new HashMap<>();
        // Set the queue-level message TTL to 5 seconds
        queueArgs.put("x-message-ttl",5000);
        // DLX-related arguments
        queueArgs.put("x-dead-letter-exchange","dlx.exchange");
        queueArgs.put("x-dead-letter-routing-key","dlx.key");
        // Declare the business queue
        producerChannel.queueDeclare("business.queue",false,false,false,queueArgs);
        ExecutorService executorService = Executors.newSingleThreadExecutor(r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("DlxConsumer");
            return thread;
        });
        // Start the consumer
        executorService.execute(() -> {
            try {
                consumerChannel.basicConsume("dlx.queue",true,new DlxConsumer(consumerChannel));
            } catch (IOException e) {
                LOGGER.error(e.getMessage(),e);
            }
        });
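        OrderMessage message = new OrderMessage("10086");
        producerChannel.basicPublish("","business.queue",MessageProperties.TEXT_PLAIN,message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}",message.getOrderId());

        message = new OrderMessage("10087");
        producerChannel.basicPublish("","business.queue",MessageProperties.TEXT_PLAIN,message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}",message.getOrderId());

        message = new OrderMessage("10088");
        producerChannel.basicPublish("","business.queue",MessageProperties.TEXT_PLAIN,message.getDescription().getBytes(StandardCharsets.UTF_8));
        LOGGER.info("发送消息成功,订单ID:{}",message.getOrderId());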

        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DlxConsumer extends DefaultConsumer {

        DlxConsumer(Channel channel) {
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {
            LOGGER.info("处理消息成功:{}",new String(body,StandardCharsets.UTF_8));
        }
    }

    private static class OrderMessage {

        private final String orderId;
        private final long timestamp;
        private final String description;

        OrderMessage(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
            this.description = String.format("订单[%s],订单创建时间为:%s",orderId,LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),ZoneId.systemDefault()).format(F));
        }

        public String getOrderId() {
            return orderId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public String getDescription() {
            return description;
        }
    }
}

Run the main() method and the result is as follows:

16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10086
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10087
16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10088
16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10086],订单创建时间为:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10087],订单创建时间为:2019-08-20 16:35:58
16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10088],订单创建时间为:2019-08-20 16:35:58

Time wheel

A timing wheel (TimingWheel) is an efficient, low-latency scheduling data structure. Underneath, it uses an array as a ring queue whose slots hold task lists. The schematic diagram is as follows:

The time wheel and its implementation are not analyzed here for now. Only a simple example is given to show how to implement delayed tasks with it, using the HashedWheelTimer provided by Netty. Introduce the dependency:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
    <version>4.1.39.Final</version>
</dependency>

The code is as follows:

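import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HashedWheelTimerMain {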

    private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("HashedWheelTimerWorker-" + counter.getAndIncrement());
            return thread;
        };
        // tickDuration - interval between ticks; each tick advances the wheel to the next slot
        // unit - time unit of tickDuration
        // ticksPerWheel - number of slots in the wheel
        Timer timer = new HashedWheelTimer(factory,1,TimeUnit.SECONDS,60);
        TimerTask timerTask = new DefaultTimerTask("10086");
        timer.newTimeout(timerTask,5,TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10087");
        timer.newTimeout(timerTask,10,TimeUnit.SECONDS);
        timerTask = new DefaultTimerTask("10088");
        timer.newTimeout(timerTask,15,TimeUnit.SECONDS);
        Thread.sleep(Integer.MAX_VALUE);
    }

    private static class DefaultTimerTask implements TimerTask {

        private final String orderId;
        private final long timestamp;

        public DefaultTimerTask(String orderId) {
            this.orderId = orderId;
            this.timestamp = System.currentTimeMillis();
        }

        @Override
        public void run(Timeout timeout) throws Exception {
            System.out.println(String.format("任务执行时间:%s,订单创建时间:%s,订单ID:%s",LocalDateTime.now().format(F),LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp),ZoneId.systemDefault()).format(F),orderId));
        }
    }
}

Operation results:

任务执行时间:2019-08-20 17:19:49.310,订单创建时间:2019-08-20 17:19:43.294,订单ID:10086
任务执行时间:2019-08-20 17:19:54.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10087
任务执行时间:2019-08-20 17:19:59.297,订单ID:10088

Generally speaking, task execution should be handed off to a separate business thread pool so that it does not block the advance of the time wheel itself.
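
To make the ring-array idea more concrete, here is a minimal single-threaded sketch of a hashed time wheel. It is not Netty's implementation: SimpleTimeWheel, its methods, and the manual tick() call (standing in for a worker thread that advances the wheel on a timer) are all assumptions made for illustration. Each slot holds a list of entries, and each entry records how many full revolutions it still has to wait.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch only; SimpleTimeWheel is a hypothetical class, not Netty's HashedWheelTimer.
final class SimpleTimeWheel {

    private static final class Entry {
        final String taskId;
        long remainingRounds; // full wheel revolutions still to wait

        Entry(String taskId, long remainingRounds) {
            this.taskId = taskId;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private long currentTick = 0; // number of ticks processed so far

    SimpleTimeWheel(int ticksPerWheel) {
        if (ticksPerWheel < 1) {
            throw new IllegalArgumentException("ticksPerWheel must be at least 1");
        }
        for (int i = 0; i < ticksPerWheel; i++) {
            slots.add(new ArrayList<>());
        }
    }

    // Schedules taskId to become due delayTicks ticks from now.
    void schedule(String taskId, long delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be at least 1");
        }
        long deadline = currentTick + delayTicks;
        int index = (int) (deadline % slots.size());
        long rounds = (delayTicks - 1) / slots.size();
        slots.get(index).add(new Entry(taskId, rounds));
    }

    // Advances the wheel by one tick and returns the IDs of the tasks that are now due.
    List<String> tick() {
        currentTick++;
        List<String> due = new ArrayList<>();
        Iterator<Entry> it = slots.get((int) (currentTick % slots.size())).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                due.add(entry.taskId);
                it.remove();
            } else {
                entry.remainingRounds--;
            }
        }
        return due;
    }

    public static void main(String[] args) {
        SimpleTimeWheel wheel = new SimpleTimeWheel(4);
        wheel.schedule("10086", 2);
        wheel.schedule("10087", 5);
        for (int tick = 1; tick <= 5; tick++) {
            System.out.println("tick " + tick + " -> " + wheel.tick());
        }
    }
}
```

In this sketch the first task becomes due on tick 2, and the second, whose delay is longer than one revolution of the 4-slot wheel, becomes due on tick 5. A real timer would also hand the due tasks to an executor instead of returning them to the caller.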

Implementation of the selected scheme

In the end, a Redis Sorted Set combined with Quartz short-interval polling was selected for the implementation. The specific scheme is:

For point 4, there are two options:

In the end, option 1 is chosen for now: the order IDs are popped from the Sorted Set, and as soon as the push data has been fetched from the Hash, the corresponding entries in both collections are deleted. The flow of the scheme is roughly as follows:

The Redis commands used are described in detail here.

Sorted set related commands

ZADD KEY SCORE1 VALUE1 ... SCOREN VALUEN

ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]

ZREM key member [member ...]

Hash related commands

HMSET KEY_NAME FIELD1 VALUE1 ... FIELDN VALUEN

HDEL KEY_NAME FIELD1 ... FIELDN

Lua related

Import dependency:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.7.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>    
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>5.1.9.RELEASE</version>
    </dependency> 
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.59</version>
    </dependency>       
</dependencies>

Write the Lua scripts /lua/enqueue.lua and /lua/dequeue.lua:

-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD',zset_key,zset_score,zset_value)
redis.call('HSET',hash_key,hash_field,hash_value)
return nil

-- /lua/dequeue.lua
-- Adapted in part from the Lua scripts in jesque
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- The TYPE command returns a table such as {'ok':'zset'}; next() is used to read its first key/value pair
local status,type = next(redis.call('TYPE',zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE',zset_key,max_score,min_score,'LIMIT',offset,limit)
        if list ~= nil and #list > 0 then
            -- unpack() turns a table into variadic arguments
            redis.call('ZREM',zset_key,unpack(list))
            local result = redis.call('HMGET',hash_key,unpack(list))
            redis.call('HDEL',hash_key,unpack(list))
            return result
        end
    end
end
return nil
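
To make the semantics of the two scripts easier to follow, here is an in-memory model of the same idea in plain Java. It is only a sketch under stated assumptions: InMemoryDelayQueue is a hypothetical class, two HashMaps stand in for the Sorted Set and the Hash, and synchronized methods stand in for the atomic execution of a Lua script inside Redis. Enqueue writes the order ID with its score plus the payload; dequeue selects IDs whose score lies in a range (highest score first, as ZREVRANGEBYSCORE does), then removes them from both structures and returns the payloads.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative in-memory model only; it does not talk to Redis.
final class InMemoryDelayQueue {

    private final Map<String, Long> zset = new HashMap<>();   // member -> score (stands in for the Sorted Set)
    private final Map<String, String> hash = new HashMap<>(); // field -> value (stands in for the Hash)

    // Mirrors enqueue.lua: store the score and the payload under the same order ID.
    synchronized void enqueue(String orderId, long score, String payload) {
        zset.put(orderId, score);
        hash.put(orderId, payload);
    }

    // Mirrors dequeue.lua: pick IDs by score range, then delete them from both structures.
    synchronized List<String> dequeue(long minScore, long maxScore, int offset, int limit) {
        Comparator<Map.Entry<String, Long>> byScoreDesc = (a, b) -> {
            int c = Long.compare(b.getValue(), a.getValue());
            return c != 0 ? c : b.getKey().compareTo(a.getKey());
        };
        List<String> ids = zset.entrySet().stream()
                .filter(e -> e.getValue() >= minScore && e.getValue() <= maxScore)
                .sorted(byScoreDesc)
                .skip(offset)
                .limit(limit)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        List<String> payloads = new ArrayList<>();
        for (String id : ids) {
            zset.remove(id);
            payloads.add(hash.remove(id));
        }
        return payloads;
    }

    public static void main(String[] args) {
        InMemoryDelayQueue queue = new InMemoryDelayQueue();
        long now = System.currentTimeMillis();
        queue.enqueue("ORDER_ID_10086", now - 31 * 60 * 1000L, "payload-10086"); // enqueued 31 minutes ago
        queue.enqueue("ORDER_ID_10087", now, "payload-10087");                   // enqueued just now
        // Only entries at least 30 minutes old are due.
        System.out.println(queue.dequeue(0L, now - 30 * 60 * 1000L, 0, 10));
    }
}
```

Because the score is the enqueue timestamp, "due" simply means a score no greater than the current time minus the delay, which is the range the polling job passes in.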

Write the core API code:

// Jedis provider
@Component
public class JedisProvider implements InitializingBean {

    private JedisPool jedisPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        jedisPool = new JedisPool();
    }

    public Jedis provide(){
        return jedisPool.getResource();
    }
}

// OrderMessage
@Data
public class OrderMessage {

    private String orderId;
    private BigDecimal amount;
    private Long userId;
    private String timestamp;
}

// Delay queue interface
public interface OrderDelayQueue {

    void enqueue(OrderMessage message);

    List<OrderMessage> dequeue(String min,String max,String offset,String limit);

    List<OrderMessage> dequeue();

    String enqueueSha();

    String dequeueSha();
}

// Delay queue implementation
@RequiredArgsConstructor
@Component
public class RedisOrderDelayQueue implements OrderDelayQueue,InitializingBean {

    private static final String MIN_SCORE = "0";
    private static final String OFFSET = "0";
    private static final String LIMIT = "10";
    private static final String ORDER_QUEUE = "ORDER_QUEUE";
    private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE";
    private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua";
    private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua";
    private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>();
    private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>();
    private static final List<String> KEYS = Lists.newArrayList();

    private final JedisProvider jedisProvider;

    static {
        KEYS.add(ORDER_QUEUE);
        KEYS.add(ORDER_DETAIL_QUEUE);
    }

    @Override
    public void enqueue(OrderMessage message) {
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        args.add(String.valueOf(System.currentTimeMillis()));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(),KEYS,args);
        }
    }

    @Override
    public List<OrderMessage> dequeue() {
        // Upper score bound: 30 minutes ago
        String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000);
        return dequeue(MIN_SCORE,maxScore,OFFSET,LIMIT);
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<OrderMessage> dequeue(String min,String max,String offset,String limit) {
        List<String> args = new ArrayList<>();
        // Argument order must match dequeue.lua: ARGV[1] = min score, ARGV[2] = max score
        args.add(min);
        args.add(max);
        args.add(offset);
        args.add(limit);
        List<OrderMessage> result = Lists.newArrayList();
        try (Jedis jedis = jedisProvider.provide()) {
            List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(),KEYS,args);
            if (null != eval) {
                for (String e : eval) {
                    result.add(JSON.parseObject(e,OrderMessage.class));
                }
            }
        }
        return result;
    }

    @Override
    public String enqueueSha() {
        return ENQUEUE_LUA_SHA.get();
    }

    @Override
    public String dequeueSha() {
        return DEQUEUE_LUA_SHA.get();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Load the Lua scripts
        loadLuaScript();
    }

    private void loadLuaScript() throws Exception {
        try (Jedis jedis = jedisProvider.provide()) {
            ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION);
            String luaContent = StreamUtils.copyToString(resource.getInputStream(),StandardCharsets.UTF_8);
            String sha = jedis.scriptLoad(luaContent);
            ENQUEUE_LUA_SHA.compareAndSet(null,sha);
            resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION);
            luaContent = StreamUtils.copyToString(resource.getInputStream(),StandardCharsets.UTF_8);
            sha = jedis.scriptLoad(luaContent);
            DEQUEUE_LUA_SHA.compareAndSet(null,sha);
        }
    }

    public static void main(String[] as) throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        JedisProvider jedisProvider = new JedisProvider();
        jedisProvider.afterPropertiesSet();
        RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider);
        queue.afterPropertiesSet();
        // Write test data
        OrderMessage message = new OrderMessage();
        message.setAmount(BigDecimal.valueOf(10086));
        message.setOrderId("ORDER_ID_10086");
        message.setUserId(10086L);
        message.setTimestamp(LocalDateTime.now().format(f));
        List<String> args = Lists.newArrayList();
        args.add(message.getOrderId());
        // For testing, set the score to 30 minutes ago
        args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000));
        args.add(message.getOrderId());
        args.add(JSON.toJSONString(message));
        try (Jedis jedis = jedisProvider.provide()) {
            jedis.evalsha(ENQUEUE_LUA_SHA.get(),KEYS,args);
        }
        List<OrderMessage> dequeue = queue.dequeue();
        System.out.println(dequeue);
    }
}

First, run the main() method to verify that the delay queue works:

[OrderMessage(orderId=ORDER_ID_10086,amount=10086,userId=10086,timestamp=2019-08-21 08:32:22.885)]
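
The enqueue and dequeue semantics of the class above can also be sketched without a Redis instance. The following is a minimal in-memory stand-in, not the author's implementation: the class name InMemoryDelayQueueSketch is hypothetical, a TreeMap plays the role of the ORDER_QUEUE sorted set and a HashMap the role of the ORDER_DETAIL_QUEUE hash, and it assumes that the dequeue script removes what it returns, which the later log output suggests.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory stand-in for the sorted set plus hash pair (illustration only, not thread-safe). */
final class InMemoryDelayQueueSketch {

    // score (enqueue time in ms) -> order ids; plays the role of the sorted set
    private final TreeMap<Long, List<String>> scores = new TreeMap<>();
    // order id -> serialized payload; plays the role of the hash
    private final Map<String, String> details = new HashMap<>();

    void enqueue(String orderId, long scoreMillis, String payload) {
        scores.computeIfAbsent(scoreMillis, k -> new ArrayList<>()).add(orderId);
        details.put(orderId, payload);
    }

    /** Removes and returns up to limit payloads whose score is <= maxScore, highest score first. */
    List<String> dequeue(long maxScore, int limit) {
        List<String> result = new ArrayList<>();
        // headMap(maxScore, true) selects scores <= maxScore; descendingMap mimics the reverse range order
        Iterator<Map.Entry<Long, List<String>>> it =
                scores.headMap(maxScore, true).descendingMap().entrySet().iterator();
        while (it.hasNext() && result.size() < limit) {
            List<String> ids = it.next().getValue();
            while (!ids.isEmpty() && result.size() < limit) {
                String id = ids.remove(ids.size() - 1);
                result.add(details.remove(id));
            }
            if (ids.isEmpty()) {
                it.remove(); // drop the score bucket once it is drained
            }
        }
        return result;
    }

    public static void main(String[] args) {
        InMemoryDelayQueueSketch queue = new InMemoryDelayQueueSketch();
        long now = System.currentTimeMillis();
        long delay = 30 * 60 * 1000L;
        queue.enqueue("ORDER_ID_1", now - delay - 1, "due");   // older than 30 minutes
        queue.enqueue("ORDER_ID_2", now, "not due yet");        // just created
        System.out.println(queue.dequeue(now - delay, 10));     // prints [due]
    }
}
```

Only the message whose score is at least 30 minutes old is returned; the other one stays queued until a later poll.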

Once the delay queue code is confirmed to work, write a Quartz Job-based consumer, OrderMessageConsumer:

@DisallowConcurrentExecution
@Component
public class OrderMessageConsumer implements Job {

    private static final AtomicInteger COUNTER = new AtomicInteger();
    private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),r -> {
        Thread thread = new Thread(r);
        thread.setDaemon(true);
        thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement());
        return thread;
    });
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class);

    @Autowired
    private OrderDelayQueue orderDelayQueue;

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        // Log message: "Order message scheduled task started......"
        LOGGER.info("订单消息处理定时任务开始执行......");
        List<OrderMessage> messages = orderDelayQueue.dequeue();
        if (!messages.isEmpty()) {
            // Simply partition the list into equal chunks and hand them to the thread pool
            List<List<OrderMessage>> partition = Lists.partition(messages,2);
            int size = partition.size();
            final CountDownLatch latch = new CountDownLatch(size);
            for (List<OrderMessage> p : partition) {
                BUSINESS_WORKER_POOL.execute(new ConsumeTask(p,latch));
            }
            try {
                latch.await();
            } catch (InterruptedException ignore) {
                //ignore
            }
        }
        stopWatch.stop();
        // Log message: "Order message scheduled task finished, elapsed: {} ms......"
        LOGGER.info("订单消息处理定时任务执行完毕,耗时:{} ms......",stopWatch.getTotalTimeMillis());
    }

    @RequiredArgsConstructor
    private static class ConsumeTask implements Runnable {

        private final List<OrderMessage> messages;
        private final CountDownLatch latch;

        @Override
        public void run() {
            try {
                // In practice, exceptions should be caught per message here
                for (OrderMessage message : messages) {
                    // Log message: "Processing order message, content: {}"
                    LOGGER.info("处理订单信息,内容:{}",message);
                }
            } finally {
                latch.countDown();
            }
        }
    }
}      

The above consumer design needs the following considerations:
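
One such consideration is already flagged by the comment inside ConsumeTask: exceptions should be caught per message, so that one bad order does not abort the rest of its batch. A minimal sketch of that idea follows. The class SafeConsumeTaskSketch, its handler parameter, and the failed list are hypothetical names introduced for illustration and are not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

/** Variant of the batch task that isolates failures per message (illustration only). */
final class SafeConsumeTaskSketch<T> implements Runnable {

    private final List<T> messages;
    private final CountDownLatch latch;
    private final Consumer<T> handler;                 // hypothetical per-message business logic
    private final List<T> failed = new ArrayList<>();  // messages whose handler threw

    SafeConsumeTaskSketch(List<T> messages, CountDownLatch latch, Consumer<T> handler) {
        this.messages = messages;
        this.latch = latch;
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            for (T message : messages) {
                try {
                    handler.accept(message);
                } catch (Exception e) {
                    // One bad message must not stop the remaining ones in this batch
                    failed.add(message);
                }
            }
        } finally {
            // Always release the latch, even if something unexpected escapes the loop
            latch.countDown();
        }
    }

    /** Read this only after latch.await() has returned, so the writes above are visible. */
    List<T> failed() {
        return failed;
    }
}
```

What to do with the failed messages (retry, re-enqueue, or alert) depends on the business scenario and is left open here.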

Other Quartz-related code:

// Quartz configuration class
@Configuration
public class QuartzAutoConfiguration {

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setAutoStartup(true);
        factory.setJobFactory(quartzAutowiredJobFactory);
        return factory;
    }

    @Bean
    public QuartzAutowiredJobFactory quartzAutowiredJobFactory() {
        return new QuartzAutowiredJobFactory();
    }

    public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware {

        private AutowireCapableBeanFactory autowireCapableBeanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory;
        }

        @Override
        protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
            Object jobInstance = super.createJobInstance(bundle);
            // Use AutowireCapableBeanFactory to autowire the newly created Job instance, yielding a prototype-style Job bean instance
            autowireCapableBeanFactory.autowireBean(jobInstance);
            return jobInstance;
        }
    }
}

For now, the in-memory RAMJobStore is used to store the job and trigger information. In production it is better to switch to the MySQL-based clustered store, JobStoreTX. Finally, here is the startup class, which also implements CommandLineRunner:

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,TransactionAutoConfiguration.class})
public class Application implements CommandLineRunner {

    @Autowired
    private Scheduler scheduler;

    @Autowired
    private JedisProvider jedisProvider;

    public static void main(String[] args) {
        SpringApplication.run(Application.class,args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Prepare some test data
        prepareOrderMessageData();
        JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class)
                .withIdentity("OrderMessageConsumer","DelayTask")
                .build();
        // The trigger fires every 5 seconds
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("OrderMessageConsumerTrigger","DelayTask")
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever())
                .build();
        scheduler.scheduleJob(job,trigger);
    }

    private void prepareOrderMessageData() throws Exception {
        DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        try (Jedis jedis = jedisProvider.provide()) {
            List<OrderMessage> messages = Lists.newArrayList();
            for (int i = 0; i < 100; i++) {
                OrderMessage message = new OrderMessage();
                message.setAmount(BigDecimal.valueOf(i));
                message.setOrderId("ORDER_ID_" + i);
                message.setUserId((long) i);
                message.setTimestamp(LocalDateTime.now().format(f));
                messages.add(message);
            }
            // Lua is not used here for the time being
            Map<String,Double> map = Maps.newHashMap();
            Map<String,String> hash = Maps.newHashMap();
            for (OrderMessage message : messages) {
                // Deliberately set the score to 30 minutes ago
                map.put(message.getOrderId(),Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)));
                hash.put(message.getOrderId(),JSON.toJSONString(message));
            }
            jedis.zadd("ORDER_QUEUE",map);
            jedis.hmset("ORDER_DETAIL_QUEUE",hash);
        }
    }
}

The output results are as follows:

2019-08-21 22:45:59.518  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务开始执行......
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_91,amount=91,userId=91,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_95,amount=95,userId=95,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_97,amount=97,userId=97,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_99,amount=99,userId=99,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_93,amount=93,userId=93,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_94,amount=94,userId=94,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_96,amount=96,userId=96,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_92,amount=92,userId=92,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_98,amount=98,userId=98,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539  INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_90,amount=90,userId=90,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540  INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务执行完毕,耗时:22 ms......
2019-08-21 22:46:04.515  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务开始执行......
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_89,amount=89,userId=89,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_87,amount=87,userId=87,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_85,amount=85,userId=85,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_88,amount=88,userId=88,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_83,amount=83,userId=83,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer      : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_81,amount=81,userId=81,......
......内容:OrderMessage(orderId=ORDER_ID_86,amount=86,userId=86,......
......内容:OrderMessage(orderId=ORDER_ID_82,amount=82,userId=82,......
......内容:OrderMessage(orderId=ORDER_ID_84,amount=84,userId=84,......
......内容:OrderMessage(orderId=ORDER_ID_80,amount=80,userId=80,timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516  INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer      : 订单消息处理定时任务执行完毕,耗时:1 ms......
......

The first execution initializes some components, so it is relatively slow (22 ms in the log above). Subsequent executions are fast (1 ms) because the task only prints the order information. If the current architecture is left unchanged, pay attention to the following:
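
One point that follows directly from the constants in the code is the throughput ceiling: with LIMIT set to 10 and the trigger firing every 5 seconds, each run handles at most 10 messages, which matches the log above. A rough calculation, assuming each poll returns at most LIMIT messages and finishes within one interval (the class and method names are hypothetical):

```java
/** Back-of-the-envelope throughput bounds for interval polling (illustration only). */
final class PollingThroughputSketch {

    /** Upper bound on messages consumed per minute when each poll returns at most limit messages. */
    static long maxMessagesPerMinute(int limit, int intervalSeconds) {
        return 60L * limit / intervalSeconds;
    }

    /** Number of polls needed to drain a backlog of due messages. */
    static long pollsToDrain(long backlog, int limit) {
        return (backlog + limit - 1) / limit; // ceiling division
    }

    public static void main(String[] args) {
        // LIMIT = 10 and a 5-second trigger, as in the example code
        System.out.println(maxMessagesPerMinute(10, 5)); // 120
        // The 100 prepared test messages need 10 polls
        System.out.println(pollsToDrain(100, 10));       // 10
    }
}
```

If orders become due faster than this ceiling, the backlog grows, so LIMIT and the trigger interval would need to be tuned together with the worker pool size.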

There is also a performance risk here. The Redis documentation gives the time complexity of ZREVRANGEBYSCORE as O(log(N)+M), where N is the number of elements in the sorted set and M is the number of elements returned. Because all order information is put into the same sorted set (ORDER_QUEUE), N keeps growing while new data keeps arriving, so every run of the dequeue script works against one ever-larger set, and as order volume increases this single key is likely to become a performance bottleneck. Solutions will be given later.
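
One common way to bound the size of each sorted set, in the spirit of the sharded design mentioned at the beginning of the article, is to spread orders across several queue keys. The sketch below only illustrates the key-routing step and is not the scheme the follow-up article adopts; the key format ORDER_QUEUE_n and the shard count of 4 are assumptions.

```java
/** Routes an order to one of several queue keys by hashing its id (illustration only). */
final class QueueShardingSketch {

    /** Returns baseKey plus "_" plus a shard index in the range [0, shardCount). */
    static String shardKey(String baseKey, String orderId, int shardCount) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        int shard = Math.floorMod(orderId.hashCode(), shardCount);
        return baseKey + "_" + shard;
    }

    public static void main(String[] args) {
        // Hypothetical shard count of 4; the same order id always maps to the same key
        System.out.println(shardKey("ORDER_QUEUE", "ORDER_ID_10086", 4));
        System.out.println(shardKey("ORDER_DETAIL_QUEUE", "ORDER_ID_10086", 4));
    }
}
```

Each consumer run would then poll one shard (or each shard in turn), so N in the complexity above refers to a single shard rather than to all orders.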

Summary

Starting from a simulated version of a real production case, this article analyzes several current approaches to implementing delayed tasks and gives a complete example based on Redis and Quartz. The example is merely runnable, and some problems remain unsolved. The next article will focus on solving two problems:

One more point: architecture evolves with the business. Many things need to be designed and refined for the specific scenario. The ideas here are for reference only; do not copy the code blindly.

(End of article, c-5-d e-a-20190821)
