A detailed guide to gracefully shutting down message queue tasks in Java
Preface
Message queue middleware is an important component of distributed systems. It mainly addresses application decoupling, asynchronous messaging, and traffic peak shaving, and helps build architectures with high performance, high availability, scalability, and eventual consistency. Widely used message queues currently include ActiveMQ, RabbitMQ, ZeroMQ, Kafka, MetaMQ, and RocketMQ.
Message queue application scenarios
In practice, message queues are commonly used for asynchronous processing, application decoupling, traffic peak shaving, and message communication.
This article covers how to gracefully shut down message queue tasks in Java, shared here for your reference and study. Let's go through the details.
1. Problem background
To listen for message queue tasks, we usually write a standalone Java program and run it on a Linux server. Once the subscriber program starts, it receives messages through the message queue client and hands them to a thread pool for concurrent processing.
So the question is: when we need to restart the program after modifying it, how do we make sure that every message still gets processed?
Some open source message queue middleware provides an ACK mechanism (message acknowledgment). After the subscriber has processed a message, it notifies the server, which then deletes that message. If the subscriber fails and the server does not receive the acknowledgment, the server sends the message again.
If the message queue middleware does not provide an ACK mechanism, or ACK has been turned off for the sake of high throughput, how do we make sure that as many messages as possible are processed?
Normally, once the subscriber program is closed, messages accumulate in the queue until the subscriber next subscribes and consumes them, so messages that have not yet been received are not lost. The risk lies with messages that, at the moment of shutdown, have already been taken out of the message queue but have not yet been processed.
Therefore, we need a graceful shutdown mechanism that ensures the messages already received are fully processed before the program restarts.
2. Problem analysis
The idea behind a graceful shutdown is as follows:
Close the message subscription: the message queue client provides a method for closing the connection. Check the API of the client you use.
Close the thread pool: Java's ThreadPoolExecutor provides two methods, shutdown() and shutdownNow(). The former stops accepting new tasks but lets the tasks already submitted run to completion; the latter attempts to stop the running tasks by interrupting their threads and returns the list of tasks that were still waiting to run. We therefore use shutdown() to close the pool and isTerminated() to check whether the pool has fully terminated.
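The difference between the two methods can be seen in a small experiment. The following is a minimal sketch, not part of the original article; the class name PoolShutdownDemo and its helper methods are made up for illustration, and the sleeps merely simulate message processing.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only the java.util.concurrent calls are real API.
final class PoolShutdownDemo {

    /** Submits the given number of short tasks, calls shutdown(), and returns how many completed. */
    static int completedAfterShutdown(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                sleepQuietly(50); // simulated message processing
                completed.incrementAndGet();
            });
        }
        pool.shutdown(); // returns immediately; already submitted tasks still run
        try {
            while (!pool.isTerminated()) {
                pool.awaitTermination(100, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    /** Submits long tasks to a single worker, calls shutdownNow(), and returns how many never started. */
    static int neverStartedAfterShutdownNow(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> sleepQuietly(1000));
        }
        // Interrupts the worker and hands back the tasks still waiting in the queue.
        List<Runnable> neverStarted = pool.shutdownNow();
        return neverStarted.size();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdownNow() lands here
        }
    }

    public static void main(String[] args) {
        System.out.println("completed after shutdown(): " + completedAfterShutdown(6));
        System.out.println("never started after shutdownNow(): " + neverStartedAfterShutdownNow(5));
    }
}
```

With shutdown(), every submitted task is counted as completed. With shutdownNow(), the tasks still waiting in the queue are returned to the caller unprocessed, which is exactly the message loss we want to avoid.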
That raises another question: how do we tell the program that it needs to shut down?
On Linux, processes are terminated by signals. We can use kill -9 PID to close a process, but signal 9 (SIGKILL) cannot be caught by the program, so it leaves no chance to clean up. Besides -9, kill -l lists the other signals that the kill command can send.
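There are two ways to trigger the shutdown logic: a JVM shutdown hook registered with the addShutdownHook method, and a signal handler whose handle method is called when the signal arrives.
Supplementary note: calling System.exit() inside the addShutdownHook hook or the handle method can cause a deadlock and prevent the process from exiting normally.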
The pseudocode is as follows:
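The original listing is not available, so what follows is a reconstruction sketched under assumptions rather than the author's code. The class GracefulConsumer and the method closeSubscription() are hypothetical stand-ins for your subscriber and your queue client's close call. The signal handler is registered through sun.misc.Signal, an unsupported JDK-internal API that compiles with a warning, which is assumed here to be the "handle method" the article refers to.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import sun.misc.Signal; // JDK-internal, unsupported API (assumption: the article's "handle method")

// Hypothetical subscriber; names are illustrative only.
final class GracefulConsumer {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger processed = new AtomicInteger();

    /** Called for each received message; returns false once shutdown has begun. */
    boolean onMessage(String message) {
        if (closed.get()) {
            return false;
        }
        try {
            pool.execute(() -> {
                sleepQuietly(100); // simulated business logic
                processed.incrementAndGet();
            });
            return true;
        } catch (RejectedExecutionException e) {
            return false; // lost the race against shutdown()
        }
    }

    int processedCount() {
        return processed.get();
    }

    /** Placeholder: call the queue client's own close method here. */
    private void closeSubscription() {
        System.out.println("subscription closed");
    }

    /** Step 1: stop receiving. Step 2: drain the pool. Safe to call more than once. */
    void shutdownGracefully() {
        if (closed.compareAndSet(false, true)) {
            closeSubscription();
        }
        pool.shutdown(); // no effect if already shut down
        try {
            while (!pool.isTerminated()) {
                pool.awaitTermination(1, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Registers both triggers. Neither callback calls System.exit(). */
    void registerShutdownTriggers() {
        // Trigger 1: JVM shutdown hook, run on kill -15 PID (SIGTERM).
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdownGracefully, "shutdown-hook"));
        // Trigger 2: signal handler, run on kill -12 PID (USR2 on Linux).
        Signal.handle(new Signal("USR2"), signal -> shutdownGracefully());
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        GracefulConsumer consumer = new GracefulConsumer();
        consumer.registerShutdownTriggers();
        // Simulated message source; a real program would be driven by the queue client.
        for (int i = 0; consumer.onMessage("msg-" + i); i++) {
            sleepQuietly(200);
        }
        consumer.shutdownGracefully(); // waits for the drain started by the trigger
        System.out.println("processed " + consumer.processedCount() + " messages");
    }
}
```

Because the pool threads end once the pool has terminated, the process can exit on its own after the USR2 handler has run, with no call to System.exit() inside either callback.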
Simulation demo
Next, a demo simulates the logic described above.
First, simulate a producer that produces 5 messages per second.
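Such a producer might look like the sketch below. It is an illustration rather than the article's original demo: an in-memory BlockingQueue stands in for the real message queue, and the class name DemoProducer and the helper produceFor() are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical producer; the BlockingQueue replaces a real broker for the demo.
final class DemoProducer {
    private final BlockingQueue<String> queue;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong sequence = new AtomicLong();

    DemoProducer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /** Publishes one message every 200 ms, i.e. 5 messages per second. */
    void start() {
        timer.scheduleAtFixedRate(
                () -> queue.offer("msg-" + sequence.incrementAndGet()),
                0, 200, TimeUnit.MILLISECONDS);
    }

    void stop() {
        timer.shutdownNow();
    }

    /** Runs a producer for the given time and returns how many messages were queued. */
    static int produceFor(long millis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        DemoProducer producer = new DemoProducer(queue);
        producer.start();
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        producer.stop();
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("messages produced in 2 s: " + produceFor(2000));
    }
}
```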
Then simulate a subscriber. After receiving a message, it submits the message to a thread pool for processing. The thread pool has a fixed size of 4 threads, and each message takes 1 second to process, so the backlog in the thread pool grows by 1 message per second.
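A subscriber along these lines can be sketched as follows. Again this is an assumption-laden illustration, not the original demo: the class name DemoSubscriber is hypothetical, the processing time is a constructor parameter so that it can be shortened, and the backlog is read from the thread pool's work queue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subscriber; messages are passed in directly instead of via a queue client.
final class DemoSubscriber {
    // Fixed pool of 4 threads with an unbounded work queue.
    private final ThreadPoolExecutor pool =
            new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    private final AtomicInteger processed = new AtomicInteger();
    private final long workMillis;

    DemoSubscriber(long workMillis) {
        this.workMillis = workMillis;
    }

    /** Hands a received message to the pool for processing. */
    void onMessage(String message) {
        pool.execute(() -> {
            try {
                Thread.sleep(workMillis); // simulated processing time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            processed.incrementAndGet();
        });
    }

    /** Number of messages accepted by the pool but not yet picked up by a thread. */
    int backlog() {
        return pool.getQueue().size();
    }

    /** Lets the backlog drain, then returns the total number of processed messages. */
    int drainAndClose() {
        pool.shutdown();
        try {
            while (!pool.isTerminated()) {
                pool.awaitTermination(1, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        DemoSubscriber subscriber = new DemoSubscriber(1000);
        for (int i = 1; i <= 25; i++) { // about 5 seconds of traffic at 5 messages per second
            subscriber.onMessage("msg-" + i);
            if (i % 5 == 0) {
                System.out.println("backlog: " + subscriber.backlog());
            }
            Thread.sleep(200);
        }
        System.out.println("processed: " + subscriber.drainAndClose());
    }
}
```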
When we run the demo on the server, the console shows its output. The demo prints the number of messages backlogged in the thread pool.
Open another terminal and look up the process ID with the ps command, or start the Java process with nohup to obtain the process ID.
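As an alternative that the original article does not mention, the program can print its own process ID at startup. This small sketch assumes Java 9 or later, where ProcessHandle is available; the class name PidPrinter is hypothetical.

```java
// Hypothetical helper; requires Java 9+ for ProcessHandle.
final class PidPrinter {

    /** Returns the operating system process ID of the running JVM. */
    static long currentPid() {
        return ProcessHandle.current().pid();
    }

    public static void main(String[] args) {
        long pid = currentPid();
        System.out.println("PID: " + pid + " (to stop: kill -12 " + pid + ")");
    }
}
```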
When we execute kill -12 PID, we can see the business logic shut down.
3. Summary
In fact, this applies not only to message queue tasks. Similar functionality can be found in common RPC services, such as SCF from 58. Its source code registers both a handler for the USR2 signal and a shutdown hook through addShutdownHook.
The restart script first sends kill -12. On receiving this signal, the RPC service sets the server status to closed. The script then sends kill -15, which triggers the hook method and closes all connections.
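The two-step sequence can be modeled as a small state machine. The sketch below is not SCF's source code. It only illustrates the idea under assumptions: the class TwoPhaseShutdown and its status names are hypothetical, and the USR2 handler is again registered through the unsupported sun.misc.Signal API.

```java
import java.util.concurrent.atomic.AtomicReference;
import sun.misc.Signal; // JDK-internal, unsupported API

// Hypothetical model of a service that shuts down in two phases.
final class TwoPhaseShutdown {
    enum Status { RUNNING, CLOSED_TO_NEW_REQUESTS, CONNECTIONS_CLOSED }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.RUNNING);

    /** Phase 1, triggered by kill -12 PID: mark the server as closed so no new requests are taken. */
    void onUsr2() {
        status.compareAndSet(Status.RUNNING, Status.CLOSED_TO_NEW_REQUESTS);
    }

    /** Phase 2, triggered by kill -15 PID through the shutdown hook: close all connections. */
    void onTerminate() {
        // A real service would close its connections here.
        status.set(Status.CONNECTIONS_CLOSED);
    }

    boolean acceptsRequests() {
        return status.get() == Status.RUNNING;
    }

    Status status() {
        return status.get();
    }

    /** Wires both phases to their triggers. */
    void register() {
        Signal.handle(new Signal("USR2"), signal -> onUsr2());
        Runtime.getRuntime().addShutdownHook(new Thread(this::onTerminate, "shutdown-hook"));
    }
}
```

Separating the phases gives requests that are already in flight time to finish between the two signals, which is the same concern the thread pool drain addresses for message queue subscribers.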
That is all for this article. I hope it is a useful reference for your study or work. If you have any questions, feel free to leave a comment.