在閱讀本文前,若您對RocketMQ技術感興趣,請加入 RocketMQ技術交流群
上節已經梳理了RocketMQ發送事務消息的流程(基于二階段送出),本節将繼續深入學習事務狀态消息回查,我們知道,第一次送出到消息伺服器時消息的主題被替換為RMQ_SYS_TRANS_HALF_TOPIC,本地事務執行完後如果傳回本地事務狀态為UN_KNOW時,第二次送出到伺服器時将不會做任何操作,也就是說此時消息還存在與RMQ_SYS_TRANS_HALF_TOPIC主題中,并不能被消息消費者消費,那這些消息最終如何被送出或復原呢?
原來RocketMQ使用TransactionalMessageCheckService線程定時去檢測
RMQ_SYS_TRANS_HALF_TOPIC主題中的消息,回查消息的事務狀态。TransactionalMessageCheckService的檢測頻率預設1分鐘,可通過在broker.conf檔案中設定transactionCheckInterval的值來改變預設值,機關為毫秒。
接下來将深入分析該線程的實作原理,進而解開事務消息回查機制。
TransactionalMessageCheckService#onWaitEnd
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); // @1
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); // @2
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener()); // @3
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
代碼@1:從broker配置檔案中擷取transactionTimeOut參數值。
代碼@2:從broker配置檔案中擷取transactionCheckMax參數值,表示事務的最大檢測次數,如果超過檢測次數,消息會預設為丢棄,即復原消息。
接下來重點分析TransactionalMessageService#check的實作邏輯:
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl
TransactionalMessageServiceImpl#check
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
step1:根據主題名稱,擷取該主題下所有的消息隊列。
TransactionalMessageServiceImpl#check
for (MessageQueue messageQueue : msgQueues) {
// ...
}
Step2:循環周遊消息隊列,從單個消息消費隊列去擷取消息。
TransactionalMessageServiceImpl#check
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset);
continue;
}
Step3:擷取對應的操作隊列,其主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC,然後擷取操作隊列的消費進度、待操作的消費隊列的消費進度,如果任意一小于0,忽略該消息隊列,繼續處理下一個隊列。
TransactionalMessageServiceImpl#check
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
Step4:調用fillOpRemoveMap主題填充removeMap、doneOpOffset資料結構,這裡主要的目的是避免重複調用事務回查接口,這裡說一下RMQ_SYS_TRANS_HALF_TOPIC、RMQ_SYS_TRANS_OP_HALF_TOPIC這兩個主題的作用。
RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主題,事務消息首先先進入到該主題。
RMQ_SYS_TRANS_OP_HALF_TOPIC:當消息伺服器收到事務消息的送出或復原請求後,會将消息存儲在該主題下。
TransactionalMessageServiceImpl#check
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset; // @1
while (true) {
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { // @2
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
if (removeMap.containsKey(i)) { // @3
log.info("Half offset {} has been committed/rolled back", i);
removeMap.remove(i);
} else {
GetResult getResult = getHalfMsg(messageQueue, i); // @4
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) { // @5
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { // @6
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
log.info("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); // @7
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) { // @8
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) {
newOffset = i + 1;
i++;
continue;
}
}
} else { // @9
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1); // @10
if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) { // @11
continue;
}
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); // @12
log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
if (newOffset != halfOffset) { // @13
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) { // @14
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
本段代碼比較長,卻是事務狀态回查的重點實作。
代碼@1:先解釋幾個局部變量的含義。
- getMessageNullCount :擷取空消息的次數
- newOffset :目前處理RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新進度
- i:目前處理消息的隊列偏移量,其主題依然為RMQ_SYS_TRANS_HALF_TOPIC。
代碼@2:這段代碼應該不陌生,這是RocketMQ處理任務的一個通用處理邏輯,就是一個任務處理,可以限制每次最多處理的時間,RocketMQ為待檢測主題RMQ_SYS_TRANS_HALF_TOPIC的每個隊列,做事務狀态回查,一次最多不超過60S,目前該值不可配置。
代碼@3:如果removeMap中包含目前處理的消息,則繼續下一條,removeMap中的值是通過Step3中填充的,具體實作邏輯是從RMQ_SYS_TRANS_OP_HALF_TOPIC主題中拉取32條,如果拉取的消息隊列偏移量大于等于RMQ_SYS_TRANS_HALF_TOPIC#queueId目前的處理進度時,會添加到removeMap中,表示已處理過。
代碼@4:根據消息隊列偏移量i從消費隊列中擷取消息。
代碼@5:如果消息為空,則根據允許重複次數進行操作,預設重試一次,目前不可配置。其具體實作為:
- 如果超過重試次數,直接跳出,結束該消息隊列的事務狀态回查。
-
如果是由于沒有新的消息而傳回為空(拉取狀态為:PullStatus.NO_NEW_MSG),則結束該消息隊列的事務狀态回查。
1.其他原因,則将偏移量i設定為: getResult.getPullResult().getNextBeginOffset(),重新拉取。
代碼@6:判斷該消息是否需要discard(吞沒,丢棄,不處理)、或skip(跳過),其依據如下:
- needDiscard 依據:如果該消息回查的次數超過允許的最大回查次數,則該消息将被丢棄,即事務消息送出失敗,不能被消費者消費,其做法,主要是每回查一次,在消息屬性TRANSACTION_CHECK_TIMES中增1,預設最大回查次數為5次。
- needSkip依據:如果事務消息超過檔案的過期時間,預設72小時(具體請檢視RocketMQ過期檔案相關内容),則跳過該消息。
代碼@7:處理事務逾時相關概念,先解釋幾個局部變量:、
- valueOfCurrentMinusBorn :該消息已存儲的時間,等于系統目前時間減去消息存儲的時間戳。
- checkImmunityTime :立即檢測事務消息的時間。
-
transactionTimeout:事務消息的逾時時間,其設計的意義是,應用程式在發送事務消息後,事務不會馬上送出,該時間就是假設事務消息發送成功後,應用程式事務送出的時間,在這段時間内,RocketMQ任務事務未送出,故不應該在這個時間段向應用程式發送回查請求。
代碼@8:如果消息指定了事務消息過期時間屬性(PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS),如果目前時間已超過該值。
代碼@9:如果目前時間還未過(應用程式事務結束時間),則跳出本次回查處理的,等下一次再試。
代碼@10:判斷是否需要發送事務回查消息,具體邏輯:
- 如果從操作隊列(RMQ_SYS_TRANS_OP_HALF_TOPIC)中沒有已處理消息并且已經超過(應用程式事務結束時間),參數transactionTimeOut值。
- 如果操作隊列不為空,并且最後一天條消息的存儲時間已經超過transactionTimeOut值。
代碼@11:如果需要發送事務狀态回查消息,則先将消息再次發送到RMQ_SYS_TRANS_HALF_TOPIC主題中,發送成功則傳回true,否則傳回false,這裡還有一個實作關鍵點:
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
msgExt.setQueueOffset(
putMessageResult.getAppendMessageResult().getLogicsOffset());
msgExt.setCommitLogOffset(
putMessageResult.getAppendMessageResult().getWroteOffset());
msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
}
如果發送成功,會将該消息的queueOffset、commitLogOffset設定為重新存入的偏移量,為什麼需要這樣呢,答案在listener.resolveHalfMsg(msgExt)中。
AbstractTransactionalMessageCheckListener#resolveHalfMsg
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
發送具體的事務回查機制,這裡用一個線程池來異步發送回查消息,為了回查進度儲存的簡化,這裡隻要發送了回查消息,目前回查進度會向前推動,如果回查失敗,上一步驟新增的消息将可以再次發送回查消息,那如果回查消息發送成功,那會不會下一次又重複發送回查消息呢?這個可以根據OP隊列中的消息來判斷是否重複,如果回查消息發送成功并且消息伺服器完成送出或復原操作,這條消息會發送到OP隊列中,然後fillOpRemoveMap根據處理進度擷取一批已處理的消息,來與消息判斷是否重複,由于fillopRemoveMap一次隻拉32條消息,那又如何保證一定能拉取到與目前消息的處理記錄呢?其實就是通過代碼@10來實作的,如果此批消息最後一條未超過事務延遲消息,則繼續拉取更多消息進行判斷(@12)和(@14),op隊列也會随着回查進度的推進而推進。
代碼@12:如果無法判斷是否發送回查消息,則加載更多的已處理消息進行刷選。
代碼@13:儲存(Prepare)消息隊列的回查進度。
代碼@14:儲存處理隊列(op)的進度。
上述講解了TransactionalMessageCheckService回查定時線程的發送回查消息的整體流程與實作細節,接下來重點分析一下上述步驟@11,通過異步方式發送消息回查的實作過程。
AbstractTransactionalMessageCheckListener#sendCheckMessage
public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); // @1
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0); // @2
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); // @3
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt); // @4
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
代碼@1:首先建構回查事務狀态請求消息,請求核心參數包括:消息offsetId、消息ID(索引)、消息事務ID、事務消息隊列中的偏移量(RMQ_SYS_TRANS_HALF_TOPIC)。
代碼@2:恢複原消息的主題、隊列,并設定storeSize為0。
代碼@3:擷取生産者組名稱。
代碼@4:根據生産者組擷取任意一個生産者,通過與其連接配接發送事務回查消息,回查消息的請求者為【Broker伺服器】,接收者為(client,具體為消息生産者)。
其處理類為:org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest,其詳細邏輯實作方法為:
ClientRemotingProcessor#checkTransactionState
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
producer.checkTransactionState(addr, messageExt, requestHeader); // @1
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
} else {
log.warn("checkTransactionState, pick producer group failed");
}
} else {
log.warn("checkTransactionState, decode message failed");
}
return null;
}
代碼@1:最終調用生産者的checkTransactionState方法。
DefaultMQProducerImpl#checkTransactionState
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() { // @1
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override
public void run() {
TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); // @1
if (transactionCheckListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
localTransactionState = transactionCheckListener.checkLocalTransaction(message); // @2
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
this.processTransactionState( // @3
localTransactionState,
group,
exception);
} else {
log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}
String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};
this.checkExecutor.submit(request);
}
上述代碼雖多,其實實作思路非常清晰,先使用一個匿名類( Runnable )建構一個運作任務,然後送出到checkExecutor線程池中執行,這與我第一篇文章的猜測是吻合的,那重點分析一下該任務的允許邏輯,對應在run方法中。
代碼@1:擷取消息發送者的TransactionListener。
代碼@2:執行TransactionListener#checkLocalTransaction,檢測本地事務狀态,也就是應用程式需要實作TransactionListener#checkLocalTransaction,告知RocketMQ該事務的事務狀态,然後傳回COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW中的一個,然後向Broker發送END_TRANSACTION指令即可,
代碼@3:發送END_TRANSACTION到Broker,其具體實作,已經在
https://blog.csdn.net/prestigeding/article/details/81263833中詳細講解過,在此不重複分析。
到這裡,事務消息狀态回查流程就講解完畢,接下來以一張流程圖結束本篇的講解。

下一篇,将重點分析Broker在收到事務狀态為COMMIT_MESSAGE、ROLLBACK_MESSAGE時如何送出、復原事務。
若您對RocketMQ技術感興趣,請加入作者所在的