天天看点

分布式事务-事务消息模型/消息最终一致性1:事务消息原理2:代码开发

文章目录

  • 1:事务消息原理
  • 2:代码开发
    • 1:启动rocketmq
    • 2:producer代码开发
      • 1:pom
      • 2:创建消息发送者-TransactionProducer
      • 3:创建执行本地事务和事务状态回查-OrderTransactionListener
      • 4:代码分析
    • 3:consumer
      • 1:创建消息监听者
      • 2:创建消息监听业务
      • 3:死信队列处理

1:事务消息原理

分布式事务之可靠消息最终一致性通常使用rocketmq的事务消息机制来实现

Half Message,半消息
暂时不能被 Consumer消费的消息。Producer已经把消息发送到 Broker端,但是此消息的状态被标记为不能投递,处于这种状态下的消息称为半消息。事实上,该状态下的消息会被放在一个叫做 RMQ_SYS_TRANS_HALF_TOPIC的主题下。
当 Producer端对它二次确认后,也就是 Commit之后,Consumer端才可以消费到;那么如果是Rollback,该消息则会被删除,永远不会被消费到。
事务状态回查
我们想,可能会因为网络原因、应用问题等,导致Producer端一直没有对这个半消息进行确认,那么这时候 Broker服务器会定时扫描这些半消息,主动找Producer端查询该消息的状态。
           
分布式事务-事务消息模型/消息最终一致性1:事务消息原理2:代码开发

其大致流程如下

1、Producer 发送事务消息

Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。

本例中,Producer 发送 ”更改订单状态“ 到MQ Server。

2、MQ Server回应消息发送成功

MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。

3、Producer 执行本地事务

Producer 端执行业务代码逻辑,通过本地数据库事务控制。

本例中,Producer 执行账户余额扣除。

4、消息投递

若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”更改订单状态“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;

若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除”更改订单状态“消息 。

MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。

5、事务回查

如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。

6.在回查机制中,如果Producer经过检查发现自己本地业务完成,那么想MQ Server发送commit标记,MQ Server收到后将此消息更改为可消费。如果接到Producer的rollback标识,则将此消息删除

2:代码开发

1:启动rocketmq

1:启动rocketmq的nameserver

rocketmq-al1-4.5.0-bin-release\rocketmq-al1-4.5.0-bin-release\bin>start mqnamesrv.cmd

2:启动broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

2:producer代码开发

1:pom

!--rocket mq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
           

2:创建消息发送者-TransactionProducer

通过 RocketMQ发送消息,需先创建一个消息发送者。值得注意的是,如果发送事务消息,在这里我们的创建的实例必须是 TransactionMQProducer。

package com.mashibing.producer.config;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class TransactionProducer {
    
    private String producerGroup = "order_trans_group";

    // 事务消息
    private TransactionMQProducer producer;

    //用于执行本地事务和事务状态回查的监听器
    @Autowired
    OrderTransactionListener orderTransactionListener;
    //执行任务的线程池
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
            
    @PostConstruct
    public void init(){
        producer = new TransactionMQProducer(producerGroup);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setSendMsgTimeout(Integer.MAX_VALUE);
        producer.setExecutorService(executor);
        producer.setTransactionListener(orderTransactionListener);
        this.start();
    }
    private void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    //事务消息发送 
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        Message message = new Message(topic,data.getBytes());
        return this.producer.sendMessageInTransaction(message, null);
    }
}
           

3:创建执行本地事务和事务状态回查-OrderTransactionListener

package com.mashibing.producer.config;

