A reusable transactional message scheme based on RabbitMQ

Premise

Distributed transactions are a thorny problem in microservice practice. In the microservice systems the author has built, strong consistency was either compromised on or avoided altogether. Borrowing the local message table scheme that eBay proposed many years ago, the author wrote a lightweight wrapper over RabbitMQ and MySQL (via JDBC) that provides a low-intrusion transactional message module. This article analyzes the design idea and the implementation of the whole scheme in detail. The main environment dependencies include Spring Boot 2.2.4.RELEASE, Redisson 3.12.1 and MySQL Connector/J 5.1.48 (see the Maven POM below), plus MySQL, Redis and RabbitMQ servers.

Scheme design idea

In principle, transactional messages are only suitable for weak consistency (or eventual consistency) scenarios. Common weak consistency scenarios include:

Transactional messages should not be used in strong consistency scenarios.

Strong consistency generally requires strict synchronization: all operations must succeed or fail together, which adds overhead from the synchronization itself. If a transactional message module is well designed and provides compensation, query and monitoring, its overall throughput is higher than that of strict synchronization, because the interactions between systems are asynchronous. In the business systems the author is responsible for, a ground rule was also set for using transactional messages: as long as the message content is correct, the consumer is responsible for handling its own failures.

To reduce code intrusion, transactional messages are built on Spring's programmatic or declarative transaction management. Programmatic transactions usually rely on TransactionTemplate, while declarative transactions rely on the AOP module and the @Transactional annotation.

Next, build a custom transactional message module and add a transactional message record table (in effect, a local message table) that stores every message record to be sent. The main functions of the module are:

Logical unit of transaction execution

Within the transactional unit of work, the message record to be pushed must also be saved. In other words, the local (business) logic and the insert of the message record are bound to the same transaction.

Sending the message to the RabbitMQ server must be postponed until the transaction has committed. This keeps a successful commit and a successful send to the RabbitMQ server consistent: a rolled-back transaction never produces a message. To make saving the message record and sending it to RabbitMQ look like a single action to the caller, Spring's transaction synchronization mechanism, TransactionSynchronization, is used. The callback positions of its main methods are analyzed below, mainly with reference to AbstractPlatformTransactionManager#commit() and AbstractPlatformTransactionManager#processCommit():

The figure above only illustrates a normal commit, with exception paths left out. It shows that TransactionSynchronization's afterCommit() and afterCompletion(int status) callbacks run after the actual commit point, AbstractPlatformTransactionManager#doCommit(). Either of them can therefore be used to push messages to the RabbitMQ server. The overall pseudocode is as follows:

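@Transactional
public Dto businessMethod() {
    // saveTransactionMessageRecord(), sendMessageToRabbitMQ() and dto are placeholders
    // business transaction code block ...
    // Save the transactional message record (same transaction as the business code)
    saveTransactionMessageRecord();
    // Register a transaction synchronization: push the message to RabbitMQ in afterCommit()
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            sendMessageToRabbitMQ();
        }
    });
    // business transaction code block ...
    return dto;
}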

In the pseudocode above, the two steps can be placed anywhere inside the transactional method: saving the transactional message record and registering the transaction synchronization. Their order relative to the business code does not matter.
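The sketch below makes the ordering argument concrete with a deliberately simplified, stdlib-only model of the commit sequence. ToySynchronization, ToyTransaction and ToyOrderFlow are hypothetical names, and this is not Spring's implementation. The model only assumes the callback order of AbstractPlatformTransactionManager (beforeCommit, beforeCompletion, the actual commit, afterCommit, afterCompletion). It also assumes that a rollback never calls afterCommit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the commit sequence; NOT Spring's code.
// commit:   beforeCommit -> beforeCompletion -> doCommit -> afterCommit -> afterCompletion
// rollback: beforeCompletion -> doRollback -> afterCompletion
interface ToySynchronization {
    default void beforeCommit() {}
    default void beforeCompletion() {}
    default void afterCommit() {}
    default void afterCompletion(boolean committed) {}
}

final class ToyTransaction {
    private final List<String> stagedWrites = new ArrayList<>();      // writes bound to this transaction
    private final List<ToySynchronization> synchronizations = new ArrayList<>();
    final List<String> committedRows = new ArrayList<>();              // stands in for the database
    final List<String> events = new ArrayList<>();                     // what happened, in order

    void write(String row) {
        stagedWrites.add(row);
    }

    void registerSynchronization(ToySynchronization synchronization) {
        synchronizations.add(synchronization);
    }

    void commit() {
        synchronizations.forEach(ToySynchronization::beforeCommit);
        synchronizations.forEach(ToySynchronization::beforeCompletion);
        committedRows.addAll(stagedWrites);                            // the real commit point (doCommit)
        events.add("doCommit");
        synchronizations.forEach(ToySynchronization::afterCommit);
        synchronizations.forEach(s -> s.afterCompletion(true));
    }

    void rollback() {
        synchronizations.forEach(ToySynchronization::beforeCompletion);
        stagedWrites.clear();                                          // nothing reaches the database
        events.add("doRollback");
        synchronizations.forEach(s -> s.afterCompletion(false));
    }
}

final class ToyOrderFlow {
    // A fake "business method". The synchronization is registered BEFORE the rows are
    // written, to show that its position inside the method does not matter.
    static ToyTransaction run(List<String> broker, boolean failBusinessLogic) {
        ToyTransaction tx = new ToyTransaction();
        tx.registerSynchronization(new ToySynchronization() {
            @Override
            public void afterCommit() {
                broker.add("order-created");                           // push to the fake broker
                tx.events.add("afterCommit:send");
            }
        });
        tx.write("t_order");                                           // business row
        tx.write("t_transactional_message");                           // message record, same transaction
        if (failBusinessLogic) {
            tx.rollback();
        } else {
            tx.commit();
        }
        return tx;
    }
}
```

Because the push lives in afterCommit(), it fires only once the business row and the message record have been committed together. That holds no matter where in the method the synchronization was registered. On rollback, neither the rows nor the message survive.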

Compensation for transactional messages

Although the author recommends that downstream services handle their own consumption failures, sometimes the upstream has no choice but to push the corresponding message again. That is a special case. Another case must also be considered: after the transaction commits, the push in TransactionSynchronization#afterCommit() fails or never happens. This is a low-probability scenario, but it will certainly occur in production. A typical cause is that the service instance restarts after the transaction has committed but before TransactionSynchronization#afterCommit() has pushed the message, as shown in the figure below:

To handle compensation pushes uniformly, a finite set of message states decides whether a message has been pushed successfully. These are PENDING, SUCCESS and FAIL, as defined later in TxMessageStatus.
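The sketch below shows, in plain Java, how these states could drive compensation. The status codes mirror TxMessageStatus later in this article (SUCCESS = 1, PENDING = 0, FAIL = -1), but the selection rule is an assumption. The SQL behind queryPendingCompensationRecords is not shown, so "not yet successful, retries left, due inside the scan window" is only a plausible reading. TxState and CompensationRule are hypothetical names.

```java
import java.time.LocalDateTime;

// Status codes mirroring TxMessageStatus (SUCCESS = 1, PENDING = 0, FAIL = -1).
enum TxState {
    SUCCESS(1), PENDING(0), FAIL(-1);

    final int code;

    TxState(int code) {
        this.code = code;
    }

    static TxState of(int code) {
        for (TxState state : values()) {
            if (state.code == code) {
                return state;
            }
        }
        throw new IllegalArgumentException("unknown message status " + code);
    }
}

final class CompensationRule {
    // ASSUMPTION: a record is pushed again when it has not succeeded, still has retries
    // left, and its next schedule time falls inside the (inclusive) scan window.
    static boolean needsCompensation(int statusCode, int currentRetryTimes, int maxRetryTimes,
                                     LocalDateTime nextScheduleTime,
                                     LocalDateTime windowStart, LocalDateTime windowEnd) {
        boolean notDone = TxState.of(statusCode) != TxState.SUCCESS;   // PENDING or FAIL
        boolean retriesLeft = currentRetryTimes < maxRetryTimes;
        boolean inWindow = !nextScheduleTime.isBefore(windowStart) && !nextScheduleTime.isAfter(windowEnd);
        return notDone && retriesLeft && inWindow;
    }
}
```

Under this rule SUCCESS is terminal. A record whose retry counter has reached max_retry_times drops out of compensation and would need manual handling.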

Another very special case is that the RabbitMQ server itself fails, so message pushes throw exceptions; in this case a retry (compensation push) is required. Experience shows that retrying repeatedly within a short time is pointless, because a failed service does not recover instantly. An exponential backoff algorithm is therefore a good fit for retries, and the maximum number of retries should be capped.

The exponent (backoff factor), the initial interval and the maximum number of retries must be tuned to the actual situation. Otherwise messages may be delayed too long or retried too often.
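As a worked example, the stdlib-only sketch below replays the schedule with an initial backoff of 10 seconds, a factor of 2 and at most 5 retries. These are the defaults used by the implementation later in this article. BackoffSchedule is a hypothetical name. The replay follows calculateNextScheduleTime and markFail below, where each new time is computed from the previous scheduled time rather than from "now".

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {
    // delay(round) = initBackoff * backoffFactor^round, in seconds
    static long delaySeconds(long initBackoff, long backoffFactor, long round) {
        return (long) (initBackoff * Math.pow(backoffFactor, round));
    }

    // The first attempt is due at base + delay(0). After each failure the retry counter
    // is incremented and the next time is the previous scheduled time plus delay(retries).
    static List<LocalDateTime> schedule(LocalDateTime base, long initBackoff, long backoffFactor, int maxRetries) {
        List<LocalDateTime> times = new ArrayList<>();
        LocalDateTime next = base.plusSeconds(delaySeconds(initBackoff, backoffFactor, 0));
        times.add(next);
        for (int retries = 1; retries <= maxRetries; retries++) {
            next = next.plusSeconds(delaySeconds(initBackoff, backoffFactor, retries));
            times.add(next);
        }
        return times;
    }
}
```

With these defaults the attempts fall due 10, 30, 70, 150, 310 and 630 seconds after the record is saved, so the last retry lands about ten and a half minutes later. Raising the factor or the initial backoff stretches that tail quickly, which is why the three values need tuning together.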

Scheme implementation

Introduce the core dependencies:

<properties>
    <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
    <redisson.version>3.12.1</redisson.version>
    <mysql.connector.version>5.1.48</mysql.connector.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.connector.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>${redisson.version}</version>
    </dependency>
</dependencies>

spring-boot-starter-jdbc, mysql-connector-java and spring-boot-starter-aop handle the MySQL transactions, while spring-boot-starter-amqp wraps the RabbitMQ client. Redisson is used mainly for its distributed lock, which guards the scheduled compensation task so that multiple service nodes do not run the compensation push concurrently.

Table design

The transactional message module mainly involves two tables. Taking MySQL as an example, the DDL is as follows:

CREATE TABLE `t_transactional_message`
(
    id                  BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    create_time         DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
    edit_time           DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    creator             VARCHAR(20)     NOT NULL DEFAULT 'admin',
    editor              VARCHAR(20)     NOT NULL DEFAULT 'admin',
    deleted             TINYINT         NOT NULL DEFAULT 0,
    current_retry_times TINYINT         NOT NULL DEFAULT 0 COMMENT 'current retry count',
    max_retry_times     TINYINT         NOT NULL DEFAULT 5 COMMENT 'maximum retry count',
    queue_name          VARCHAR(255)    NOT NULL COMMENT 'queue name',
    exchange_name       VARCHAR(255)    NOT NULL COMMENT 'exchange name',
    exchange_type       VARCHAR(8)      NOT NULL COMMENT 'exchange type',
    routing_key         VARCHAR(255) COMMENT 'routing key',
    business_module     VARCHAR(32)     NOT NULL COMMENT 'business module',
    business_key        VARCHAR(255)    NOT NULL COMMENT 'business key',
    next_schedule_time  DATETIME        NOT NULL COMMENT 'next schedule time',
    message_status      TINYINT         NOT NULL DEFAULT 0 COMMENT 'message status',
    init_backoff        BIGINT UNSIGNED NOT NULL DEFAULT 10 COMMENT 'initial backoff, in seconds',
    backoff_factor      TINYINT         NOT NULL DEFAULT 2 COMMENT 'backoff factor (base of the exponent)',
    INDEX idx_queue_name (queue_name),
    INDEX idx_create_time (create_time),
    INDEX idx_next_schedule_time (next_schedule_time),
    INDEX idx_business_key (business_key)
) COMMENT 'transactional message table';

CREATE TABLE `t_transactional_message_content`
(
    id         BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    message_id BIGINT UNSIGNED NOT NULL COMMENT 'transactional message record ID',
    content    TEXT COMMENT 'message content'
) COMMENT 'transactional message content table';

This module may later be extended with a management backend. For that reason, the management and status fields and the potentially large message content are stored in two separate tables. The split avoids high IO load on the MySQL server when message records are queried in bulk; the author settled on this design after discussing it with the DBA team at a previous company. Two business fields, business_module and business_key, are reserved to identify the business module and the business key (usually a unique identifier such as an order number).

If the service declares the queue-exchange bindings in advance through configuration, sending a RabbitMQ message actually depends only on the exchangeName and routingKey fields. Headers exchanges are a special, rarely used case and are not considered here for now. Because a service may forget to declare them, the module declares the queue, exchange and binding the first time a message is sent to a given queue, and caches that fact. RabbitMQ declarations are idempotent: redeclaring the same queue, exchange or binding does not raise an error as long as the arguments are identical each time.
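The "declare once, then cache" step relies on ConcurrentHashMap#computeIfAbsent. The stdlib-only sketch below demonstrates that guarantee. DeclarationCache is hypothetical: instead of calling AmqpAdmin, it simply counts how often the declaration would have run.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DeclarationCache {
    private final ConcurrentMap<String, Boolean> declared = new ConcurrentHashMap<>();
    // Counts how often the declaration really ran; stands in for the AmqpAdmin calls.
    final AtomicInteger declareCalls = new AtomicInteger();

    void ensureDeclared(String queueName) {
        // ConcurrentHashMap#computeIfAbsent applies the function at most once per key,
        // even when many threads race on the same queue name.
        declared.computeIfAbsent(queueName, name -> {
            declareCalls.incrementAndGet();   // hypothetical stand-in for declareQueue/declareExchange/declareBinding
            return Boolean.TRUE;
        });
    }

    // Fires `senders` concurrent sends at the same queue and waits for them to finish.
    static DeclarationCache simulateConcurrentSends(String queueName, int senders) {
        DeclarationCache cache = new DeclarationCache();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < senders; i++) {
            pool.execute(() -> cache.ensureDeclared(queueName));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return cache;
    }
}
```

A few properties are worth knowing:

- If the mapping function throws (say, the broker is unreachable), no mapping is recorded and the next send tries again.
- The cache is per JVM and never invalidated. A queue deleted on the broker after the first send is not redeclared until the service restarts.
- ConcurrentHashMap's documentation asks for mapping functions to be short, since other updates may block while one runs. A broker round trip is tolerable here only because it happens once per queue.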

Scheme code design

The design description below leaves out the API of the message management backend for now; it can be added later.

Define the model entity classes TransactionalMessage and TransactionalMessageContent:

@Data
public class TransactionalMessage {

    private Long id;
    private LocalDateTime createTime;
    private LocalDateTime editTime;
    private String creator;
    private String editor;
    private Integer deleted;
    private Integer currentRetryTimes;
    private Integer maxRetryTimes;
    private String queueName;
    private String exchangeName;
    private String exchangeType;
    private String routingKey;
    private String businessModule;
    private String businessKey;
    private LocalDateTime nextScheduleTime;
    private Integer messageStatus;
    private Long initBackoff;
    private Integer backoffFactor;
}

@Data
public class TransactionalMessageContent {

    private Long id;
    private Long messageId;
    private String content;
}

Then define the DAO interfaces. Their implementations are not expanded here. MySQL is used for storage, and switching to another type of database only requires a different implementation:

public interface TransactionalMessageDao {

    void insertSelective(TransactionalMessage record);

    void updateStatusSelective(TransactionalMessage record);

    List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime,LocalDateTime maxScheduleTime,int limit);
}

public interface TransactionalMessageContentDao {

    void insert(TransactionalMessageContent record);

    List<TransactionalMessageContent> queryByMessageIds(String messageIds);
}
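queryByMessageIds(String messageIds) receives a pre-built list such as (1,2,3), which processPendingCompensationRecords assembles with a StringJoiner. If you would rather the DAO bind the ids as parameters, one option is to generate one placeholder per id. InClauseSql is a hypothetical helper, and its column list assumes the t_transactional_message_content DDL above.

```java
import java.util.Collections;
import java.util.List;

final class InClauseSql {
    // Builds a SELECT with one "?" per id, so ids are bound as parameters rather than
    // concatenated into the SQL text. Column names follow the content table DDL.
    static String selectContentByMessageIds(int idCount) {
        if (idCount <= 0) {
            // "IN ()" is not valid SQL, so an empty id list must be handled by the caller
            throw new IllegalArgumentException("idCount must be positive, got " + idCount);
        }
        String placeholders = String.join(",", Collections.nCopies(idCount, "?"));
        return "SELECT id, message_id, content FROM t_transactional_message_content"
                + " WHERE message_id IN (" + placeholders + ")";
    }

    // Arguments in placeholder order, e.g. for JdbcTemplate#query(String, RowMapper, Object...).
    static Object[] bindArgs(List<Long> messageIds) {
        return messageIds.toArray();
    }
}
```

Since the ids here are Long values, the original string concatenation is not an injection risk in practice. Placeholders mainly keep the DAO contract type-safe and the SQL text stable across calls.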

Next, define the transactional message service interface, TransactionalMessageService:

// Service interface exposed to callers
public interface TransactionalMessageService {

    void sendTransactionalMessage(Destination destination,TxMessage message);
}


@Getter
@RequiredArgsConstructor
public enum ExchangeType {

    FANOUT("fanout"),
    DIRECT("direct"),
    TOPIC("topic"),
    DEFAULT(""),
    ;

    private final String type;
}

// Destination of a message
public interface Destination {

    ExchangeType exchangeType();

    String queueName();

    String exchangeName();

    String routingKey();
}

@Builder
public class DefaultDestination implements Destination {

    private ExchangeType exchangeType;
    private String queueName;
    private String exchangeName;
    private String routingKey;

    @Override
    public ExchangeType exchangeType() {
        return exchangeType;
    }

    @Override
    public String queueName() {
        return queueName;
    }

    @Override
    public String exchangeName() {
        return exchangeName;
    }

    @Override
    public String routingKey() {
        return routingKey;
    }
}

// Transactional message
public interface TxMessage {

    String businessModule();

    String businessKey();

    String content();
}

@Builder
public class DefaultTxMessage implements TxMessage {

    private String businessModule;
    private String businessKey;
    private String content;

    @Override
    public String businessModule() {
        return businessModule;
    }

    @Override
    public String businessKey() {
        return businessKey;
    }

    @Override
    public String content() {
        return content;
    }
}

// Message status
@Getter
@RequiredArgsConstructor
public enum TxMessageStatus {

    /**
     * Succeeded
     */
    SUCCESS(1),

    /**
     * Pending
     */
    PENDING(0),

    /**
     * Failed
     */
    FAIL(-1),
    ;

    private final Integer status;
}

RabbitTransactionalMessageService, the implementation of TransactionalMessageService, contains the core logic of the transactional message module. The code is as follows:

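import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitTransactionalMessageService implements TransactionalMessageService {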

    private final AmqpAdmin amqpAdmin;
    private final TransactionalMessageManagementService managementService;

    private static final ConcurrentMap<String,Boolean> QUEUE_ALREADY_DECLARE = new ConcurrentHashMap<>();

    @Override
    public void sendTransactionalMessage(Destination destination,TxMessage message) {
        String queueName = destination.queueName();
        String exchangeName = destination.exchangeName();
        String routingKey = destination.routingKey();
        ExchangeType exchangeType = destination.exchangeType();
        // Atomic pre-declaration: queue, exchange and binding are declared once per queue name
        QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName,k -> {
            Queue queue = new Queue(queueName);
            amqpAdmin.declareQueue(queue);
            Exchange exchange = new CustomExchange(exchangeName,exchangeType.getType());
            amqpAdmin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
            amqpAdmin.declareBinding(binding);
            return true;
        });
        TransactionalMessage record = new TransactionalMessage();
        record.setQueueName(queueName);
        record.setExchangeName(exchangeName);
        record.setExchangeType(exchangeType.getType());
        record.setRoutingKey(routingKey);
        record.setBusinessModule(message.businessModule());
        record.setBusinessKey(message.businessKey());
        String content = message.content();
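        // Save the transactional message record (joins the caller's transaction)
        managementService.saveTransactionalMessageRecord(record, content);
        // Register a transaction synchronization; this requires an active transaction,
        // otherwise Spring throws IllegalStateException
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                // Runs only after the transaction has actually committed
                managementService.sendMessageSync(record, content);
            }
        });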
    }
}

Message record status management and content persistence are centralized in TransactionalMessageManagementService:

import java.time.LocalDateTime;
import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

@Slf4j
@RequiredArgsConstructor
@Service
public class TransactionalMessageManagementService {

    private final TransactionalMessageDao messageDao;
    private final TransactionalMessageContentDao contentDao;
    private final RabbitTemplate rabbitTemplate;

    // Far-future schedule time assigned to successfully sent messages
    private static final LocalDateTime END = LocalDateTime.of(2999, 1, 1, 0, 0, 0);
    private static final long DEFAULT_INIT_BACKOFF = 10L;
    private static final int DEFAULT_BACKOFF_FACTOR = 2;
    private static final int DEFAULT_MAX_RETRY_TIMES = 5;
    private static final int LIMIT = 100;

    public void saveTransactionalMessageRecord(TransactionalMessage record, String content) {
        record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
        record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(), DEFAULT_INIT_BACKOFF,
                DEFAULT_BACKOFF_FACTOR, 0));
        record.setCurrentRetryTimes(0);
        record.setInitBackoff(DEFAULT_INIT_BACKOFF);
        record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
        record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
        // Assumes insertSelective writes the generated primary key back into record
        messageDao.insertSelective(record);
        TransactionalMessageContent messageContent = new TransactionalMessageContent();
        messageContent.setContent(content);
        messageContent.setMessageId(record.getId());
        contentDao.insert(messageContent);
    }

    // Called from afterCommit() and from the compensation task. Spring's Javadoc for
    // TransactionSynchronization#afterCommit() recommends PROPAGATION_REQUIRES_NEW for
    // transactional work done there, since the original transaction's resources may still be bound.
    public void sendMessageSync(TransactionalMessage record, String content) {
        try {
            rabbitTemplate.convertAndSend(record.getExchangeName(), record.getRoutingKey(), content);
            if (log.isDebugEnabled()) {
                log.debug("Message sent, target queue: {}, content: {}", record.getQueueName(), content);
            }
            // Mark as succeeded
            markSuccess(record);
        } catch (Exception e) {
            // Mark as failed
            markFail(record, e);
        }
    }

    private void markSuccess(TransactionalMessage record) {
        // Move the next schedule time to END so that compensation never picks the record up again
        record.setNextScheduleTime(END);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

    private void markFail(TransactionalMessage record, Exception e) {
        // The throwable goes last so that SLF4J fills the placeholder and logs the stack trace
        log.error("Failed to send message, target queue: {}", record.getQueueName(), e);
        record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes()) >= 0 ?
                record.getMaxRetryTimes() : record.getCurrentRetryTimes() + 1);
        // Calculate the next execution time from the previous one
        LocalDateTime nextScheduleTime = calculateNextScheduleTime(
                record.getNextScheduleTime(), record.getInitBackoff(), record.getBackoffFactor(), record.getCurrentRetryTimes()
        );
        record.setNextScheduleTime(nextScheduleTime);
        record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
        record.setEditTime(LocalDateTime.now());
        messageDao.updateStatusSelective(record);
    }

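    /**
     * Calculate the next execution time
     *
     * @param base          base time
     * @param initBackoff   initial backoff value, in seconds
     * @param backoffFactor backoff factor (base of the exponent)
     * @param round         round number
     * @return LocalDateTime
     */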
    private LocalDateTime calculateNextScheduleTime(LocalDateTime base,long initBackoff,long backoffFactor,long round) {
        double delta = initBackoff * Math.pow(backoffFactor,round);
        return base.plusSeconds((long) delta);
    }

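    /**
     * Compensation push - the parameters here should be tailored to the actual scenario
     */
    public void processPendingCompensationRecords() {
        // Upper bound of the window: now minus the initial backoff, so that just-saved messages are not pushed
        LocalDateTime max = LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
        // Lower bound of the window: the upper bound minus 1 hour
        LocalDateTime min = max.plusHours(-1);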
        Map<Long,TransactionalMessage> collect = messageDao.queryPendingCompensationRecords(min,max,LIMIT)
                .stream()
                .collect(Collectors.toMap(TransactionalMessage::getId,x -> x));
        if (!collect.isEmpty()) {
            StringJoiner joiner = new StringJoiner(",","(",")");
            collect.keySet().forEach(x -> joiner.add(x.toString()));
            contentDao.queryByMessageIds(joiner.toString())
                    .forEach(item -> {
                        TransactionalMessage message = collect.get(item.getMessageId());
                        sendMessageSync(message,item.getContent());
                    });
        }
    }
}

One thing here could be optimized: the status updates of the transactional message records could be done as a single batch update. When LIMIT is large, a batch update is more efficient.
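Below is a minimal sketch of the batch variant. It assumes the DAO gains a batch method backed by one parameterized UPDATE. StatusUpdate and BatchStatusUpdate are hypothetical, and the statement always writes all four mutable columns rather than the "selective" update used above. The resulting List<Object[]> could then be handed to Spring's JdbcTemplate#batchUpdate(String, List<Object[]>).

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal view of the columns that markSuccess/markFail change.
final class StatusUpdate {
    final long id;
    final int messageStatus;
    final int currentRetryTimes;
    final LocalDateTime nextScheduleTime;
    final LocalDateTime editTime;

    StatusUpdate(long id, int messageStatus, int currentRetryTimes,
                 LocalDateTime nextScheduleTime, LocalDateTime editTime) {
        this.id = id;
        this.messageStatus = messageStatus;
        this.currentRetryTimes = currentRetryTimes;
        this.nextScheduleTime = nextScheduleTime;
        this.editTime = editTime;
    }
}

final class BatchStatusUpdate {
    // One parameterized statement; each Object[] below supplies its five placeholders.
    static final String SQL = "UPDATE t_transactional_message SET message_status = ?, current_retry_times = ?,"
            + " next_schedule_time = ?, edit_time = ? WHERE id = ?";

    // Per-row arguments in placeholder order; LocalDateTime is converted to java.sql.Timestamp.
    static List<Object[]> batchArgs(List<StatusUpdate> updates) {
        List<Object[]> args = new ArrayList<>(updates.size());
        for (StatusUpdate u : updates) {
            args.add(new Object[]{
                    u.messageStatus,
                    u.currentRetryTimes,
                    Timestamp.valueOf(u.nextScheduleTime),
                    Timestamp.valueOf(u.editTime),
                    u.id
            });
        }
        return args;
    }
}
```

The idea is to collect the updates while processPendingCompensationRecords iterates, then flush them once. That replaces up to LIMIT separate statements with one batched call, although how much it saves on the wire still depends on the JDBC driver's batching behavior.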

Finally, the scheduled-task configuration class:

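import java.util.concurrent.TimeUnit;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Slf4j
@RequiredArgsConstructor
@Configuration
@EnableScheduling
public class ScheduleJobAutoConfiguration {

    private final TransactionalMessageManagementService managementService;

    /**
     * This uses a local Redis; in practice the connection should be made configurable
     */
    private final RedissonClient redisson = Redisson.create();

    @Scheduled(fixedDelay = 10000)
    public void transactionalMessageCompensationTask() throws Exception {
        RLock lock = redisson.getLock("transactionalMessageCompensationTask");
        // Wait up to 5 seconds for the lock and lease it for 300 seconds (expected maximum run time);
        // both values must be tuned to the actual scenario
        boolean tryLock = lock.tryLock(5, 300, TimeUnit.SECONDS);
        if (tryLock) {
            try {
                long start = System.currentTimeMillis();
                log.info("Starting the transactional message compensation task...");
                managementService.processPendingCompensationRecords();
                long end = System.currentTimeMillis();
                long delta = end - start;
                // Hold the lock for at least 5 seconds so that it is not released too early
                if (delta < 5000) {
                    Thread.sleep(5000 - delta);
                }
                log.info("Transactional message compensation task finished, took {} ms...", end - start);
            } finally {
                lock.unlock();
            }
        }
    }
}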
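The sleep at the end of the task is easy to misread. Holding the lock for at least as long as the 5-second tryLock wait appears intended to make other nodes that started waiting during the same tick time out and skip the run. Otherwise they would acquire the lock a moment later and scan the same window again. Below is a stdlib-only sketch of the same pattern, with a local java.util.concurrent.locks.Lock standing in for Redisson's RLock. MinHoldLockRunner and runHoldingAtLeast are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

final class MinHoldLockRunner {
    // Runs `task` while holding `lock`, and keeps holding it for at least minHoldMillis in total.
    // Returns false if the lock could not be acquired within waitMillis.
    static boolean runHoldingAtLeast(Lock lock, long waitMillis, long minHoldMillis, Runnable task) {
        boolean acquired;
        try {
            acquired = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (!acquired) {
            return false;                                    // another node is already running the batch
        }
        try {
            long start = System.nanoTime();
            task.run();
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            if (elapsedMillis < minHoldMillis) {
                Thread.sleep(minHoldMillis - elapsedMillis); // pad short runs up to the minimum hold
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();              // keep the interrupt flag; the task already ran
        } finally {
            lock.unlock();
        }
        return true;
    }
}
```

In the Redisson version, the lease time (300 seconds) acts as a safety net: if a node dies while holding the lock, the lock still expires eventually.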

After the basic code is written, the structure of the whole project is as follows:

Finally, add two test classes:

@RequiredArgsConstructor
@Component
public class MockBusinessRunner implements CommandLineRunner {

    private final MockBusinessService mockBusinessService;

    @Override
    public void run(String... args) throws Exception {
        mockBusinessService.saveOrder();
    }
}

@Slf4j
@RequiredArgsConstructor
@Service
public class MockBusinessService {

    private final JdbcTemplate jdbcTemplate;
    private final TransactionalMessageService transactionalMessageService;
    private final ObjectMapper objectMapper;

    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        String orderId = UUID.randomUUID().toString();
        BigDecimal amount = BigDecimal.valueOf(100L);
        Map<String,Object> message = new HashMap<>();
        message.put("orderId",orderId);
        message.put("amount",amount);
        jdbcTemplate.update("INSERT INTO t_order(order_id,amount) VALUES (?,?)",p -> {
            p.setString(1,orderId);
            p.setBigDecimal(2,amount);
        });
        String content = objectMapper.writeValueAsString(message);
        transactionalMessageService.sendTransactionalMessage(
                DefaultDestination.builder()
                        .exchangeName("tm.test.exchange")
                        .queueName("tm.test.queue")
                        .routingKey("tm.test.key")
                        .exchangeType(ExchangeType.DIRECT)
                        .build(),DefaultTxMessage.builder()
                        .businessKey(orderId)
                        .businessModule("SAVE_ORDER")
                        .content(content)
                        .build()
        );
        log.info("Saved order: {} successfully...", orderId);
    }
}

The output of one test run is as follows:

2020-02-05 21:10:13.287  INFO 49556 --- [           main] club.throwable.cm.MockBusinessService    : Saved order: 07a75323-460b-42cb-aa63-1a0a45ce19bf successfully...

The simulated order is saved successfully. After the transaction commits, the message is delivered to the RabbitMQ server as expected, as the data in the RabbitMQ management console shows.

Summary

The transactional message module exists only to make asynchronous message pushing more complete. In practice, a well-designed asynchronous messaging system also offers a synchronous query interface, because asynchronous messages have no callback or response. In general, a system's throughput is positively correlated with the share of work processed asynchronously (see Amdahl's law). In real system architecture, asynchronous interaction should therefore be used as much as possible. It raises throughput and reduces the unnecessary waiting caused by synchronous blocking. The transactional message module can also be extended with a management backend, and can even work with Micrometer, Prometheus and Grafana for real-time monitoring.
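For reference, Amdahl's law in its usual form gives the speedup S of a workload when a fraction p of it can proceed in parallel across n workers. Reading "the asynchronous share" as p is an analogy rather than a derivation, but it shows why that share matters:

$$
S(n) = \frac{1}{(1 - p) + \frac{p}{n}}
$$

For example, with n = 10, p = 0.5 gives S = 1/(0.5 + 0.05) ≈ 1.82, while p = 0.9 gives S = 1/(0.1 + 0.09) ≈ 5.26. As n grows, S approaches 1/(1 - p), so the part that stays synchronous caps the gain.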

Demo project repository for this article: Rabbit transactional message

The demo starts only after MySQL, Redis and RabbitMQ are installed locally, and a database named local must be created in the local MySQL instance.

Personal blog

Throwable's Blog

(End of article, c-5-d e-a-20200202. The epidemic is serious, and we should soon start working from home. Go out less and read more books.)


The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.