Simple encapsulation of Alibaba cloud message queue using java

1、 Foreword

Recently, the company needs to use Alibaba cloud message queue. In order to make it easier to use, I spent a few days encapsulating the message queue into API calls to facilitate the call of internal systems. Now it has been completed. I hereby record the process and relevant technologies used, and encourage you.

Alibaba cloud now provides two kinds of message services: MNS service and ons service. I think MNS is a simplified version of ons, and the message consumption of MNS needs to customize the polling policy. In contrast, The publish and subscribe mode of ons is more powerful (for example, compared with MNS, ons provides message tracking, logging, monitoring and other functions). Its API is more convenient to use. I heard that Ali will not develop MNs in the future, but only maintain it. Ons service will gradually replace MNS service and become the main product of Ali's message service. Therefore, if there is a need to use message queue, it is recommended not to use MNS again to make it easier Using ons is the best choice.

Technologies involved: spring, reflection, dynamic proxy, Jackson serialization and deserialization

Before reading the following articles, you need to read the above documents to understand the related concepts (topic, consumer, producer, tag, etc.) and the simple sending and receiving code implementation provided in the documents.

This blog is only for friends who have a basic knowledge of message queuing. I'm naturally glad to help you. Don't scold those who don't understand. It shows that you're in the wrong way.

2、 Design scheme

1. Message sending

In a simple CS architecture, assuming that the server will listen to messages sent by a topic producer, it should first provide an API for the client. The client can produce messages through the producer by simply calling the API

2. Message receiving

Since the API is developed by the server, the server certainly knows how to consume these messages

In this process, the server actually acts as a consumer and the client actually acts as a producer, but the rules for producers to produce messages are formulated by consumers to meet consumers' consumption needs.

3. Ultimate goal

We will create a separate jar package named queue core to provide producers and consumers with specific implementations of dependencies and publish subscriptions.

3、 Message sending

1. Provide interface for consumers

Because the relationship between topic and producer is n:1, producer ID is directly used as an attribute of topic; Tag is a key filter condition. Consumers use it to classify messages and do different business processing. Therefore, tag is used as the routing condition here.

2. The producer uses the API provided by the consumer to send messages

Since the consumer only provides the interface for the producer, there is no way to use the interface directly, because there is no way to instantiate it. Here, the dynamic agent is used to generate objects. In the API provided by the consumer, the following config is added to facilitate the producer to directly import config. Here, spring config based on Java is used. Please know.

3. Queue core encapsulation of messages sent by producers

All annotations (topic, tag, body, key) in 1 above and the queueresourcefactory class used in 2 should be defined in queue core. The definition of annotation only defines rules, and the real implementation is actually in queueresourcefactory

Here, the custom package and the package name used by the third party are specially pasted to make it easy to distinguish.

What exactly has been done here?

The process of sending messages is that a dynamic agent creates a proxy object. The object will be intercepted when calling the method. First, analyze all annotations, such as topicName, producerId, tag and other key information from the annotations, then call Ali SDK to send messages. The process is very simple, but note that when sending messages, it is time to divide the environment. Generally speaking, there are three environments in Enterprises: QA, staging and product. QA and staging are test environments, and there are also three environments for message queues. However, QA and staging environments often use the same Ali account to reduce costs, so the created topic and productid will be placed in the same area, In this way, topicname with the same name is not allowed to exist, so it is distinguished by adding an environment prefix, such as QA_ TopicName,PID_ Staging_ Producerid, etc; In addition, queue core provides an mqconnection interface to obtain configuration information. Producer services only need to implement this interface.

4. The producer sends a message

Only a few lines of code are needed to send the message to the specified topic, which is much simpler than the original sending code.

4、 Message consumption

Compared with message sending, message consumption is more complex.

1. Message consumption design

Since the relationship between topic and consumer is n: n, the consumerid is placed on the specific implementation method of the consumer

Here are two new annotations @ queueresource and @ consumerannotation. How to use these two annotations will be discussed later. Someone will ask me why I use the name consumer annotation instead of the name consumer, because the name consumer conflicts with the name in the SDK provided by aliyun....

Here, the consumer provides an API interface to the producer to facilitate the producer to send messages, and the consumer implements the interface to consume the messages sent by the producer. How to implement the API interface to realize listening is the key logic.

2. Queue core implements the core logic of message queue listening

Step 1: use the listening method of the spin container to obtain all beans annotated with queueresource

Step 2: distribute and process beans

How to deal with these beans? Each bean is actually an object. With an object, such as the userqueueresourceimpl object in the above example, we can get the interface bytecode object implemented by the object, and then we can get the annotations on the userqueuererousce of the interface and the annotations on and in the method, Of course, the annotations on the userqueueresourceimpl implementation method can also be obtained. Here, I take the consumerid as the key, encapsulate the other relevant information as value and cache it in a map object. The core code is as follows:

Step 3: realize consumption through reflection

First, determine the timing of the reflection action, that is, listen to the new message

Second, how to perform reflection actions? Needless to say, children's shoes with reflection related foundation know how to do it. The core code is as follows:

5、 See the GIT link below for the complete code

https://github.com/kdyzm/queue-core.git

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>