import com.alibaba.fastjson.JSONObject;
import com.mashibing.producer.dao.TransactionLogDao;
import com.mashibing.producer.entity.OrderBase;
import com.mashibing.producer.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    OrderService orderService;

    @Autowired
    TransactionLogDao transactionLogDao;

    /**
     * 发送half msg 返回send ok后调用的方法
     * @param message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("开始执行本地事务....");
        LocalTransactionState state;
        try{
            String body = new String(message.getBody());
            OrderBase order = JSONObject.parseObject(body, OrderBase.class);
            orderService.createOrder(order,message.getTransactionId());
            // 返回commit后,消息能被消费者消费
            state = LocalTransactionState.COMMIT_MESSAGE;
//            state = LocalTransactionState.ROLLBACK_MESSAGE;
//            state = LocalTransactionState.UNKNOW;
//            TimeUnit.MINUTES.sleep(1);
            log.info("本地事务已提交。{}",message.getTransactionId());


        }catch (Exception e){
            log.info("执行本地事务失败。{}",e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    /**
     * 回查 走的方法
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

        // 回查多次失败 人工补偿。提醒人。发邮件的。
        log.info("开始回查本地事务状态。{}",messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();
        if (transactionLogDao.selectCount(transactionId)>0){
            state = LocalTransactionState.COMMIT_MESSAGE;
        }else {
            state = LocalTransactionState.UNKNOW;
        }
        log.info("结束本地事务状态查询:{}",state);
        return state;
    }
}
           

4:代码分析

在通过 producer.sendMessageInTransaction发送事务消息后,如果消息发送成功,就会调用到这里的executeLocalTransaction方法,来执行本地事务。在这里,它会完成订单数据和事务日志的插入。

该方法返回值 LocalTransactionState 代表本地事务状态,它是一个枚举类。

public enum LocalTransactionState {

//提交事务消息,消费者可以看到此消息

COMMIT_MESSAGE,

//回滚事务消息,消费者不会看到此消息

ROLLBACK_MESSAGE,

//事务未知状态,需要调用事务状态回查,确定此消息是提交还是回滚

UNKNOW;

}

那么, checkLocalTransaction 方法就是用于事务状态查询。在这里,我们通过事务ID查询transaction_log这张表,如果可以查询到结果,就提交事务消息;如果没有查询到,就返回未知状态。

注意,这里还涉及到另外一个问题。如果是返回未知状态,RocketMQ Broker服务器会以1分钟的间隔时间不断回查,直至达到事务回查最大检测数,如果超过这个数字还未查询到事务状态,则回滚此消息。

当然,事务回查的频率和最大次数,我们都可以配置。在 Broker 端,可以通过这样来配置它:

brokerConfig.setTransactionCheckInterval(10000); //回查频率10秒一次

brokerConfig.setTransactionCheckMax(3); //最大检测次数为3

3:consumer

1:创建消息监听者

package com.mashibing.producer.compent;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class Consumer {

    String consumerGroup = "consumer-group";
    DefaultMQPushConsumer consumer;

    @Autowired
    OrderListener orderListener;
    
    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("order","*");
        consumer.registerMessageListener(orderListener);

        // 2次失败 就进死信队列
        consumer.setMaxReconsumeTimes(2);
        consumer.start();
    }
}
           

2:创建消息监听业务

package com.mashibing.producer.compent;

import com.alibaba.fastjson.JSONObject;
import com.mashibing.producer.entity.OrderBase;
import com.mashibing.producer.service.PointsService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Slf4j
public class OrderListener implements MessageListenerConcurrently {

    @Autowired
    PointsService pointsService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        log.info("消费者线程监听到消息。");
        try{

            System.out.println(1/0);
            for (MessageExt message:list) {
                log.info("开始处理订单数据,准备增加积分....");
                OrderBase order  = JSONObject.parseObject(message.getBody(), OrderBase.class);
                pointsService.increasePoints(order);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            log.error("处理消费者数据发生异常。{}",e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}
           

3:死信队列处理

因为rocketmq消费者默认消费次数为16次,如果到达16次后还没有被消费,就会进入死信队列;这时候我们可以再写一个监听者,对死信队列进行监听,如果死信队列中有了数值,那我们可以对此情况进行发邮件等消息通知,然后人工干预;

死信队列名:%DLQ%consumer-group

分布式事务-事务消息模型/消息最终一致性1:事务消息原理2:代码开发