天天看點

透徹剖析RocketMQ的消息順序消費和并發消費機制體系的原理分析

作者:馬士兵老師

DefaultMQPushConsumerImpl拉取消息

首先,DefaultMQPushConsumerImpl 是一個實作了 RocketMQ 的消費者用戶端接口的類。該類的主要作用是從 RocketMQ 的 Broker 擷取消息并進行消費。

主要可以通過pullMessage方法進行擷取對應的操作,如下圖所示。

透徹剖析RocketMQ的消息順序消費和并發消費機制體系的原理分析

在消費消息時,DefaultMQPushConsumerImpl 會将擷取到的消息放入一個processQueue中,processQueue包含了一個TreeMap資料結構,它按照消息的 commitLogOffset 順序來排列。

透徹剖析RocketMQ的消息順序消費和并發消費機制體系的原理分析

DefaultMQPushConsumerImpl 通過定時的方式,從 Broker 上拉取消息。具體來說,它會調用DefaultMQPushConsumerImpl 自身定義的PullMessageService類,該類會定時的從消息伺服器中拉取消息。

源碼如下所示。

透徹剖析RocketMQ的消息順序消費和并發消費機制體系的原理分析

一旦消息拉取成功,PushConsumer 會将消息交給 processQueue 中的一個隊列進行處理,這個隊列對應同一個消息主題的同一個消息隊列。

processQueue 中的每個消息都會根據消息的commitLogOffset排列位置。這個位置決定了消息被消費的順序。也就是說,processQueue 存放的順序決定了消息消費的順序。

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
java複制代碼boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
           

consumeMessageService的并發消費和順序消費

consumeMessageService 是一個用于消費消息的服務方法,它可以實作消息的并發消費和順序消費。當使用 consumeMessageService 時,需要考慮業務的實際需求以及消息處理的性質,權衡使用并發消費和順序消費。

并發消費

并發消費是指多個消費者同時消費同一批消息以提高處理速度,需要注意消息幂等性以避免重複消費。

DefaultMQPushConsumer的consumeMessageBatchMaxSize參數預設值為1,表示預設批量消費的消息數量是1個。在并發消費方式下,若一個隊列中拉取到32條消息,則會建立32個ConsumeRequest對象,每個ConsumeRequest對象對應1條消息,送出到線程池中運作。

順序消費

順序消費則是按照消息産生的順序逐個消費,适合處理需要順序進行的業務邏輯,如訂單處理,但實作可能帶來性能瓶頸,需謹慎設計。指同一時刻,一個 queue 隻有一個線程在消費。隻讓一個線程消費,由加鎖來實作,而順序則由 TreeMap 來實作。

一個隊列中拉取到32條消息,則隻會建立一個ConsumeRequest對象,該對象會被送出到線程池中,在ConsumeRequest.run方法中會按照消息的offset順序一條一條地消費,直到TreeMap為空。

concurrently 建立 ConsumeRequest

java複制代碼public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
           

消費者在消費消息時,根據批量消費的大小來決定是将任務送出到線程池中一次性消費,還是将任務分成多次送出到線程池中進行消費。

首先判斷msgs中消息的數量是否小于等于一個批量消費數量consumeBatchSize,如果小于等于,那麼将所有消息封裝成一個ConsumeRequest對象并送出到consumeExecutor線程池中,其中dispatchToConsume表示是否立即分發給消費者消費。

如果消息數量大于批量消費數量,那麼将消息分段送出到線程池中進行消費。首先通過兩層循環,将msgs中的消息按照consumeBatchSize分成若幹個小的MessageExt清單,每個小的MessageExt清單封裝成一個ConsumeRequest對象并送出到consumeExecutor線程池中。

如果線程池送出任務出現拒絕執行異常,說明該線程池已經滿了,這時候需要将目前小的MessageExt清單繼續循環并依次每次取出一個消息封裝成ConsumeRequest對象進行送出,直到所有的小的MessageExt清單被完整地送出到線程池中。若還有未送出的清單,則将該ConsumeRequest對象送出到一個新的線程池中進行定時的重複送出。

concurrently ConsumeRequest#run 消費主體邏輯

消息消費者消費消息的地方,listener.consumeMessage方法會被消費者調用,将消息清單和消息處理上下文傳入。

java複制代碼status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
           
  • msgs是需要消費的消息清單,這裡使用了Collections.unmodifiableList方法來建立一個不可修改的消息清單,這是為了保證消息的安全性,防止消息在消費過程中被意外或惡意修改。
  • context是消息處理的上下文,可能包含消費者的訂閱資訊、消費進度等資訊,可根據業務需要進行擴充和使用。
  • consumeMessage方法傳回消費結果,通常是一個枚舉類型,表示消費結果的狀态,如消費成功、消費失敗等。消費結果會影響消息處理的下一步流程。

