目錄
1:實作思想
2:事務消息發送流程
3:消息送出,復原
4:回查事務狀态
5:總結
1:實作思想
RocketMQ事務消息的實作原理是基于兩階段送出(可以去了解一下XA)和定時事務狀态回查來決定消息最終是送出還是復原。一般地,應用程式在事務内完成相應的DB後,需要同步來調用mq相關的接口來發送消息,發送狀态為prepare的消息(筆者稱之為預消息),消息發送成功後,Rocketmq伺服器會回調RocketMQ消息發送者的事件監聽程式,記錄消息的本地事務狀态(相當于打一個标),該标與本地業務操作同屬一個事務,確定消息發送與本地事務的原子性。
mq收到prepare的消息,會先備份消息的原主題和原消息消費隊列(消息的載體),然後将消息存儲為主題為 RMQ_SYS_TRANS_HALF_TOPIC(筆者稱之為半主題)的消息消費隊列中。
RocketMQ消息伺服器開啟一個定時任務,消費RMQ_SYS_TRANS_HALF_TOPIC消息隊列的中消息,向消息發送端發起事務狀态回查,應用程式根據儲存的事務狀态來回饋消息伺服器事務的狀态(commit,rollback,unkown),如果是送出或者是復原,則服務送出或者復原消息,如果是未知的消息,等待下一次回查,(為什麼會存在未知的消息,一次輪詢過去,而這個消息剛剛出來,還沒有被消費),RocketMQ允許一條消息的回查間隔(輪詢周期)與回查次數(超過多少次復原),如果達到了n(最大回查次數)次,将會復原消息
2:事務消息發送流程
1:入口很重要:發送事務消息的開始和普通的消息有點不同,事務消息的producer是TransactionMQProducer,如果看過筆者之前寫過的話,應該知道是DefaultMQProducer,其實TransactionMQProducer是DefaultMQProducer的增強,在TransactionMQProducer的内部有一個transactionlistener,主要實作了本地事務狀态執行,和本地事務事務回查兩個接口
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPB9UNjRVT1UEVOBDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLxIjNxAjMxIjM0ATMxkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
2:事務消息發送依賴的是TransactionMQProducer
@Override
//通俗易懂,在事務中發送消息
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
//這裡必須要依賴事務監聽器,要是這個都沒有話,相當于不可能存在事務狀态回查和本地事務執行
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
//設定系統的半topic
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
//最後還是調用的預設的mqProducer
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
3:設定消息的主題,和生産者組
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
//設定消息為事務消息,标記頭
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
//設定消息的生産者組
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
4:執行本地事務
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
//從消息中擷取事務id,為後續事務回查提供依據
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//執行本地事務
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
//擷取事務狀态
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
//事務狀态不是 送出狀态,列印日志,等待下一次輪詢
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
5:check消息,發送消息
//是否設定了事務消息
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
//如果是事務消息的話,會給消息一個 PRE 的标志,後續會添加一個事務消息的專用處理器
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
//看看是不是事務消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
6:由于上述的消息的主題已經被改變成一個系統的主題,并且放入到對應的消息隊列中,那麼在消費的時候肯定是一個專門的線程去消費這個消息。
3:消息送出,復原
1:在事務消息結束(沒有真正的結束,隻是一階段的結束)事務消息的送出或者是復原均由sendResult這個字段辨別來确定,對應的處理器為org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest,因為涉及到傳輸效率,在RocketMq中大量的使用Netty來做為io架構,是以涉及到資料的加解碼(就是各種handler啦)
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
//加解碼
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.info("Transaction request:{}", requestHeader);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
2:确定是否送出或者是復原事務
perationResult result = new OperationResult();
//送出事務
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
//傳回成功
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
//獲得事務消息的偏移量,然後要将此消息放入到commitlog中進行檔案的存儲,最終是要存盤的
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
//存儲時間戳,一般用于消息的時間查找
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
//復原事務
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
4:回查事務狀态
1:可以很絕對的認為,在RocketMQ中的事務消息隻會存在兩種狀态(分别是 commit,rollback,)至于unkown的話隻能做為一個中間的狀态,也就是說,随着時間的推移,這種狀态是一定會不存在的,要是真的不行,那就搬出來微積分--極限思想,世間萬物最終歸于平靜,其實大數定律也是如此(扯得有點哲學的味道了,偏題了,偏題了)。
2:定時任務去掃描HALF_TOPIC,回查消息的事務狀态
protected void onWaitEnd() {
//逾時時間
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
//最大的配置檢測時間
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
//目前時間
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
3:拉取HALF_TOPIC主題下的所有消息隊列,然後依次處理
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}
4:事務回查實作
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
5:總結
在RocketMQ中事務消息底層包裝的是普通消息,在發送的時候更換其TOPIC,和對應的消息隊列,然後有一個線程來消費這個消息隊列上面的消息,分為兩階段送出,第一階段是本地執行,第二階段才是真正的送出,不過第二階段的送出也是由條件的,而是通過事務狀态回查的方式的來是實作的,最後會更新事務消息的ID,已達到将原來的消息更換成普通的消息進行發送,接着就會存入磁盤中持久化。