首先說明本次源碼分析僅分析時間輪之前的延時消息設計
現在的RocketMQ已經支援基于時間輪的任意級别延時消息
延時消息基礎知識
預設RocketMQ延時消息共有18個等級
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
延時消息的簡單使用
發送延時消息
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
try {
producer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}
Message msg = new Message(TOPIC /* Topic */,
TAG /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
msg.setDelayTimeLevel(2);
producer.send(msg);
和普通消息不同的我們新增了延時消息等級的設定
msg.setDelayTimeLevel(2);
源碼分析
消息發送本身和普通消息發送沒有太大差别,但是新增了一個DELAY屬性
public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
消息發送的請求code是
public static final int SEND_BATCH_MESSAGE = 320;
源碼入口
這裡的我們先從消息的發送開始
消息發送最終的請求狀态碼是
public static final int SEND_MESSAGE_V2 = 310;
可以看到實際消息發送和普通消息沒有什麼差別,是以我們還是要去broker那邊看看有沒有什麼特殊處理
broker處理請求
消息處理方法 org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest
實際的消息發送邏輯在方法
this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
(ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
這裡我們進去這個方法看看
由于是分析延時消息,是以我們不關注消息發送的一些其他細節。主要關注延時消息和普通消息的處理差別 我們看方法
org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
這裡可以看到現在的topic還是我們正常指定的xiao-zou-topic
但是這裡會執行多個消息的後置處理器putMessageHookList
我們看看putMessageHookList有哪些
public void registerMessageStoreHook() {
List<PutMessageHook> putMessageHookList = messageStore.getPutMessageHookList();
putMessageHookList.add(new PutMessageHook() {
@Override
public String hookName() {
return "checkBeforePutMessage";
}
@Override
public PutMessageResult executeBeforePutMessage(MessageExt msg) {
return HookUtils.checkBeforePutMessage(BrokerController.this, msg);
}
});
putMessageHookList.add(new PutMessageHook() {
@Override
public String hookName() {
return "innerBatchChecker";
}
@Override
public PutMessageResult executeBeforePutMessage(MessageExt msg) {
if (msg instanceof MessageExtBrokerInner) {
return HookUtils.checkInnerBatch(BrokerController.this, msg);
}
return null;
}
});
putMessageHookList.add(new PutMessageHook() {
@Override
public String hookName() {
return "handleScheduleMessage";
}
@Override
public PutMessageResult executeBeforePutMessage(MessageExt msg) {
if (msg instanceof MessageExtBrokerInner) {
return HookUtils.handleScheduleMessage(BrokerController.this, (MessageExtBrokerInner) msg);
}
return null;
}
});
SendMessageBackHook sendMessageBackHook = new SendMessageBackHook() {
@Override
public boolean executeSendMessageBack(List<MessageExt> msgList, String brokerName, String brokerAddr) {
return HookUtils.sendMessageBack(BrokerController.this, msgList, brokerName, brokerAddr);
}
};
if (messageStore != null) {
messageStore.setSendMessageBackHook(sendMessageBackHook);
}
}
這裡可以看到主要是三個
- checkBeforePutMessage
- innerBatchChecker
- handleScheduleMessage
通過方法名我們可以很确定的看到主要是handleScheduleMessage這個方法出處理延時消息,是以我們重點看看handleScheduleMessage
handleScheduleMessage
- org.apache.rocketmq.broker.util.HookUtils#handleScheduleMessage
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
final MessageExtBrokerInner msg) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// 事務消息 暫時不管
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
if (!isRolledTimerMessage(msg)) {
if (checkIfTimerMessage(msg)) {
if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
//wheel timer is not enabled, reject the message
return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
}
PutMessageResult transformRes = transformTimerMessage(brokerController, msg);
if (null != transformRes) {
return transformRes;
}
}
}
// Delay Delivery 延時消息
if (msg.getDelayTimeLevel() > 0) {
transformDelayLevelMessage(brokerController, msg);
}
}
return null;
}
這裡的核心邏輯還是
if (msg.getDelayTimeLevel() > 0) {
transformDelayLevelMessage(brokerController, msg);
}
我們看看transformDelayLevelMessage方法
public static void transformDelayLevelMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
// 判斷是否超過最大延時級别18 如果超過則設定為最大延時等級
if (msg.getDelayTimeLevel() > brokerController.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(brokerController.getScheduleMessageService().getMaxDelayLevel());
}
// Backup real topic, queueId 備份真實的topic queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 設定延時消息為SCHEDULE_TOPIC_XXXX 這個固定的topic
msg.setTopic(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
// 設定延時消息的queueID delayLevel - 1
msg.setQueueId(ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()));
}
備份前的message資訊
備份後
總結就是
- 将原始topic替換為延遲消息固定的topic:SCHEDULE_TOPIC_XXXX
- 将原始queueid替換為delayLevel - 1
- 備份原始topic/queueid, 儲存到原始消息的properties屬性中
延時消息消費
我們通過檢視SCHEDULE_TOPIC_XXXX的調用發現有一個方法
- org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
我們檢視調用鍊
發現很明顯應該是使用了一個定時器去執行的,我們可以看看
這裡可以看到把18個延時等級的任務都加進去了
deliverExecutorService的核心線程數也是18個
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
我們這裡看看丢進去的DeliverDelayedMessageTimerTask的任務裡面裡面的邏輯
- org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
核心邏輯都被封裝在executeOnTimeup
我們進去看看executeOnTimeup
代碼邏輯有點長我們,慢慢看
public void executeOnTimeup() {
// 根據延遲topic和延遲queueid 去擷取Consumequeue
ConsumeQueueInterface cq =
ScheduleMessageService.this.brokerController.getMessageStore().getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// 如果 Consumequeue 為空則新增一個 DeliverDelayedMessageTimerTask 丢到 deliverExecutorService再定時執行,延時時間預設為100毫秒
// Consumequeue存在但是沒有消費檔案 一樣延時100毫秒後繼續重複執行
if (cq == null) {
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
return;
}
ReferredIterator<CqUnit> bufferCQ = cq.iterateFrom(this.offset);
if (bufferCQ == null) {
long resetOffset;
if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
this.offset, resetOffset, cq.getQueueId());
} else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
this.offset, resetOffset, cq.getQueueId());
} else {
resetOffset = this.offset;
}
this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
return;
}
long nextOffset = this.offset;
try {
while (bufferCQ.hasNext() && isStarted()) {
CqUnit cqUnit = bufferCQ.next();
long offsetPy = cqUnit.getPos();
int sizePy = cqUnit.getSize();
// 這裡的 tagCode實際是投遞時間
long tagsCode = cqUnit.getTagsCode();
if (!cqUnit.isTagsCodeValid()) {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = ScheduleMessageService.this.brokerController.getMessageStore().getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
long now = System.currentTimeMillis();
// 延時消息時間校驗 如果超過deliverTimestamp now+delayLevel 時間則矯正為 now+delayLevel
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
long currOffset = cqUnit.getQueueOffset();
assert cqUnit.getBatchNum() == 1;
nextOffset = currOffset + cqUnit.getBatchNum();
long countdown = deliverTimestamp - now;
// 如果延時時間超過目前時間,證明還未到消息處理時間
if (countdown > 0) {
this.scheduleNextTimerTask(currOffset, DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
return;
}
// 加載出延時消息
MessageExt msgExt = ScheduleMessageService.this.brokerController.getMessageStore().lookMessageByOffset(offsetPy, sizePy);
if (msgExt == null) {
continue;
}
// 建構新的消息體,将原來的消息資訊設定到這裡,并将topic和queueid設定為原始的topic和queueid(前面備份過)
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
msgInner.getTopic(), msgInner);
continue;
}
boolean deliverSuc;
if (ScheduleMessageService.this.enableAsyncDeliver) {
deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
} else {
// 将消息寫入到 commitlog中
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
}
if (!deliverSuc) {
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
}
} catch (Exception e) {
log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
} finally {
bufferCQ.release();
}
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
上面的代碼我們将核心邏輯做了注釋,我們總結一下大緻流程
- 校驗對應的延時等級消息是否存在ConsumeQueue,如果不存在則延時100ms繼續輪訓
- 校驗ConsumeQueue中的索引是否存在,如果不存在則延時100ms繼續輪訓
- 校驗索引裡面的tagsCode 實際是投遞時間是否到了需要投遞的時間,如果是則取出延時消息
- 将延時消息還原為原始消息,投遞到commitLog中繼續消費
這裡我們通過debug看看消息轉換前後的參數
轉換前的延時消息
轉換後的原始消息
消息的重新投遞是通過syncDeliver方法
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
int sizePy) {
PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
PutMessageResult result = resultProcess.get();
boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
if (sendStatus) {
ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
}
return sendStatus;
}
如果成功則更新延時消息的消費進度,注意延時消息的消費進度是存儲在
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
new ConcurrentHashMap<>(32);
實際的配置檔案是delayOffset.json
自此RocketMQ的延時消息我們就分析完了
總結
總的來說延時消息就是預設分為18個隊列,然後啟動18個線程一直去掃描這18個隊列。如果沒有消息就過100毫秒繼續重複掃描,周而複始 如果掃描到消息則将延時消息Topic:SCHEDULE_TOPIC_XXXX中的消息取出來,然後重新轉換為原始消息,包括原始消息的Topic和queueId,然後重新寫入到commitLog中
作者:小奏技術
連結:https://juejin.cn/post/7262372539522400317
來源:稀土掘金