文章目錄
- 1:事務消息原理
- 2:代碼開發
-
- 1:啟動rocketmq
- 2:producer代碼開發
-
- 1:pom
- 2:建立消息發送者-TransactionProducer
- 3:建立執行本地事務和事務狀态回查-OrderTransactionListener
- 4:代碼分析
- 3:consumer
-
- 1:建立消息監聽者
- 2:建立消息監聽業務
- 3:死信隊列處理
1:事務消息原理
分布式事務之可靠消息最終一緻性通常使用rocketmq的事務消息機制來實作
Half Message,半消息
暫時不能被 Consumer消費的消息。Producer已經把消息發送到 Broker端,但是此消息的狀态被标記為不能投遞,處于這種狀态下的消息稱為半消息。事實上,該狀态下的消息會被放在一個叫做 RMQ_SYS_TRANS_HALF_TOPIC的主題下。
當 Producer端對它二次确認後,也就是 Commit之後,Consumer端才可以消費到;那麼如果是Rollback,該消息則會被删除,永遠不會被消費到。
事務狀态回查
我們想,可能會因為網絡原因、應用問題等,導緻Producer端一直沒有對這個半消息進行确認,那麼這時候 Broker伺服器會定時掃描這些半消息,主動找Producer端查詢該消息的狀态。

