天天看點

分布式事務-事務消息模型/消息最終一緻性1:事務消息原理2:代碼開發

文章目錄

  • 1:事務消息原理
  • 2:代碼開發
    • 1:啟動rocketmq
    • 2:producer代碼開發
      • 1:pom
      • 2:建立消息發送者-TransactionProducer
      • 3:建立執行本地事務和事務狀态回查-OrderTransactionListener
      • 4:代碼分析
    • 3:consumer
      • 1:建立消息監聽者
      • 2:建立消息監聽業務
      • 3:死信隊列處理

1:事務消息原理

分布式事務之可靠消息最終一緻性通常使用rocketmq的事務消息機制來實作

Half Message,半消息
暫時不能被 Consumer消費的消息。Producer已經把消息發送到 Broker端,但是此消息的狀态被标記為不能投遞,處于這種狀态下的消息稱為半消息。事實上,該狀态下的消息會被放在一個叫做 RMQ_SYS_TRANS_HALF_TOPIC的主題下。
當 Producer端對它二次确認後,也就是 Commit之後,Consumer端才可以消費到;那麼如果是Rollback,該消息則會被删除,永遠不會被消費到。
事務狀态回查
我們想,可能會因為網絡原因、應用問題等,導緻Producer端一直沒有對這個半消息進行确認,那麼這時候 Broker伺服器會定時掃描這些半消息,主動找Producer端查詢該消息的狀态。
           
分布式事務-事務消息模型/消息最終一緻性1:事務消息原理2:代碼開發

其大緻流程如下

1、Producer 發送事務消息

Producer (MQ發送方)發送事務消息至MQ Server,MQ Server将消息狀态标記為Prepared(預備狀态),注意此時這條消息消費者(MQ訂閱方)是無法消費到的。

本例中,Producer 發送 ”更改訂單狀态“ 到MQ Server。

2、MQ Server回應消息發送成功

MQ Server接收到Producer 發送給的消息則回應發送成功表示MQ已接收到消息。

3、Producer 執行本地事務

Producer 端執行業務代碼邏輯,通過本地資料庫事務控制。

本例中,Producer 執行賬戶餘額扣除。

4、消息投遞

若Producer 本地事務執行成功則自動向MQServer發送commit消息,MQ Server接收到commit消息後将”更改訂單狀态“ 狀态标記為可消費,此時MQ訂閱方(積分服務)即正常消費消息;

若Producer 本地事務執行失敗則自動向MQServer發送rollback消息,MQ Server接收到rollback消息後 将删除”更改訂單狀态“消息 。

MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則将重複接收消息。這裡ack預設自動回應,即程式執行正常則自動回應ack。

5、事務回查

如果執行Producer端本地事務過程中,執行端挂掉,或者逾時,MQ Server将會不停的詢問同組的其他 Producer來擷取事務執行狀态,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。

6.在回查機制中,如果Producer經過檢查發現自己本地業務完成,那麼想MQ Server發送commit标記,MQ Server收到後将此消息更改為可消費。如果接到Producer的rollback辨別,則将此消息删除

2:代碼開發

1:啟動rocketmq

1:啟動rocketmq的nameserver

rocketmq-al1-4.5.0-bin-release\rocketmq-al1-4.5.0-bin-release\bin>start mqnamesrv.cmd

2:啟動broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

2:producer代碼開發

1:pom

!--rocket mq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
           

2:建立消息發送者-TransactionProducer

通過 RocketMQ發送消息,需先建立一個消息發送者。值得注意的是,如果發送事務消息,在這裡我們的建立的執行個體必須是 TransactionMQProducer。

package com.mashibing.producer.config;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class TransactionProducer {
    
    private String producerGroup = "order_trans_group";

    // 事務消息
    private TransactionMQProducer producer;

    //用于執行本地事務和事務狀态回查的監聽器
    @Autowired
    OrderTransactionListener orderTransactionListener;
    //執行任務的線程池
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
            
    @PostConstruct
    public void init(){
        producer = new TransactionMQProducer(producerGroup);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setSendMsgTimeout(Integer.MAX_VALUE);
        producer.setExecutorService(executor);
        producer.setTransactionListener(orderTransactionListener);
        this.start();
    }
    private void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    //事務消息發送 
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        Message message = new Message(topic,data.getBytes());
        return this.producer.sendMessageInTransaction(message, null);
    }
}
           

3:建立執行本地事務和事務狀态回查-OrderTransactionListener

package com.mashibing.producer.config;