消費結束之後清除資料

主要用于移除已經消費完成的消息。直接從 msgTreeMap 中删除消息,并傳回 msgTreeMap 中第一條消息的 queue offset 值。

org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage
java複制代碼public long removeMessage(final List<MessageExt> msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1;
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                        msgSize.addAndGet(0 - msg.getBody().length);
                    }
                }
                msgCount.addAndGet(removedCnt);
                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }
    return result;
}
           

具體來說,它接收一個 MessageExt 類型的消息清單msgs,通過周遊msgs,查找msgTreeMap中相應的消息,将找到的消息删除并計數,更新msgCount和msgSize這兩個計數器。代碼中也使用了重入鎖lockTreeMap來保證線程安全。函數将傳回result,表示下一步應該消費的消息的offset,如果沒有可消費的消息,則傳回-1。

orderly 建立 ConsumeRequest

在消息消費過程中,判斷是否需要立即将消息分發給消費者進行消費。

java複制代碼public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}
           

首先判斷參數dispathToConsume為true,如果為true,表示需要立即分發給消費者消費;否則就不需要進行分發,因為可能等待其他條件觸發再進行消費。

如果需要立即分發,那麼将該消息的消息隊列和消息處理隊列封裝成ConsumeRequest對象,并将該對象送出到consumeExecutor線程池中進行執行。每個消費者線程從consumeExecutor線程池中取出ConsumeRequest對象并進行消費。

orderly ConsumeRequest#run 消費主體邏輯

先簡單介紹一下 RocketMQ 消息消費的流程:消費者将消息從 Broker 中拉取到本地的 ProcessQueue 中,然後在 ProcessQueue 中進行消息消費。

java複制代碼// 擷取鎖
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
    for (boolean continueConsume = true; continueConsume; ) {
        // 從 TreeMap 中獲得消息
        List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
        if (!msgs.isEmpty()) {
            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } else {
            continueConsume = false;
        }
    }
    ...
}

public class MessageQueueLock {
    private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        Object objLock = this.mqLockTable.get(mq);
        if (null == objLock) {
            objLock = new Object();
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }

        return objLock;
    }
}
           

首先執行個體化了 MessageQueueLock,用于保證多線程環境下的線程同步和互斥。在代碼的第一行中,擷取到了目前 MessageQueue 的鎖對象 objLock。這個鎖對象是在 mqLockTable 中擷取的,mqLockTable 存儲了每個 MessageQueue 的鎖對象,用于對不同的 MessageQueue 進行互斥控制。

在代碼的後面,使用 synchronized 對 objLock 進行加鎖,并進入到了循環中。在循環中,調用 processQueue.takeMessags() 方法從 ProcessQueue 中擷取消息,傳回的是一個消息清單。如果消息清單不為空,則調用 messageListener.consumeMessage() 方法來進行消息消費。

如果消息清單為空,說明目前的 ProcessQueue 中沒有更多的消息,結束目前的循環,并退出 synchronized 塊,釋放了 objLock 的鎖,等待下一次的消費請求。

整個邏輯是通過鎖機制來實作對 ProcessQueue 進行互斥控制的,保證了多個消費者之間的消費的安全性。同時,使用了循環來進行多次消費。

順序處理機制

take消息時,将消息從 msgTreeMap 取出,并放入 consumingMsgOrderlyTreeMap。消費完成後,清空 consumingMsgOrderlyTreeMap。将 offset 設為 this.consumingMsgOrderlyTreeMap.lastKey() + 1,表示已經消費的消息的下一條消息的 offset。

// org.apache.rocketmq.client.impl.consumer.ProcessQueue#commit
java複制代碼public long commit() {
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
            msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
            for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                msgSize.addAndGet(0 - msg.getBody().length);
            }
            this.consumingMsgOrderlyTreeMap.clear();
            if (offset != null) {
                return offset + 1;
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("commit exception", e);
    }

    return -1;
}
           

關于 offset 送出

offset 是消費者從 broker 拉取的下一條消息的偏移量

消息消費的失敗

  • 順序消費:如果處理某條消息失敗且重試次數小于門檻值,從 consumingMsgOrderlyTreeMap 中取出這條消息并重新放入 msgTreeMap;如果重試次數超過門檻值,則将消息發送回 broker 并根據重試次數決定發送消息到 SCHDULE_TOPIC_XXXX 或死信隊列
  • 并發消費:如果處理消息時失敗,則将消息發送回 broker。如果發送失敗,将會繼續消費消息,直到成功消費并送出給 broker。

發送 ConsumeRequest 的時機有兩個,一是在拉取到消息後,二是在出現異常後延遲送出。

作者:洛神灬殇

連結:https://juejin.cn/post/7248608019852165175

繼續閱讀