其大緻流程如下
1、Producer 發送事務消息
Producer (MQ發送方)發送事務消息至MQ Server,MQ Server将消息狀态标記為Prepared(預備狀态),注意此時這條消息消費者(MQ訂閱方)是無法消費到的。
本例中,Producer 發送 ”更改訂單狀态“ 到MQ Server。
2、MQ Server回應消息發送成功
MQ Server接收到Producer 發送給的消息則回應發送成功表示MQ已接收到消息。
3、Producer 執行本地事務
Producer 端執行業務代碼邏輯,通過本地資料庫事務控制。
本例中,Producer 執行賬戶餘額扣除。
4、消息投遞
若Producer 本地事務執行成功則自動向MQServer發送commit消息,MQ Server接收到commit消息後将”更改訂單狀态“ 狀态标記為可消費,此時MQ訂閱方(積分服務)即正常消費消息;
若Producer 本地事務執行失敗則自動向MQServer發送rollback消息,MQ Server接收到rollback消息後 将删除”更改訂單狀态“消息 。
MQ訂閱方(積分服務)消費消息,消費成功則向MQ回應ack,否則将重複接收消息。這裡ack預設自動回應,即程式執行正常則自動回應ack。
5、事務回查
如果執行Producer端本地事務過程中,執行端挂掉,或者逾時,MQ Server将會不停的詢問同組的其他 Producer來擷取事務執行狀态,這個過程叫事務回查。MQ Server會根據事務回查結果來決定是否投遞消息。
6.在回查機制中,如果Producer經過檢查發現自己本地業務完成,那麼想MQ Server發送commit标記,MQ Server收到後将此消息更改為可消費。如果接到Producer的rollback辨別,則将此消息删除
2:代碼開發
1:啟動rocketmq
1:啟動rocketmq的nameserver
rocketmq-al1-4.5.0-bin-release\rocketmq-al1-4.5.0-bin-release\bin>start mqnamesrv.cmd
2:啟動broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
2:producer代碼開發
1:pom
!--rocket mq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
2:建立消息發送者-TransactionProducer
通過 RocketMQ發送消息,需先建立一個消息發送者。值得注意的是,如果發送事務消息,在這裡我們的建立的執行個體必須是 TransactionMQProducer。
package com.mashibing.producer.config;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
public class TransactionProducer {
private String producerGroup = "order_trans_group";
// 事務消息
private TransactionMQProducer producer;
//用于執行本地事務和事務狀态回查的監聽器
@Autowired
OrderTransactionListener orderTransactionListener;
//執行任務的線程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
@PostConstruct
public void init(){
producer = new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendMsgTimeout(Integer.MAX_VALUE);
producer.setExecutorService(executor);
producer.setTransactionListener(orderTransactionListener);
this.start();
}
private void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
//事務消息發送
public TransactionSendResult send(String data, String topic) throws MQClientException {
Message message = new Message(topic,data.getBytes());
return this.producer.sendMessageInTransaction(message, null);
}
}
3:建立執行本地事務和事務狀态回查-OrderTransactionListener
package com.mashibing.producer.config;
import com.alibaba.fastjson.JSONObject;
import com.mashibing.producer.dao.TransactionLogDao;
import com.mashibing.producer.entity.OrderBase;
import com.mashibing.producer.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class OrderTransactionListener implements TransactionListener {
@Autowired
OrderService orderService;
@Autowired
TransactionLogDao transactionLogDao;
/**
* 發送half msg 傳回send ok後調用的方法
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("開始執行本地事務....");
LocalTransactionState state;
try{
String body = new String(message.getBody());
OrderBase order = JSONObject.parseObject(body, OrderBase.class);
orderService.createOrder(order,message.getTransactionId());
// 傳回commit後,消息能被消費者消費
state = LocalTransactionState.COMMIT_MESSAGE;
// state = LocalTransactionState.ROLLBACK_MESSAGE;
// state = LocalTransactionState.UNKNOW;
// TimeUnit.MINUTES.sleep(1);
log.info("本地事務已送出。{}",message.getTransactionId());
}catch (Exception e){
log.info("執行本地事務失敗。{}",e);
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}
/**
* 回查 走的方法
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 回查多次失敗 人工補償。提醒人。發郵件的。
log.info("開始回查本地事務狀态。{}",messageExt.getTransactionId());
LocalTransactionState state;
String transactionId = messageExt.getTransactionId();
if (transactionLogDao.selectCount(transactionId)>0){
state = LocalTransactionState.COMMIT_MESSAGE;
}else {
state = LocalTransactionState.UNKNOW;
}
log.info("結束本地事務狀态查詢:{}",state);
return state;
}
}
4:代碼分析
在通過 producer.sendMessageInTransaction發送事務消息後,如果消息發送成功,就會調用到這裡的executeLocalTransaction方法,來執行本地事務。在這裡,它會完成訂單資料和事務日志的插入。
該方法傳回值 LocalTransactionState 代表本地事務狀态,它是一個枚舉類。
public enum LocalTransactionState {
//送出事務消息,消費者可以看到此消息
COMMIT_MESSAGE,
//復原事務消息,消費者不會看到此消息
ROLLBACK_MESSAGE,
//事務未知狀态,需要調用事務狀态回查,确定此消息是送出還是復原
UNKNOW;
}
那麼, checkLocalTransaction 方法就是用于事務狀态查詢。在這裡,我們通過事務ID查詢transaction_log這張表,如果可以查詢到結果,就送出事務消息;如果沒有查詢到,就傳回未知狀态。
注意,這裡還涉及到另外一個問題。如果是傳回未知狀态,RocketMQ Broker伺服器會以1分鐘的間隔時間不斷回查,直至達到事務回查最大檢測數,如果超過這個數字還未查詢到事務狀态,則復原此消息。
當然,事務回查的頻率和最大次數,我們都可以配置。在 Broker 端,可以通過這樣來配置它:
brokerConfig.setTransactionCheckInterval(10000); //回查頻率10秒一次
brokerConfig.setTransactionCheckMax(3); //最大檢測次數為3
3:consumer
1:建立消息監聽者
package com.mashibing.producer.compent;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class Consumer {
String consumerGroup = "consumer-group";
DefaultMQPushConsumer consumer;
@Autowired
OrderListener orderListener;
@PostConstruct
public void init() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("order","*");
consumer.registerMessageListener(orderListener);
// 2次失敗 就進死信隊列
consumer.setMaxReconsumeTimes(2);
consumer.start();
}
}
2:建立消息監聽業務
package com.mashibing.producer.compent;
import com.alibaba.fastjson.JSONObject;
import com.mashibing.producer.entity.OrderBase;
import com.mashibing.producer.service.PointsService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class OrderListener implements MessageListenerConcurrently {
@Autowired
PointsService pointsService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
log.info("消費者線程監聽到消息。");
try{
System.out.println(1/0);
for (MessageExt message:list) {
log.info("開始處理訂單資料,準備增加積分....");
OrderBase order = JSONObject.parseObject(message.getBody(), OrderBase.class);
pointsService.increasePoints(order);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}catch (Exception e){
log.error("處理消費者資料發生異常。{}",e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
3:死信隊列處理
因為rocketmq消費者預設消費次數為16次,如果到達16次後還沒有被消費,就會進入死信隊列;這時候我們可以再寫一個監聽者,對死信隊列進行監聽,如果死信隊列中有了數值,那我們可以對此情況進行發郵件等消息通知,然後人工幹預;
死信隊列名:%DLQ%consumer-group