import com.alibaba.fastjson.JSONObject;
import com.mashibing.producer.dao.TransactionLogDao;
import com.mashibing.producer.entity.OrderBase;
import com.mashibing.producer.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class OrderTransactionListener implements TransactionListener {

    @Autowired
    OrderService orderService;

    @Autowired
    TransactionLogDao transactionLogDao;

    /**
     * 發送half msg 傳回send ok後調用的方法
     * @param message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("開始執行本地事務....");
        LocalTransactionState state;
        try{
            String body = new String(message.getBody());
            OrderBase order = JSONObject.parseObject(body, OrderBase.class);
            orderService.createOrder(order,message.getTransactionId());
            // 傳回commit後,消息能被消費者消費
            state = LocalTransactionState.COMMIT_MESSAGE;
//            state = LocalTransactionState.ROLLBACK_MESSAGE;
//            state = LocalTransactionState.UNKNOW;
//            TimeUnit.MINUTES.sleep(1);
            log.info("本地事務已送出。{}",message.getTransactionId());


        }catch (Exception e){
            log.info("執行本地事務失敗。{}",e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    /**
     * 回查 走的方法
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

        // 回查多次失敗 人工補償。提醒人。發郵件的。
        log.info("開始回查本地事務狀态。{}",messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();
        if (transactionLogDao.selectCount(transactionId)>0){
            state = LocalTransactionState.COMMIT_MESSAGE;
        }else {
            state = LocalTransactionState.UNKNOW;
        }
        log.info("結束本地事務狀态查詢:{}",state);
        return state;
    }
}
           

4:代碼分析

在通過 producer.sendMessageInTransaction發送事務消息後,如果消息發送成功,就會調用到這裡的executeLocalTransaction方法,來執行本地事務。在這裡,它會完成訂單資料和事務日志的插入。

該方法傳回值 LocalTransactionState 代表本地事務狀态,它是一個枚舉類。

public enum LocalTransactionState {

//送出事務消息,消費者可以看到此消息

COMMIT_MESSAGE,

//復原事務消息,消費者不會看到此消息

ROLLBACK_MESSAGE,

//事務未知狀态,需要調用事務狀态回查,确定此消息是送出還是復原

UNKNOW;

}

那麼, checkLocalTransaction 方法就是用于事務狀态查詢。在這裡,我們通過事務ID查詢transaction_log這張表,如果可以查詢到結果,就送出事務消息;如果沒有查詢到,就傳回未知狀态。

注意,這裡還涉及到另外一個問題。如果是傳回未知狀态,RocketMQ Broker伺服器會以1分鐘的間隔時間不斷回查,直至達到事務回查最大檢測數,如果超過這個數字還未查詢到事務狀态,則復原此消息。

當然,事務回查的頻率和最大次數,我們都可以配置。在 Broker 端,可以通過這樣來配置它:

brokerConfig.setTransactionCheckInterval(10000); //回查頻率10秒一次

brokerConfig.setTransactionCheckMax(3); //最大檢測次數為3

3:consumer

1:建立消息監聽者

package com.mashibing.producer.compent;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class Consumer {

    String consumerGroup = "consumer-group";
    DefaultMQPushConsumer consumer;

    @Autowired
    OrderListener orderListener;
    
    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("order","*");
        consumer.registerMessageListener(orderListener);

        // 2次失敗 就進死信隊列
        consumer.setMaxReconsumeTimes(2);
        consumer.start();
    }
}
           

2:建立消息監聽業務

package com.mashibing.producer.compent;

import com.alibaba.fastjson.JSONObject;
import com.mashibing.producer.entity.OrderBase;
import com.mashibing.producer.service.PointsService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Slf4j
public class OrderListener implements MessageListenerConcurrently {

    @Autowired
    PointsService pointsService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        log.info("消費者線程監聽到消息。");
        try{

            System.out.println(1/0);
            for (MessageExt message:list) {
                log.info("開始處理訂單資料,準備增加積分....");
                OrderBase order  = JSONObject.parseObject(message.getBody(), OrderBase.class);
                pointsService.increasePoints(order);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            log.error("處理消費者資料發生異常。{}",e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}
           

3:死信隊列處理

因為rocketmq消費者預設消費次數為16次,如果到達16次後還沒有被消費,就會進入死信隊列;這時候我們可以再寫一個監聽者,對死信隊列進行監聽,如果死信隊列中有了數值,那我們可以對此情況進行發郵件等消息通知,然後人工幹預;

死信隊列名:%DLQ%consumer-group

分布式事務-事務消息模型/消息最終一緻性1:事務消息原理2:代碼開發