RabbitMQ series (IV): RabbitMQ transactions and confirm (publisher confirmation) mode, an in-depth look

Introduction

From the earlier articles in this series (on how RabbitMQ works and its basic usage, and on RabbitMQ's working modes in practice), we know that reliable messaging requires message persistence. Persistence has to be enabled in code, but there is another equally important step: making sure the message actually reaches the broker (the RabbitMQ server), as shown in the figure:

Under normal circumstances, a message is persisted once it has passed through the exchange into a queue. However, if something goes wrong before the message reaches the broker, the message is lost. Is there a way to solve this problem?

RabbitMQ offers two ways to solve this problem: transactions, and confirm (publisher confirmation) mode.

1. Using transactions

Transactions are configured on the channel. There are three main methods: channel.txSelect() puts the channel into transaction mode, channel.txCommit() commits the transaction, and channel.txRollback() rolls it back.

All of these methods start with tx, which is presumably short for "transaction extend" (a transaction extension module). If you know the exact origin, please leave a comment on the blog.

Let's look at the specific code implementation:

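import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.util.Date;
// Create the connection
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// Create the channel
Channel channel = conn.createChannel();
// Declare a durable queue (durable=true, exclusive=false, autoDelete=false)
channel.queueDeclare(_queueName, true, false, false, null);
String message = String.format("Time => %s", new Date().getTime());
try {
	channel.txSelect(); // put the channel into transaction mode
	// Publish a persistent message through the default exchange
	channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
	channel.txCommit(); // commit the transaction
} catch (Exception e) {
	channel.txRollback(); // roll back the transaction on failure
} finally {
	channel.close();
	conn.close();
}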

Note: replace the config.XX values with your own RabbitMQ settings.

As the code shows, everything before publishing is the same as in the earlier articles. The only difference is that the channel must be put into transaction mode before publishing, and the transaction must then be committed or rolled back.

Now that we know how to use transactions, let's capture packets with Wireshark to see how a transaction is executed, as shown in the figure:

Enter the filter ip.addr==rabbitip && amqp (substituting your RabbitMQ server's IP address for rabbitip) to view the traffic between the client and RabbitMQ. The interaction proceeds as follows: the client sends Tx.Select, the broker replies with Tx.Select-Ok, the client publishes the message with Basic.Publish, the client sends Tx.Commit, and the broker replies with Tx.Commit-Ok.

That completes the transaction exchange. If any step fails, an IOException is thrown, so the caller can catch it, roll back the transaction, and decide whether to resend the message.
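
The catch, roll back, and resend decision described above could be sketched roughly as follows. This is only an illustration under stated assumptions: TxChannel is a hypothetical stand-in interface that mirrors the handful of channel operations used in this article (so the sketch runs without a broker), and TxPublisher, publishWithRetry, and the maxAttempts limit are names invented here, not part of the RabbitMQ client.

```java
import java.io.IOException;

/** Hypothetical stand-in for the subset of channel operations used in this article. */
interface TxChannel {
    void txSelect() throws IOException;
    void publish(byte[] body) throws IOException;
    void txCommit() throws IOException;
    void txRollback() throws IOException;
}

/** Hypothetical helper: publish inside a transaction, retrying after a rollback. */
class TxPublisher {
    /**
     * Returns the number of attempts that were needed.
     * Rethrows the last IOException if every attempt fails.
     */
    static int publishWithRetry(TxChannel channel, byte[] body, int maxAttempts) throws IOException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        channel.txSelect(); // put the channel into transaction mode
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                channel.publish(body);
                channel.txCommit();
                return attempt; // committed, so the broker has accepted the message
            } catch (IOException e) {
                last = e;
                channel.txRollback(); // discard the uncommitted publish before retrying
            }
        }
        throw last;
    }
}
```

With a real channel, an IOException often means the channel or connection itself is no longer usable, in which case a new channel would have to be opened before retrying; the sketch leaves that out.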

So if transactions already exist, why use publisher confirmation mode at all? Because transactions perform very poorly. Here is a transaction performance test:

In transaction mode, the results are as follows:

In non-transaction mode, the results are as follows:

These results show that the performance of non-transaction mode is 149 times higher than that of transaction mode. This was measured on my machine; the exact numbers will vary with hardware, but the conclusion is the same: transaction mode performs far worse. Is there a solution that guarantees message reliability without sacrificing performance? That is the confirm (publisher confirmation) mode discussed next.

Extended knowledge

Consumers can acknowledge messages either automatically or manually. What happens if we use transactions on the consumer side? (Of course, with manual acknowledgement there is really no need for transactions.)

Using transactions in consumer mode

Suppose a consumer uses a transaction and rolls it back after acknowledging a message. How does RabbitMQ behave?

The results are divided into two cases:

2. Confirm (publisher confirmation) mode

Confirm mode is similar to transaction mode in that it is also enabled on the channel.

There are three ways to use confirm:

Mode 1: channel.waitForConfirms(), normal publisher confirmation;

Mode 2: channel.waitForConfirmsOrDie(), batch confirmation;

Mode 3: channel.addConfirmListener(), asynchronous listening for publisher confirmations;

Mode 1: normal confirm mode

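import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Date;
// Create the connection
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// Create the channel
Channel channel = conn.createChannel();
// Declare the queue (durable=false, exclusive=false, autoDelete=false)
channel.queueDeclare(config.QueueName, false, false, false, null);
// Enable publisher confirmation mode on the channel
channel.confirmSelect();
String message = String.format("Time => %s", new Date().getTime());
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
// Block until the broker confirms the message
if (channel.waitForConfirms()) {
	System.out.println("Message sent successfully");
}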

As the code shows, we only need to call channel.confirmSelect() before publishing to enable publisher confirmation mode, and then call channel.waitForConfirms() to wait for the broker to confirm the message.

Mode 2: batch confirm mode

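import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Date;
// Create the connection
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// Create the channel
Channel channel = conn.createChannel();
// Declare the queue (durable=false, exclusive=false, autoDelete=false)
channel.queueDeclare(config.QueueName, false, false, false, null);
// Enable publisher confirmation mode on the channel
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
	String message = String.format("Time => %s", new Date().getTime());
	channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
// Block until all published messages are confirmed; throws IOException if any is not
channel.waitForConfirmsOrDie();
System.out.println("All messages confirmed");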

As the code shows, channel.waitForConfirmsOrDie() is synchronous: it blocks until every message published so far has been confirmed, and only then does the following code run. If any message is not confirmed, an IOException is thrown.

Mode 3: asynchronous confirm mode

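import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Date;
// Create the connection
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// Create the channel
Channel channel = conn.createChannel();
// Declare the queue (durable=false, exclusive=false, autoDelete=false)
channel.queueDeclare(config.QueueName, false, false, false, null);
// Enable publisher confirmation mode on the channel
channel.confirmSelect();
// Listen asynchronously for confirmed and unconfirmed messages.
// The listener is registered before publishing so that no confirmation is missed.
channel.addConfirmListener(new ConfirmListener() {
	@Override
	public void handleNack(long deliveryTag, boolean multiple) throws IOException {
		System.out.println("Message not confirmed, tag: " + deliveryTag);
	}
	@Override
	public void handleAck(long deliveryTag, boolean multiple) throws IOException {
		System.out.println(String.format("Message confirmed, tag: %d, multiple: %b", deliveryTag, multiple));
	}
});
for (int i = 0; i < 10; i++) {
	String message = String.format("Time => %s", new Date().getTime());
	channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}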

The advantage of asynchronous mode is its efficiency: the publisher does not block waiting for each confirmation and simply listens for the results. The asynchronously returned information is as follows:

As you can see, confirmations arrive asynchronously and may be batched. Whether a confirmation is batched depends on the multiple parameter, a boolean. If it is true, all messages with a delivery tag up to and including deliveryTag are confirmed at once; if it is false, only the single message with that tag is confirmed.
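
The meaning of the multiple flag can be made concrete with a small bookkeeping class that remembers which messages are still unconfirmed. This is a minimal sketch rather than part of the original code: ConfirmTracker and its methods are hypothetical names, and the class uses only the JDK so that it runs without a broker. In a real publisher, the tag passed to track() would typically come from channel.getNextPublishSeqNo() just before each basicPublish, and the two handlers would be called from the ConfirmListener callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper that tracks published but not yet confirmed messages. */
class ConfirmTracker {
    // delivery tag -> message body, kept sorted so a "multiple" confirm can clear a prefix
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    /** Record a message just before it is published. */
    void track(long deliveryTag, String body) {
        outstanding.put(deliveryTag, body);
    }

    /** Called on ack: forget the confirmed message(s). */
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headMap(tag, true) is a live view of every entry with key <= tag
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    /** Called on nack: forget the message(s) and return their bodies so the caller can resend. */
    List<String> handleNack(long deliveryTag, boolean multiple) {
        List<String> toResend = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> nacked = outstanding.headMap(deliveryTag, true);
            toResend.addAll(nacked.values());
            nacked.clear();
        } else {
            String body = outstanding.remove(deliveryTag);
            if (body != null) {
                toResend.add(body);
            }
        }
        return toResend;
    }

    int outstandingCount() {
        return outstanding.size();
    }

    boolean isOutstanding(long deliveryTag) {
        return outstanding.containsKey(deliveryTag);
    }
}
```

A sorted concurrent map is used because the listener callbacks run on a different thread from the publisher, and because clearing "everything up to this tag" is a single prefix operation on a sorted map.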

Confirm performance test

Test premise: as with transactions, we send 10,000 messages.

Mode 1: normal confirm mode

Mode 2: batch confirm mode

Mode 3: asynchronous confirm listener mode
