RabbitMQ series (II): understanding how RabbitMQ works and how to use it

Introduction to RabbitMQ

Before introducing RabbitMQ, we should first introduce MQ. What is MQ?

MQ stands for message queue. In short, messages are delivered through a pipeline.

RabbitMQ is a message queuing service that implements AMQP (Advanced Message Queuing Protocol). It is written in Erlang.

Usage scenario

When we take part in a flash sale, the system now tells us to wait in a queue, rather than freezing the page or returning an error to the user as it did a few years ago.

This kind of queued checkout uses a message queue. Orders are put into a channel and settled one by one, instead of a sudden flood of queries and inserts hitting the database at the same moment and bringing it down. In essence, RabbitMQ smooths out peaks and fills in valleys to protect the business.
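The queuing idea can be sketched without RabbitMQ at all. The following is a minimal toy model using only the Java standard library, not the RabbitMQ client: the class name `PeakShavingSketch` and the method `settleBurst` are made up for illustration. A burst of orders is accepted immediately into an in-memory queue, while a single worker settles them one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of "peak shaving": a burst of orders is buffered in a queue
// and a single worker settles them one at a time. Names are hypothetical.
class PeakShavingSketch {

    // Enqueue `burstSize` orders at once, let one consumer thread drain
    // them sequentially, and return the order in which they were settled.
    static List<Integer> settleBurst(int burstSize) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<Integer> settled = new ArrayList<>();

        // Consumer: takes one order at a time, so the "database" behind it
        // never sees more than one request in flight.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < burstSize; i++) {
                    settled.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producer: the whole burst arrives immediately and is only queued.
        for (int orderId = 1; orderId <= burstSize; orderId++) {
            queue.add(orderId);
        }

        try {
            consumer.join(); // wait until every queued order is settled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return settled;
    }

    public static void main(String[] args) {
        System.out.println(settleBurst(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

With a real broker the queue lives in a separate server process, so producers and consumers can also run on different machines.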

Why RabbitMQ

There are many MQs on the market, such as ActiveMQ, ZeroMQ and Apache Qpid. So why choose RabbitMQ?

Working mechanism

Producers, consumers and brokers

Before looking at message communication, we should first understand three concepts: producer, consumer and broker.

Producer: the creator of a message, responsible for creating data and pushing it to the message server;

Consumer: the receiver of a message, which processes the data and acknowledges the message;

Broker: RabbitMQ itself, which acts as the "courier". It does not produce messages; it only delivers them.

Message sending principle

To publish or consume messages you must first connect to RabbitMQ. How do you connect and send messages?

Your application first opens a TCP connection to the RabbitMQ server. Once the TCP connection is open, it must pass authentication. The authentication data is the connection information you send when connecting: the user name and password for the RabbitMQ server. This is much like a program connecting to a database. In Java there are two ways to supply the connection details, both shown in the code later in this article. Once authentication passes, your application and RabbitMQ create an AMQP channel.

A channel is a virtual connection created on top of the "real" TCP connection. AMQP commands are sent through the channel, and each channel has a unique ID. Publishing messages, subscribing to queues and receiving messages are all done through a channel.

Why not send commands directly over TCP?

For the operating system, creating and destroying TCP sessions is expensive. Suppose there are thousands of requests per second at peak time and each one creates its own TCP session: this wastes a huge number of TCP connections, and because the number of TCP connections the operating system can create per second is limited, the system soon hits a bottleneck.

If instead all requests share one TCP connection and each uses its own channel, we meet the performance requirement while still keeping each conversation private. This is the reason for introducing the concept of a channel.
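To make the idea of "many channels over one connection" concrete, here is a toy sketch in plain Java. It is only an illustration under stated assumptions: the class `ChannelMultiplexSketch`, its methods and the text frame format `id:command` are invented here and are not AMQP's binary wire format (real AMQP frames also carry a channel number, but in a binary header).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: several logical channels share one "wire" (the single TCP
// connection). Every frame is tagged with the ID of the channel that sent it.
class ChannelMultiplexSketch {
    // Stands in for the one real TCP connection.
    private final List<String> wire = new ArrayList<>();
    private int nextChannelId = 1;

    // Opening a channel is cheap: it only hands out a new unique ID.
    int openChannel() {
        return nextChannelId++;
    }

    // Sending a command writes a tagged frame onto the shared wire.
    void send(int channelId, String command) {
        wire.add(channelId + ":" + command);
    }

    // Receiver side: split the shared stream back into per-channel lists.
    Map<Integer, List<String>> demultiplex() {
        Map<Integer, List<String>> byChannel = new HashMap<>();
        for (String frame : wire) {
            int sep = frame.indexOf(':');
            int id = Integer.parseInt(frame.substring(0, sep));
            byChannel.computeIfAbsent(id, k -> new ArrayList<>())
                     .add(frame.substring(sep + 1));
        }
        return byChannel;
    }

    public static void main(String[] args) {
        ChannelMultiplexSketch conn = new ChannelMultiplexSketch();
        int publisher = conn.openChannel();
        int subscriber = conn.openChannel();
        conn.send(publisher, "publish a");
        conn.send(subscriber, "consume q");
        conn.send(publisher, "publish b");
        System.out.println(conn.demultiplex());
    }
}
```

The commands of the two channels interleave on the wire but are separated again by ID on the other side, which is why each channel can behave like a private connection.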

RabbitMQ terms you have to know

To really understand RabbitMQ, you must know some of its terms.

These include: ConnectionFactory, Channel, Exchange, Queue, RoutingKey and BindingKey.

ConnectionFactory (connection manager): the manager that establishes the connection between the application and RabbitMQ; it is used in program code;

Channel: the channel through which messages are pushed;

Exchange: accepts messages and distributes them;

Queue: stores the producer's messages;

RoutingKey: the key the producer attaches to a message, which the exchange uses to decide where the message goes;

BindingKey: the key used to bind a queue to an exchange, so that the exchange's messages reach that queue;

Of the terms above, the routing key and the binding key are the hardest to understand. See the following figure for how they work:
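The relationship between the two keys can also be sketched in code. The toy class below models only the simplest case, a direct exchange, where a message is delivered to every queue whose binding key equals the message's routing key; other exchange types match differently. `DirectExchangeSketch` and its methods are hypothetical names for illustration and are not part of the RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange: exact match between the routing key of a
// message and the binding key of a queue. Names are hypothetical.
class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages stored in that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    // Bind a queue to this exchange under the given binding key.
    void bind(String queueName, String bindingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Route a message; returns how many queues received a copy.
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    // Messages currently stored in a queue (empty if the queue is unknown).
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("orders", "order.created");
        exchange.bind("audit", "order.created");
        exchange.bind("mail", "user.signup");

        System.out.println(exchange.publish("order.created", "order #1")); // prints 2
        System.out.println(exchange.publish("unknown.key", "lost"));       // prints 0
        System.out.println(exchange.messagesIn("mail"));                   // prints []
    }
}
```

The producer only ever names a routing key; which queues exist and how they are bound is decided on the consumer side.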

Exchanges are covered in more detail later.

Message persistence

RabbitMQ queues and exchanges have a little-known catch: by default, restarting the server causes messages to be lost. So how do we make sure nothing is lost when RabbitMQ restarts? The answer is message persistence.

When you send a message to the RabbitMQ server, you choose whether it should be persisted, but that alone does not guarantee that RabbitMQ can recover it after a crash. For a message to be recoverable, three conditions must be met:
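The conditions usually given for RabbitMQ are that the message is marked as persistent (delivery mode 2), is published to a durable exchange, and arrives in a durable queue. The toy model below is a sketch only, not the broker's real implementation: `PersistenceSketch` and its methods are invented names, and exchanges are left out for brevity. It shows what a restart does to queues and messages depending on those flags. In the real Java client the corresponding switches are the `durable` argument of `queueDeclare` and message properties such as `MessageProperties.PERSISTENT_TEXT_PLAIN`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of what survives a broker restart. Names are hypothetical.
class PersistenceSketch {
    static final class Message {
        final String body;
        final boolean persistent;
        Message(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    static final class Queue {
        final boolean durable;
        final List<Message> messages = new ArrayList<>();
        Queue(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, Queue> queues = new HashMap<>();

    void declareQueue(String name, boolean durable) {
        queues.putIfAbsent(name, new Queue(durable));
    }

    void publish(String queueName, String body, boolean persistent) {
        queues.get(queueName).messages.add(new Message(body, persistent));
    }

    // Simulated restart: non-durable queues disappear entirely, and durable
    // queues keep only the messages that were marked persistent.
    void restart() {
        queues.values().removeIf(q -> !q.durable);
        for (Queue q : queues.values()) {
            q.messages.removeIf(m -> !m.persistent);
        }
    }

    boolean queueExists(String name) {
        return queues.containsKey(name);
    }

    List<String> bodiesIn(String queueName) {
        List<String> bodies = new ArrayList<>();
        Queue q = queues.get(queueName);
        if (q != null) {
            for (Message m : q.messages) {
                bodies.add(m.body);
            }
        }
        return bodies;
    }

    public static void main(String[] args) {
        PersistenceSketch broker = new PersistenceSketch();
        broker.declareQueue("durableQ", true);
        broker.declareQueue("tempQ", false);
        broker.publish("durableQ", "keep", true);
        broker.publish("durableQ", "drop", false);
        broker.publish("tempQ", "gone", true);

        broker.restart();
        System.out.println(broker.bodiesIn("durableQ"));  // prints [keep]
        System.out.println(broker.queueExists("tempQ"));  // prints false
    }
}
```

Note that the persistent message in `tempQ` is lost as well: marking the message alone is not enough when the queue itself does not survive.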

How persistence works

RabbitMQ writes your persistent messages to a persistence log file on disk. After a message is consumed, RabbitMQ marks it as waiting for garbage collection.

Disadvantages of persistence

The advantage of message persistence is obvious, but so is the drawback: performance. Writing to disk is much slower than writing to memory, which lowers the server's throughput. An SSD eases the problem, but persistence still eats into RabbitMQ's performance, and throughput drops sharply when thousands of messages have to be written to disk.

Whether to use persistence therefore depends on your own situation.

Virtual host

Each RabbitMQ server can host many vhosts, which we call virtual hosts. Each virtual host is in effect a mini version of RabbitMQ, with its own queues, exchanges and bindings, and its own permission mechanism.

Vhost feature

Vhost operation

A vhost can be created with the rabbitmqctl tool:

rabbitmqctl add_vhost [vhost_name]

Delete a vhost:

rabbitmqctl delete_vhost [vhost_name]

List all vhosts:

rabbitmqctl list_vhosts

Environment construction

An earlier article covered the steps for setting up RabbitMQ on Ubuntu: building rabbitmq environment on Ubuntu

Installing on Windows 10 is easier. Here are the download links first:

Erlang / RabbitMQ server Baidu network disk link: https://pan.baidu.com/s/1TnKDV-ZuXLiIgyK8c8f9dg Password: wct9

You can also download from the official Erlang and RabbitMQ websites, but the speed is relatively slow. The versions in my Baidu network disk share are RabbitMQ 3.7.6 (the latest version) and Erlang 20.2. Note: do not download the latest Erlang. With it, the extension plugin fails to open on Windows 10.

Usage: open http://localhost:15672 in a browser. The default login account is guest and the password is guest.

Repeated installation of rabbit server

If this is not the first time you install RabbitMQ server on Windows, you must first uninstall RabbitMQ and Erlang, then find the registry key HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv and delete all items under it.

Otherwise RabbitMQ cannot be started after installation. In theory, the uninstall order is RabbitMQ first, then Erlang.

Code implementation

The Java version is implemented as a Maven project. For setup, see: MyEclipse 2017 cracking settings and Maven project construction

After the project is created, add the RabbitMQ client jar by adding the following to pom.xml:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.2.0</version>
</dependency>

The Java code is split into two classes. The first creates the RabbitMQ connection; the second is the application class, which publishes and consumes messages in the simplest way.

There are two ways to connect to RabbitMQ:

Mode 1:

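// Imports needed by the connection class
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// Mode 1: set each connection property on the factory individually
public static Connection GetRabbitConnection() {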
	ConnectionFactory factory = new ConnectionFactory();
	factory.setUsername(Config.UserName);
	factory.setPassword(Config.Password);
	factory.setVirtualHost(Config.VHost);
	factory.setHost(Config.Host);
	factory.setPort(Config.Port);
	Connection conn = null;
	try {
		conn = factory.newConnection();
	} catch (Exception e) {
		e.printStackTrace();
	}
	return conn;
}

Mode 2:

public static Connection GetRabbitConnection2() {
	ConnectionFactory factory = new ConnectionFactory();
	// Connection URI format: amqp://userName:password@hostName:portNumber/virtualHost
	String uri = String.format("amqp://%s:%s@%s:%d%s",Config.UserName,Config.Password,Config.Host,Config.Port,Config.VHost);
	Connection conn = null;
	try {
		factory.setUri(uri);
		factory.setVirtualHost(Config.VHost);
		conn = factory.newConnection();
	} catch (Exception e) {
		e.printStackTrace();
	}
	return conn;
}

Part 2: the application class, which publishes and consumes messages in the simplest way

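// Imports needed by the application class
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Date;

public static void main(String[] args) {
	Publisher(); // publish a message

	Consumer(); // consume messages
}

/**
 * Publish a message.
 */
public static void Publisher() {
	// Create a connection
	Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
	if (conn != null) {
		try {
			// Create a channel
			Channel channel = conn.createChannel();
			// Declare the queue. Parameters: 1) queue name; 2) durable; 3) exclusive; 4) delete the queue when consumers disconnect; 5) other arguments
			// Assumed values for the flags: non-durable, non-exclusive, no auto-delete
			channel.queueDeclare(Config.QueueName, false, false, false, null);
			String content = String.format("Current time: %s", new Date().getTime());
			// Publish. Parameters: 1) exchange name ("" is the default exchange); 2) routing key, here the queue name; 3) message properties, e.g. MessageProperties.PERSISTENT_TEXT_PLAIN stores a plain-text message on disk; 4) message body
			channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));
			System.out.println("Message sent: " + content);
			// Close the channel and the connection
			channel.close();
			conn.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}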

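/**
 * Consume messages.
 */
public static void Consumer() {
	// Create a connection
	Connection conn = ConnectionFactoryUtil.GetRabbitConnection();
	if (conn != null) {
		try {
			// Create a channel
			Channel channel = conn.createChannel();
			// Declare the queue. Parameters: 1) queue name; 2) durable; 3) exclusive; 4) delete the queue when consumers disconnect; 5) other arguments
			// Assumed values for the flags: non-durable, non-exclusive, no auto-delete
			channel.queueDeclare(Config.QueueName, false, false, false, null);

			// Create a subscriber and receive messages.
			// autoAck is false because each message is acknowledged manually below.
			channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
					String routingKey = envelope.getRoutingKey(); // routing key (the queue name when using the default exchange)
					String contentType = properties.getContentType(); // content type
					String content = new String(body, "utf-8"); // message body
					System.out.println("Message body: " + content);
					// Acknowledge manually. Parameters: 1) delivery tag of this message; 2) multiple: true acknowledges all messages up to and including this tag
					channel.basicAck(envelope.getDeliveryTag(), false);
				}
			});

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}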

The code is commented in detail, so it is not explained further here.

The result of running it is shown in the figure:
