Spring boot (XIV) rabbitmq delay queue

1、 Foreword

Usage scenario of delay queue: 1 If the order is not paid on time, the order will be cancelled after 30 minutes; 2. Push messages to users with low activity after an interval of N days to improve activity; 3. Send registration email to users of newly registered members in 1 minute.

There are two ways to implement delay queues:

Note: the delay plug-in rabbitmq delayed message exchange is in rabbitmq 3.5 7 and above, depending on Erlang / opt 18.0 and above.

Due to the relatively tortuous use of dead letter exchange, this paper focuses on the second method, which uses rabbitmq delayed message exchange plug-in to complete the function of delay queue.

2、 Install delay plug-in

1.1 download plug-ins

Open the official website to download: http://www.rabbitmq.com/community-plugins.html

Select the corresponding version "3.7. X" and click download.

Note: the download is Zip installation package. After downloading, you need to manually unzip it.

1.2 installing plug-ins

Copy plug-ins to docker:

For rabbitmq installation in docker, please refer to the previous article in this series: http://www.apigo.cn/2018/09/11/springboot13/

1.3 startup plug-in

Enter the docker:

Open plug-in:

Query all plug-ins installed:

The installation is normal, and the effect is shown in the figure below:

Restart rabbitmq to make the plug-in take effect

3、 Code implementation

3.1 configuring queues

import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";
    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // 配置默认的交换机
    @Bean
    CustomExchange customExchange() {
        Map<String,Object> args = new HashMap<>();
        args.put("x-delayed-type","direct");
        //参数二为类型:必须是x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME,"x-delayed-message",true,false,args);
    }
    // 绑定队列到交换器
    @Bean
    Binding binding(Queue queue,CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}

3.2 sending messages

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("发送时间:" + sf.format(new Date()));

        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME,DelayedConfig.QUEUE_NAME,msg,new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay",3000);
                return message;
            }
        });
    }
}

3.3 consumption news

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("接收时间:" + sdf.format(new Date()));
        System.out.println("消息内容:" + msg);
    }
}

3.4 test queue

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.Date;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {

    @Autowired
    private DelayedSender sender;

    @Test
    public void test() throws InterruptedException {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
        sender.send("Hi Admin.");
        Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试
    }
}

The results are as follows:

发送时间:2018-09-11 20:47:51
接收时间:2018-09-11 20:47:54
消息内容:Hi Admin.

Full code access to my GitHub: https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq

4、 Summary

So far, we have implemented the delay function with the "rabbitmq delayed message exchange" plug-in, but it should be noted that if the delay plug-in is disabled with the command "rabbitmq plugins disable rabbitmq_delayed_message_exchange", all unsent delay messages will be lost.

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>