Spring Boot (XIV): RabbitMQ delay queue
1、 Foreword
Typical usage scenarios for a delay queue: 1. An order that is not paid in time is cancelled after 30 minutes; 2. Messages are pushed to low-activity users after an interval of N days to improve activity; 3. A registration email is sent to a newly registered member after 1 minute.
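There are two ways to implement a delay queue: 1. Use a message TTL together with a dead letter exchange; 2. Use the rabbitmq-delayed-message-exchange plug-in.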
Note: the rabbitmq-delayed-message-exchange plug-in is available for RabbitMQ 3.5.7 and above, and depends on Erlang/OTP 18.0 and above.
Because the dead letter exchange approach is relatively convoluted, this article focuses on the second method, which uses the rabbitmq-delayed-message-exchange plug-in to implement the delay queue.
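One reason the dead letter route is often considered awkward, added here as background rather than taken from the original text, is that RabbitMQ expires messages with per-message TTLs only when they reach the head of the queue, so a short delay can get stuck behind a long one. The following is a minimal, broker-free sketch of that ordering effect in plain Java. The class and method names are hypothetical, and it assumes all messages are published at time 0 in the order given.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: no RabbitMQ involved.
final class DelayOrderingSketch {

    /** Delivery times (ms) when expiry is only checked at the head of a FIFO queue. */
    static List<Long> headOnlyExpiry(long[] delaysMs) {
        List<Long> deliveredAt = new ArrayList<>();
        long previousDelivery = 0;
        for (long delay : delaysMs) {
            // A message cannot leave before the one in front of it has left.
            previousDelivery = Math.max(previousDelivery, delay);
            deliveredAt.add(previousDelivery);
        }
        return deliveredAt;
    }

    /** Delivery times (ms) when every message is released after its own delay. */
    static List<Long> perMessageDelay(long[] delaysMs) {
        List<Long> deliveredAt = new ArrayList<>();
        for (long delay : delaysMs) {
            deliveredAt.add(delay);
        }
        return deliveredAt;
    }

    public static void main(String[] args) {
        long[] delays = {30_000, 5_000};
        System.out.println(headOnlyExpiry(delays));  // [30000, 30000]
        System.out.println(perMessageDelay(delays)); // [30000, 5000]
    }
}
```

In this simplified model the 5-second message waits the full 30 seconds under head-only expiry, whereas a per-message delay releases it on time.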
2、 Install delay plug-in
1.1 download plug-ins
Open the official website to download: http://www.rabbitmq.com/community-plugins.html
Select the version that matches your RabbitMQ installation (here "3.7.x") and click download.
Note: the download is a Zip archive, which must be unzipped manually after downloading.
1.2 installing plug-ins
Copy the unzipped plug-in file into the plug-ins directory of the RabbitMQ Docker container, for example with "docker cp".
For installing RabbitMQ in Docker, see the previous article in this series: http://www.apigo.cn/2018/09/11/springboot13/
1.3 enabling the plug-in
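Enter the container, for example with "docker exec -it <container> bash", where <container> is the name or ID of your RabbitMQ container.
Enable the plug-in with "rabbitmq-plugins enable rabbitmq_delayed_message_exchange".
List all installed plug-ins with "rabbitmq-plugins list".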
If the installation succeeded, the plug-in appears in the list, as shown in the figure below:
Restart RabbitMQ for the plug-in to take effect.
3、 Code implementation
3.1 configuring queues
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.goods.order";
    final static String EXCHANGE_NAME = "delayedec";

    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // Configure the exchange (a custom exchange of the plug-in's type)
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        // Routing behaviour the delayed exchange applies once the delay has elapsed
        args.put("x-delayed-type", "direct");
        // The second argument is the exchange type: it must be x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    // Bind the queue to the exchange, using the queue name as the routing key
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}
3.2 sending messages
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Send time: " + sf.format(new Date()));
        // Publish to the delayed exchange, using the queue name as the routing key
        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // The x-delay header is the delay in milliseconds (3 seconds here)
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}
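The sender hard-codes the delay as 3000, and the x-delay header is expressed in milliseconds, so scenarios such as "cancel after 30 minutes" need a unit conversion. As a minimal sketch, the hypothetical helper below (DelayHeaders and toDelayMillis are illustrative names, not part of the original project) converts a java.time.Duration into an int. It assumes the value is passed as an int, as in the sender, and therefore rejects delays that do not fit.

```java
import java.time.Duration;

// Hypothetical helper, not part of the original project.
final class DelayHeaders {
    private DelayHeaders() {}

    /** Converts a delay to whole milliseconds, the unit used by the x-delay header. */
    static int toDelayMillis(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        // Throws ArithmeticException if the value does not fit in an int.
        return Math.toIntExact(delay.toMillis());
    }

    public static void main(String[] args) {
        System.out.println(toDelayMillis(Duration.ofSeconds(3)));  // 3000
        System.out.println(toDelayMillis(Duration.ofMinutes(1)));  // 60000
        System.out.println(toDelayMillis(Duration.ofMinutes(30))); // 1800000
    }
}
```

Inside postProcessMessage, the header value could then be written as DelayHeaders.toDelayMillis(Duration.ofMinutes(30)) instead of a bare number.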
3.3 consuming messages
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {
    // Invoked when a message arrives on the queue after its delay has elapsed
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Receive time: " + sdf.format(new Date()));
        System.out.println("Message content: " + msg);
    }
}
3.4 testing the queue
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {
    @Autowired
    private DelayedSender sender;

    @Test
    public void test() throws InterruptedException {
        sender.send("Hi Admin.");
        // Wait for the receiver to run before the test exits
        Thread.sleep(5 * 1000);
    }
}
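The fixed five-second sleep works here, but it always waits the full time and gives no signal if the message never arrives. One possible alternative, sketched below under stated assumptions, is to wait on a java.util.concurrent.CountDownLatch with a timeout. The delayed delivery is simulated with a ScheduledExecutorService rather than RabbitMQ, and LatchWaitSketch and awaitDelivery are hypothetical names. In a real test the receiver would have to count the latch down, which the original project does not do.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: the "broker" is a scheduled task, not RabbitMQ.
final class LatchWaitSketch {

    /** Returns true if the simulated delivery arrives within the timeout. */
    static boolean awaitDelivery(long delayMs, long timeoutMs) {
        CountDownLatch received = new CountDownLatch(1);
        ScheduledExecutorService broker = Executors.newSingleThreadScheduledExecutor();
        try {
            // Simulate the consumer being called after the delay has elapsed.
            broker.schedule(() -> { received.countDown(); }, delayMs, TimeUnit.MILLISECONDS);
            // Returns as soon as the latch reaches zero, or false on timeout.
            return received.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            broker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitDelivery(100, 2000)); // delivery well inside the timeout
        System.out.println(awaitDelivery(2000, 100)); // timeout expires first
    }
}
```

The benefit of this design is that the wait ends as soon as the message is seen, and a missing message shows up as a false result that a test can assert on.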
The results are as follows:
Send time: 2018-09-11 20:47:51
Receive time: 2018-09-11 20:47:54
Message content: Hi Admin.
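The two timestamps above are 3 seconds apart, which matches the x-delay value of 3000 milliseconds set by the sender. As a small worked check, the sketch below parses both timestamps in the same "yyyy-MM-dd HH:mm:ss" pattern and computes the difference. DelayCheck and elapsedSeconds are hypothetical names used only for this illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for checking the logged timestamps.
final class DelayCheck {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Whole seconds between the send and receive timestamps. */
    static long elapsedSeconds(String sent, String received) {
        LocalDateTime start = LocalDateTime.parse(sent, FMT);
        LocalDateTime end = LocalDateTime.parse(received, FMT);
        return Duration.between(start, end).getSeconds();
    }

    public static void main(String[] args) {
        // Timestamps taken from the output above.
        System.out.println(elapsedSeconds("2018-09-11 20:47:51", "2018-09-11 20:47:54")); // 3
    }
}
```

Because the log only has one-second resolution, this confirms the delay to the nearest second rather than to the millisecond.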
The full code is available on my GitHub: https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq
4、 Summary
We have now implemented the delay function with the rabbitmq-delayed-message-exchange plug-in. Note, however, that if the plug-in is disabled with the command "rabbitmq-plugins disable rabbitmq_delayed_message_exchange", all delayed messages that have not yet been delivered will be lost.