RabbitMQ series (III): introduction to and practice with RabbitMQ exchanges


Reading guide

With the basics of RabbitMQ covered (see the earlier article, "In-depth understanding of the working principle and simple use of RabbitMQ"), this chapter focuses on exchanges.

Exchange types

RabbitMQ exchanges fall into four types: direct, fanout, topic, and headers.

The headers exchange matches on the headers of an AMQP message instead of the routing key. Apart from that it behaves exactly like the direct exchange, but its performance is much worse, so this article does not cover it.

Note: fanout and topic exchanges keep no message history, so a queue that is created and bound partway through cannot receive messages published before it existed.

1. Direct exchange

Direct is the default exchange type and also the simplest: if a message's routing key matches the key a queue is bound with, the message is delivered to that queue.

Usage: channel.basicPublish("", queueName, null, message) publishes a message through the direct exchange to the queue. The empty string "" names the default direct exchange, and the queue name is used as the routing key.
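The exact-match rule can be pictured with a small in-memory model. This is only a sketch for illustration, not how RabbitMQ is implemented, and it uses no RabbitMQ client calls; the class name DirectRoutingSketch and its methods are made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of direct routing (hypothetical, for illustration only).
final class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();

    // Bind a queue under an explicit binding key.
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // The default exchange ("") behaves as if every queue were bound under its own name.
    void declareQueueOnDefaultExchange(String queueName) {
        bind(queueName, queueName);
    }

    // A message goes to every queue whose binding key equals the routing key exactly.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, new ArrayList<>()));
    }
}
```

In this model, routing with the key "orders" reaches only a queue named "orders"; a queue named "orders.eu" is untouched, and a key with no matching binding reaches no queue at all.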

Direct exchange code example

Sender:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// Declare the queue [parameters: 1: queue name; 2: durable; 3: exclusive; 4: delete the queue when the consumer disconnects; 5: other arguments]
channel.queueDeclare(config.QueueName, false, false, false, null);
String message = String.format("Current time: %s", new Date().getTime());
// Publish [parameters: 1: exchange name; 2: routing key (here the queue name); 3: other message properties, e.g. headers; 4: message body]
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));

Receiver, consuming messages continuously:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// Declare the queue [parameters: 1: queue name; 2: durable; 3: exclusive; 4: delete the queue when the consumer disconnects; 5: other arguments]
channel.queueDeclare(config.QueueName, false, false, false, null);
Consumer defaultConsumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		String message = new String(body, "UTF-8"); // message body
		System.out.println("Received message => " + message);
		// Acknowledge manually [parameters: 1: delivery tag of the message; 2: multiple, true acknowledges every message up to and including this tag]
		channel.basicAck(envelope.getDeliveryTag(), false);
	}
};
// autoAck is false because the consumer acknowledges manually above
channel.basicConsume(config.QueueName, false, "", defaultConsumer);

Receiver, fetching a single message:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
// basicGet returns null when the queue is empty; check for that in real code
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // acknowledge the message

Use basicConsume to consume messages continuously and basicGet to fetch a single message.

Note: do not call basicGet in a loop as a substitute for basicConsume; polling this way performs very poorly.

Fair scheduling

When a queue has multiple subscribers, messages are dispatched to them in round-robin order, so each subscriber receives a fair share (provided the subscribers acknowledge messages normally).
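As a rough sketch of what round-robin dispatch means, the following plain-Java model assigns message i to consumer i mod n. It is an illustration only and involves no RabbitMQ client code; RoundRobinSketch and dispatch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of round-robin dispatch (hypothetical, for illustration only).
final class RoundRobinSketch {
    // Returns, for each message in order, the name of the consumer that receives it.
    static List<String> dispatch(List<String> consumers, int messageCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> assignments = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            // Message i goes to consumer i mod n, so the load is spread evenly.
            assignments.add(consumers.get(i % consumers.size()));
        }
        return assignments;
    }
}
```

With two consumers A and B and five messages, the assignments come out as A, B, A, B, A.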

Fire-and-forget messaging

Fire-and-forget means that the receiver does not know where a message came from. If the receiver needs to know the sender, the sender has to include that information in the message content, just as you sign a letter so that the recipient knows who wrote it.

Message acknowledgement

As the code above shows, after a message is received it must be acknowledged manually with channel.basicAck() (unless automatic acknowledgement is enabled, in which case the message is removed as soon as it is delivered). That raises a question.

What happens if the message is not acknowledged?

If the application receives a message and, because of a bug, forgets to acknowledge it, the status of the message in the queue changes from "ready" to "unacknowledged".

If a message has been delivered but not acknowledged, and the consumer has a prefetch limit in effect (for example channel.basicQos(1)), RabbitMQ will not send that consumer any more messages, because it considers the consumer not yet ready for the next one.

The message stays unacknowledged until you acknowledge it or the connection to RabbitMQ is closed. In the latter case RabbitMQ automatically returns the message to the ready state and delivers it to another subscriber.

You can also use this deliberately: delay the acknowledgement until your program has finished the corresponding business logic. This keeps RabbitMQ from pushing more messages than the program can handle and crashing it.
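The ready / unacknowledged life cycle described above can be modelled in a few lines of plain Java. This is a sketch under simplifying assumptions (one queue, one consumer, no prefetch limit); AckStateSketch and its methods are hypothetical and do not mirror RabbitMQ internals.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Simplified model of the "ready" and "unacknowledged" states (hypothetical, for illustration only).
final class AckStateSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Deliver the next ready message; it becomes unacknowledged.
    // Returns its delivery tag, or -1 if nothing is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        unacked.put(nextTag, message);
        return nextTag++;
    }

    // Acknowledging removes the message for good.
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    // When the consumer disconnects, unacknowledged messages return to "ready".
    void consumerDisconnected() {
        // Walk tags from newest to oldest and push each to the head,
        // which restores the original delivery order.
        for (String message : unacked.descendingMap().values()) {
            ready.addFirst(message);
        }
        unacked.clear();
    }

    int readyCount() { return ready.size(); }

    int unackedCount() { return unacked.size(); }
}
```

Publishing two messages and delivering one leaves one ready and one unacknowledged; a disconnect before the acknowledgement puts both back in the ready state.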

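Message acknowledgement demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // acknowledge the message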

channel.basicAck(long deliveryTag, boolean multiple) acknowledges a message. Parameter 1: the delivery tag of the message. Parameter 2: whether to acknowledge in batch; true acknowledges every outstanding message on the channel up to and including this delivery tag.
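The effect of the multiple flag is easiest to see on a set of outstanding delivery tags. The sketch below is a plain-Java model of that bookkeeping, not RabbitMQ client code; MultipleAckSketch and its methods are hypothetical names.

```java
import java.util.TreeSet;

// Simplified model of single versus batch acknowledgement (hypothetical, for illustration only).
final class MultipleAckSketch {
    private final TreeSet<Long> outstanding = new TreeSet<>();

    // Record that a message with this delivery tag was delivered and awaits acknowledgement.
    void delivered(long deliveryTag) {
        outstanding.add(deliveryTag);
    }

    // Mirrors the meaning of basicAck(deliveryTag, multiple) on the tracked tags.
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // headSet(tag, true) is a live view of all tags <= deliveryTag;
            // clearing the view removes them from the underlying set.
            outstanding.headSet(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    // Returns a copy of the tags that are still unacknowledged.
    TreeSet<Long> outstanding() {
        return new TreeSet<>(outstanding);
    }
}
```

With tags 1 to 5 outstanding, ack(3, true) leaves 4 and 5, while ack(5, false) afterwards leaves only 4.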

Conclusion: every message a consumer receives must be acknowledged.

Rejecting messages

Instead of acknowledging a message, a consumer has two other options:

Option 1: disconnect from RabbitMQ, so that RabbitMQ redelivers the message to another consumer;

Option 2: reject the message with channel.basicReject(long deliveryTag, boolean requeue). Parameter 1: the delivery tag of the message. Parameter 2: how the rejected message is handled. If true, RabbitMQ puts the message back in the queue so it can be delivered again, typically to another subscriber. If false, the message is not requeued: RabbitMQ discards it or, if the queue is configured with a dead-letter exchange, routes it there, so that rejected messages can be collected in a dedicated "dead letter" queue.
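The possible fates of a rejected message can be summarised as a small decision function. This is only a sketch of the rule, under the assumption that dead-lettering happens only when a dead-letter exchange has been configured for the queue; RejectSketch and Outcome are hypothetical names, not part of any RabbitMQ API.

```java
// Decision table for a rejected message (hypothetical, for illustration only).
final class RejectSketch {
    enum Outcome { REQUEUED, DEAD_LETTERED, DISCARDED }

    // requeue corresponds to the second argument of basicReject.
    // deadLetterExchangeConfigured states whether the queue has a dead-letter exchange set up.
    static Outcome reject(boolean requeue, boolean deadLetterExchangeConfigured) {
        if (requeue) {
            // The message returns to the queue and can be delivered again.
            return Outcome.REQUEUED;
        }
        // Not requeued: dead-lettered if configured, otherwise dropped.
        return deadLetterExchangeConfigured ? Outcome.DEAD_LETTERED : Outcome.DISCARDED;
    }
}
```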

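Message reject demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); // reject the message and requeue it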

2. Fanout exchange - publish/subscribe mode

Fanout differs from the direct exchange: it is a publish/subscribe exchange. When you send a message, the exchange broadcasts it to every queue bound to that exchange.

For example, when a user uploads an avatar, the image cache must be cleared and the user should be awarded points. You can bind two queues, one per task, to the image-upload exchange. When a third or fourth task later needs to react to uploaded images, the existing code stays unchanged and you simply add another subscriber. The sender and consumer code are completely decoupled, so new features are easy to add.
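The avatar example can be sketched as an in-memory model of a fanout exchange: every bound queue gets a copy of each message, and a queue bound later sees nothing published earlier. This is an illustration only, with no RabbitMQ client calls; FanoutSketch and the queue names used with it are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of a fanout exchange (hypothetical, for illustration only).
final class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Bind a queue to the exchange; it only receives messages published from now on.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but ignored: every bound queue gets a copy.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    // Returns a copy of the messages currently held by the named queue.
    List<String> messagesIn(String queueName) {
        return new ArrayList<>(queues.getOrDefault(queueName, new ArrayList<>()));
    }
}
```

Binding "clear-cache" and "reward-points", publishing one upload event, and only then binding a third queue leaves the first two queues with one message each and the third queue empty.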

Unlike with the direct exchange, the sender adds a call to channel.exchangeDeclare(ExchangeName, "fanout"); this line declares the fanout exchange.

Sender:

final String ExchangeName = "fanoutec"; // exchange name
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // declare the fanout exchange
String message = "Time: " + new Date().getTime();
// The routing key is left empty because a fanout exchange ignores it
channel.basicPublish(ExchangeName, "", null, message.getBytes("UTF-8"));

Receiving messages also differs from direct: the receiver declares the fanout exchange and binds a server-named queue to it.

Receiving end:

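Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // declare the fanout exchange
String queueName = channel.queueDeclare().getQueue(); // declare a server-named queue
channel.queueBind(queueName, ExchangeName, ""); // bind the queue to the exchange
Consumer consumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		String message = new String(body, "UTF-8");
		System.out.println("Received message => " + message);
	}
};
channel.basicConsume(queueName, true, consumer); // autoAck is true, so no manual acknowledgement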

The biggest difference between fanout and direct is on the receiving end: with fanout, the receiver must bind its queue to the exchange in order to subscribe to messages.

Here channel.queueDeclare().getQueue() declares a queue with a random name generated by RabbitMQ. Once the consumer disconnects, the queue is deleted automatically.

Note: the routing key has no effect on a fanout exchange; the parameter is ignored.

3. Topic exchange - pattern-matched subscriptions

Finally, the topic exchange. It works much like fanout but lets you select the messages you subscribe to more flexibly. This is where the routing key comes into play: it is matched against the binding rules.

Suppose we have a logging system that sends logs of every level, such as warning, log, error, and fatal, to the exchange, but we only want to process logs at error level and above. How? This is what the topic exchange is for.

The key to the topic exchange is how the routing key is defined. A routing key cannot exceed 255 bytes and uses "." as the separator, for example: com.mq.rabbit.error.
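A quick way to check these two constraints before publishing is sketched below. It assumes the key is encoded as UTF-8 when its length is measured; RoutingKeySketch, fitsLimit, and words are hypothetical helper names, not part of the RabbitMQ client.

```java
import java.nio.charset.StandardCharsets;

// Helper for inspecting a routing key (hypothetical, for illustration only).
final class RoutingKeySketch {
    static final int MAX_BYTES = 255;

    // The limit is in bytes, not characters, so multi-byte characters count more than once.
    static boolean fitsLimit(String routingKey) {
        return routingKey.getBytes(StandardCharsets.UTF_8).length <= MAX_BYTES;
    }

    // Splits the key into its dot-separated words.
    static String[] words(String routingKey) {
        return routingKey.split("\\.", -1);
    }
}
```

For "com.mq.rabbit.error" the helper reports four words and a length well within the limit, whereas a key of 128 two-byte characters (256 bytes) is rejected.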

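When binding a queue, the routing key can contain the following wildcards:

"*" matches exactly one word;
"#" matches zero or more words.

For example, suppose a message is published with the routing key "com.mq.rabbit.error".

Binding keys that match it: com.mq.rabbit.*, com.mq.rabbit.#, #.error, com.mq.#, #

Binding keys that do not match it: com.mq.*, *.error, *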

So if you want to subscribe to all messages, bind with "#".
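The matching rule for topic bindings ("*" stands for exactly one word, "#" for zero or more words) can be expressed as a short recursive function. This is a minimal sketch of the rule in plain Java, not RabbitMQ's own matcher, and it does not try to handle edge cases such as empty keys faithfully; TopicMatchSketch is a hypothetical name.

```java
// Word-by-word matcher for topic binding patterns (hypothetical, for illustration only).
final class TopicMatchSketch {
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // p: pattern words, i: current pattern index, k: key words, j: current key index
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern used up: it matches only if the key is used up too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words: try every possible resume point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Key used up, but the pattern still requires a word.
            return false;
        }
        // "*" accepts any single word; anything else must match literally.
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }
}
```

For the key "com.mq.rabbit.error", the patterns "com.mq.rabbit.*", "#.error", and "#" match, while "com.mq.*", "*.error", and "*" do not, because "*" cannot span more than one word.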

Note: fanout and topic exchanges keep no message history, so a queue that is created and bound partway through cannot receive messages published before it existed.

Publisher:

String routingKey = "com.mq.rabbit.error";
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // declare the topic exchange
String message = "Time: " + new Date().getTime();
channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));

Receiving end:

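Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // declare the topic exchange
String queueName = channel.queueDeclare().getQueue(); // declare a server-named queue
String routingKey = "#.error"; // binding pattern: any key whose last word is "error"
channel.queueBind(queueName, ExchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		String message = new String(body, "UTF-8");
		System.out.println(routingKey + "|Received message => " + message);
	}
};
channel.basicConsume(queueName, true, consumer); // autoAck is true, so no manual acknowledgement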

Extension - Custom thread pool

If you need finer control over the threads a connection uses, you can supply your own thread pool. The code is as follows:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

Readers who have looked at the source code may know that factory.newConnection() itself uses a thread pool by default. The relevant source of the ConnectionFactory class is as follows:

private ExecutorService sharedExecutor;
public Connection newConnection() throws IOException, TimeoutException {
	return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
}
public void setSharedExecutor(ExecutorService executor) {
	this.sharedExecutor = executor;
}

Here this.sharedExecutor is the thread pool the factory passes to new connections. You can set it with the setSharedExecutor() method; if it is not set, it is null, and the client library creates a pool of its own for the connection.

If you supply your own thread pool, as in the first snippet of this section, it is not shut down automatically when the connection is closed. You must shut it down yourself by calling shutdown(), otherwise its threads may prevent the JVM from terminating.
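One possible way to shut such a pool down is sketched below, using only java.util.concurrent. The helper name PoolShutdownSketch.shutdownGracefully and the timeout handling are assumptions made for this example, not something the RabbitMQ client provides.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Shutdown helper for a user-supplied thread pool (hypothetical, for illustration only).
final class PoolShutdownSketch {
    // Stops accepting new tasks, waits for running ones to finish,
    // and forces termination if they take longer than the timeout.
    // Returns true if the pool has terminated.
    static boolean shutdownGracefully(ExecutorService pool, long timeoutSeconds) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                // Interrupt tasks that are still running and wait once more.
                pool.shutdownNow();
                return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A typical order would be to close the connection first and then call PoolShutdownSketch.shutdownGracefully(es, 10) on the pool that was passed to factory.newConnection(es).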

The official recommendation is to consider this feature only when the program has a serious performance bottleneck.

Project address

GitHub: https://github.com/vipstone/rabbitmq-java.git
