DefaultMQPushConsumerImpl拉取消息
首先,DefaultMQPushConsumerImpl 是一個實作了 RocketMQ 的消費者用戶端接口的類。該類的主要作用是從 RocketMQ 的 Broker 擷取消息并進行消費。
主要可以通過pullMessage方法進行擷取對應的操作,如下圖所示。
在消費消息時,DefaultMQPushConsumerImpl 會将擷取到的消息放入一個processQueue中,processQueue包含了一個TreeMap資料結構,它按照消息的 commitLogOffset 順序來排列。
DefaultMQPushConsumerImpl 通過定時的方式,從 Broker 上拉取消息。具體來說,它會調用DefaultMQPushConsumerImpl 自身定義的PullMessageService類,該類會定時的從消息伺服器中拉取消息。
源碼如下所示。
一旦消息拉取成功,PushConsumer 會将消息交給 processQueue 中的一個隊列進行處理,這個隊列對應同一個消息主題的同一個消息隊列。
processQueue 中的每個消息都會根據消息的commitLogOffset排列位置。這個位置決定了消息被消費的順序。也就是說,processQueue 存放的順序決定了消息消費的順序。
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
java複制代碼boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
consumeMessageService的并發消費和順序消費
consumeMessageService 是一個用于消費消息的服務方法,它可以實作消息的并發消費和順序消費。當使用 consumeMessageService 時,需要考慮業務的實際需求以及消息處理的性質,權衡使用并發消費和順序消費。
并發消費
并發消費是指多個消費者同時消費同一批消息以提高處理速度,需要注意消息幂等性以避免重複消費。
DefaultMQPushConsumer的consumeMessageBatchMaxSize參數預設值為1,表示預設批量消費的消息數量是1個。在并發消費方式下,若一個隊列中拉取到32條消息,則會建立32個ConsumeRequest對象,每個ConsumeRequest對象對應1條消息,送出到線程池中運作。
順序消費
順序消費則是按照消息産生的順序逐個消費,适合處理需要順序進行的業務邏輯,如訂單處理,但實作可能帶來性能瓶頸,需謹慎設計。指同一時刻,一個 queue 隻有一個線程在消費。隻讓一個線程消費,由加鎖來實作,而順序則由 TreeMap 來實作。
一個隊列中拉取到32條消息,則隻會建立一個ConsumeRequest對象,該對象會被送出到線程池中,在ConsumeRequest.run方法中會按照消息的offset順序一條一條地消費,直到TreeMap為空。
concurrently 建立 ConsumeRequest
java複制代碼public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
消費者在消費消息時,根據批量消費的大小來決定是将任務送出到線程池中一次性消費,還是将任務分成多次送出到線程池中進行消費。
首先判斷msgs中消息的數量是否小于等于一個批量消費數量consumeBatchSize,如果小于等于,那麼将所有消息封裝成一個ConsumeRequest對象并送出到consumeExecutor線程池中,其中dispatchToConsume表示是否立即分發給消費者消費。
如果消息數量大于批量消費數量,那麼将消息分段送出到線程池中進行消費。首先通過兩層循環,将msgs中的消息按照consumeBatchSize分成若幹個小的MessageExt清單,每個小的MessageExt清單封裝成一個ConsumeRequest對象并送出到consumeExecutor線程池中。
如果線程池送出任務出現拒絕執行異常,說明該線程池已經滿了,這時候需要将目前小的MessageExt清單繼續循環并依次每次取出一個消息封裝成ConsumeRequest對象進行送出,直到所有的小的MessageExt清單被完整地送出到線程池中。若還有未送出的清單,則将該ConsumeRequest對象送出到一個新的線程池中進行定時的重複送出。
concurrently ConsumeRequest#run 消費主體邏輯
消息消費者消費消息的地方,listener.consumeMessage方法會被消費者調用,将消息清單和消息處理上下文傳入。
java複制代碼status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
- msgs是需要消費的消息清單,這裡使用了Collections.unmodifiableList方法來建立一個不可修改的消息清單,這是為了保證消息的安全性,防止消息在消費過程中被意外或惡意修改。
- context是消息處理的上下文,可能包含消費者的訂閱資訊、消費進度等資訊,可根據業務需要進行擴充和使用。
- consumeMessage方法傳回消費結果,通常是一個枚舉類型,表示消費結果的狀态,如消費成功、消費失敗等。消費結果會影響消息處理的下一步流程。
消費結束之後清除資料
主要用于移除已經消費完成的消息。直接從 msgTreeMap 中删除消息,并傳回 msgTreeMap 中第一條消息的 queue offset 值。
org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage
java複制代碼public long removeMessage(final List<MessageExt> msgs) {
long result = -1;
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
result = this.queueOffsetMax + 1;
int removedCnt = 0;
for (MessageExt msg : msgs) {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (Throwable t) {
log.error("removeMessage exception", t);
}
return result;
}
具體來說,它接收一個 MessageExt 類型的消息清單msgs,通過周遊msgs,查找msgTreeMap中相應的消息,将找到的消息删除并計數,更新msgCount和msgSize這兩個計數器。代碼中也使用了重入鎖lockTreeMap來保證線程安全。函數将傳回result,表示下一步應該消費的消息的offset,如果沒有可消費的消息,則傳回-1。
orderly 建立 ConsumeRequest
在消息消費過程中,判斷是否需要立即将消息分發給消費者進行消費。
java複制代碼public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
首先判斷參數dispathToConsume為true,如果為true,表示需要立即分發給消費者消費;否則就不需要進行分發,因為可能等待其他條件觸發再進行消費。
如果需要立即分發,那麼将該消息的消息隊列和消息處理隊列封裝成ConsumeRequest對象,并将該對象送出到consumeExecutor線程池中進行執行。每個消費者線程從consumeExecutor線程池中取出ConsumeRequest對象并進行消費。
orderly ConsumeRequest#run 消費主體邏輯
先簡單介紹一下 RocketMQ 消息消費的流程:消費者将消息從 Broker 中拉取到本地的 ProcessQueue 中,然後在 ProcessQueue 中進行消息消費。
java複制代碼// 擷取鎖
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
for (boolean continueConsume = true; continueConsume; ) {
// 從 TreeMap 中獲得消息
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} else {
continueConsume = false;
}
}
...
}
public class MessageQueueLock {
private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();
public Object fetchLockObject(final MessageQueue mq) {
Object objLock = this.mqLockTable.get(mq);
if (null == objLock) {
objLock = new Object();
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
if (prevLock != null) {
objLock = prevLock;
}
}
return objLock;
}
}
首先執行個體化了 MessageQueueLock,用于保證多線程環境下的線程同步和互斥。在代碼的第一行中,擷取到了目前 MessageQueue 的鎖對象 objLock。這個鎖對象是在 mqLockTable 中擷取的,mqLockTable 存儲了每個 MessageQueue 的鎖對象,用于對不同的 MessageQueue 進行互斥控制。
在代碼的後面,使用 synchronized 對 objLock 進行加鎖,并進入到了循環中。在循環中,調用 processQueue.takeMessags() 方法從 ProcessQueue 中擷取消息,傳回的是一個消息清單。如果消息清單不為空,則調用 messageListener.consumeMessage() 方法來進行消息消費。
如果消息清單為空,說明目前的 ProcessQueue 中沒有更多的消息,結束目前的循環,并退出 synchronized 塊,釋放了 objLock 的鎖,等待下一次的消費請求。
整個邏輯是通過鎖機制來實作對 ProcessQueue 進行互斥控制的,保證了多個消費者之間的消費的安全性。同時,使用了循環來進行多次消費。
順序處理機制
take消息時,将消息從 msgTreeMap 取出,并放入 consumingMsgOrderlyTreeMap。消費完成後,清空 consumingMsgOrderlyTreeMap。将 offset 設為 this.consumingMsgOrderlyTreeMap.lastKey() + 1,表示已經消費的消息的下一條消息的 offset。
// org.apache.rocketmq.client.impl.consumer.ProcessQueue#commit
java複制代碼public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
return offset + 1;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
關于 offset 送出
offset 是消費者從 broker 拉取的下一條消息的偏移量
消息消費的失敗
- 順序消費:如果處理某條消息失敗且重試次數小于門檻值,從 consumingMsgOrderlyTreeMap 中取出這條消息并重新放入 msgTreeMap;如果重試次數超過門檻值,則将消息發送回 broker 并根據重試次數決定發送消息到 SCHDULE_TOPIC_XXXX 或死信隊列
- 并發消費:如果處理消息時失敗,則将消息發送回 broker。如果發送失敗,将會繼續消費消息,直到成功消費并送出給 broker。
發送 ConsumeRequest 的時機有兩個,一是在拉取到消息後,二是在出現異常後延遲送出。
作者:洛神灬殇
連結:https://juejin.cn/post/7248608019852165175