天天看點

一文總結 MetaQ/RocketMQ 原理

作者:技術聯盟總壇

陽禮 阿裡開發者 2023-08-01 09:02 發表于浙江

一文總結 MetaQ/RocketMQ 原理

阿裡妹導讀

本文介紹的 MetaQ/RocketMQ 是側重于維持消息一緻性和高可靠性的消息隊列中間件,幫助大家對隊列設計的了解。

簡介—— 消息隊列中間件 MetaQ/RocketMQ

中間件 MetaQ 是一種基于隊列模型的消息中間件,MetaQ 據說最早是受 Kafka 的影響開發的,第一版的名字 "metamorphosis",是奧地利作家卡夫卡的名作——《變形記》。RocketMQ 是 MetaQ 的開源版本。

消息隊列中間件一般用于在分布式場景下解決叢集單機瓶頸的問題。在傳統的分布式計算環境中,常常會出現由于某個單機節點的性能瓶頸,即使其他節點仍有餘力,仍然會導緻整個系統的性能無法進一步提升的情況,這一現象通常是由于任務負載不均衡,網絡延遲等常見且難以解決的問題。消息隊列本質上是提供了一種非常合理的任務配置設定政策,通過将任務分給消費者實作異步和分布式處理,提高整個叢集的性能。

消息隊列(mq)的核心思想是将耗時的任務異步化,通過消息隊列緩存任務,進而實作消息發送方和接收方的解耦,使得任務的處理能夠異步、并行,進而提高系統或叢集的吞吐量和可擴充性。在這個過程中,整個系統強依賴于消息隊列,起到類似橋梁的作用。消息隊列有着經典的三大應用場景:解耦、異步和削峰填谷。

解耦場景:消息隊列一般使用釋出/訂閱的模型,如果服務 B C D 依賴服務 A 的消息,此時新增服務 E 也需要依賴 A ,而 B 服務不再需要消息,需要頻繁且複雜的業務改造,效率低,穩定性差,此時引入消息隊列進行解耦,服務 A 隻需要将産生的消息釋出到 mq 中,就不用管了,其它服務會自己根據需要訂閱 mq 中的消息,或者說去 mq 中消費,這就使得每個服務可以更多地關注自身業務,而不需要把精力用在維護服務之間的關系上,可擴充性提高。

一文總結 MetaQ/RocketMQ 原理

異步場景:如使用者的業務需要一系列的服務進行處理,按順序處理的話,使用者需要等待的時間過長。例如電商平台的使用者下單、支付、積分、郵件、短信通知等流程,長時間等待使用者無法接受,就可以通過 mq 進行服務的異步處理,例如積分、郵件和短信通知服務訂閱了支付服務的消息,将支付完成作為消息釋出到 mq ,這些服務就可以同時對這一訂單進行處理,降低了請求等待時間(rt) 。

一文總結 MetaQ/RocketMQ 原理

削峰填谷場景:削峰表示的含義是,流量如果太大,就控制伺服器處理的 QPS,不要讓大流量打挂資料庫等導緻伺服器當機,讓服務處理請求更加平緩,節省伺服器資源,其本質上是控制使用者的請求速率,或是延緩或是直接拒絕。填谷的含義是将階段性的大流量請求緩存起來,在流量平緩的時候慢慢處理,防止過多的請求被拒絕後的重試導緻更大的流量。mq 很适合這一場景,QPS 超出服務端接收請求的能力時,服務端仍然保持在安全範圍内地從消息隊列中擷取消息進行處理,多餘的消息會積壓在消息隊列中,或由于逾時直接拒絕,到 QPS 低于這一門檻值的時候,這些積壓的消息就會被逐漸消費掉。相當于在系統前修建了一個流量蓄水池。

除此之外還可以利用消息隊列進行消息通信,日志處理等業務,但消息隊列也會引入系統可用性,系統複雜度,資料一緻性等問題(強依賴消息隊列的正确執行,需要確定消息不會丢失,確定消息的順序性等)。這意味着如果系統中的消息隊列承擔着重要的角色,那麼消息隊列的可靠性和穩定性也至關重要,本文介紹的 MetaQ/RocketMQ 是側重于維持消息一緻性和高可靠性的消息隊列中間件。

實體架構

MetaQ 的高可用性是基于其實體部署架構實作的,在生産者為消息定義了一個 topic 之後,消費者可以訂閱這個 topic ,于是消息就有了從生産到消費的路由指向。

一文總結 MetaQ/RocketMQ 原理

NameServer 負責暴露消息的 topic ,是以可以以将 NameServer 了解成一個注冊中心,用來關聯 topic 和對應的 broker ,即消息的存儲位置。NameServer 的每個節點都維護着 topic 和 broker 的映射關系,每個節點彼此獨立,無同步。在每個NameServer節點内部都維護着所有 Broker 的位址清單,所有 Topic 和 Topic 對應 Queue 的資訊等。消息生産者在發送消息之前先與任意一台 NameServer 建立連接配接,擷取 Broker 伺服器的位址清單,然後根據負載均衡算法從清單中選擇一台消息伺服器發送消息。

Broker 主要負責消息的存儲和轉發,分為 master 和 slave,是一寫多讀的關系。broker 節點可以按照處理的資料相同劃分成副本組,同一組 master 和 slave 的關系可以通過指定相同 brokerName,不同的 brokerId 來定義,brokerId 為 0 辨別 master,非 0 是 slave。每個 broker 伺服器會與 NameServer 叢集建立長連接配接(注意是跟所有的 NameServer 伺服器,因為 NameServer 彼此之間獨立不同步),并且會注冊 topic 資訊到 NameServer 中。複制政策是 Broker 的 Master 與 Slave 間的資料同步方式,分為同步複制與異步複制。由于異步複制、異步刷盤可能會丢失少量消息,是以 Broker 預設采用的是同步雙寫的方式,消息寫入 master 成功後,master 會等待 slave 同步資料成功後才向 Producer 傳回成功 ACK ,即 Master 與 Slave 都要寫入成功後才會傳回成功 ACK 。這樣可以保證消息發送時消息不丢失。副本組中,各個節點處理的速度不同,也就有了日志水位的概念 (高水位對消費者不可見)。在 master 當機時,同步副本集中的其餘節點會自動選舉出新的 master 代替工作(Raft 協定)。

一文總結 MetaQ/RocketMQ 原理

Producer,消息生産者,與 NameServer 随機一個節點建立長連接配接,定時從 NameServer 擷取 topic 路由資訊,與 master broker 建立長連接配接,定時發送心跳,Producer 隻與 master 建立連接配接産生通信,不與 slave 建立連接配接。生産者和消費者都有組(Group)的概念,同一組節點的生産/消費邏輯相同。

Consumer,消息消費者,與 NameServer 随機一個節點建立長連接配接,定時從 NameServer 擷取 topic 的路由資訊,并擷取想要消費的 queue 。可以和提供服務的 master 或 slave 建立長連接配接,定時向 master 和 slave 發送心跳,既可以從 master 訂閱消息,也可以從 slave 訂閱消息。

一文總結 MetaQ/RocketMQ 原理

消息的存儲

MetaQ 将消息存儲(持久化)到位于生産者和消費者之間的一個消息代理(Message Broker)上。

一文總結 MetaQ/RocketMQ 原理

MetaQ 消息模型:

  • Message 機關消息;
  • Topic 消息的類型,生産者對應消費者的分區辨別;
  • Tag 消息在相同 Topic 時的二級分類辨別,可用于消息的篩選;
  • Queue 實體分區,一個 Topic 對應多個 Queue;
  • Group 生産者或消費者的邏輯分組,同一個 Group 的 生産者/消費者 通常 生産/消費 同一類消息,并且 生産/消費 的邏輯一緻;
  • Offset:偏移值, 表示消費到的位置或待消費的消息位置;
一文總結 MetaQ/RocketMQ 原理

消息的存儲方式對消息隊列的性能有很大影響,如 ActiveMQ 會使用隊清單來存儲消息,依靠輪訓、加鎖等方式檢查和處理消息,但對于 QPS 很高的系統來說,一下子積壓龐大的資料量在表中會導緻 B+ 樹索引層級加深,影響查詢效率。KV 資料庫采用如 LSM 樹作為索引結構,對讀性能有較大的犧牲,這對于消息隊列而言很難接受,因為消息隊列常常需要面對消費失敗需要重試的情況。

RocketMQ/Kafka/RabbitMQ 等消息隊列會采用順序寫的日志結構,将消息刷盤至檔案系統作持久化。順序寫日志檔案可以避免頻繁的随機通路而導緻的性能問題,而且利于延遲寫入等優化手段,能夠快速儲存日志。Kafka 會為每個 topic (事件的組織和存儲機關,一個 topic 可以對應多個生産者和多個消費者) 劃分出一個分區日志,便于根據 topic 順序消費,消息被讀取後不會立刻删除,可以持久存儲,但 topic 數量增加的時候,broker 的分區檔案數量增大,會使得本來速度很快的順序寫變成随機寫(不同檔案之間移動),性能大幅下降。

一文總結 MetaQ/RocketMQ 原理

MetaQ 2.0 對這部分進行重新設計,其存儲結構主要包括 CommitLog 和 Consume queue 兩部分。

CommitLog 是實體存儲,存儲不定長的完整消息記錄,邏輯上是完全連續的一個檔案,實體上單個檔案大小是 1 GB,檔案名是目前檔案首位址在 CommitLog 中的偏移量。隻要 CommitLog 落盤,就可以認為已經接收到消息,即使 Cosume queue 丢失,也可以從 CommitLog 恢複。而所有 topic 的消息都會存儲在同一個 CommitLog 中來保證順序寫。這樣的結構會導緻 CommitLog 讀取完全變成随機讀,是以需要 Consume queue 作為索引隊列 (offset, size, tag),每個 topic-queue 的消息在寫完 CommitLog 之後,都會寫到獨立的 Consume queue ,隊列裡的每個元素都是定長的中繼資料,内容包含該消息在對應 CommitLog 的 offset 和 size ,還包括 tagcode 可支援消息按照指定 tag 進行過濾。順序寫是 MetaQ 實作高性能的基礎。

一文總結 MetaQ/RocketMQ 原理

基于這樣的存儲結構,MetaQ 對用戶端暴露的主要是 Consume queue 邏輯視圖,提供隊列通路接口。消費者通過指定 Consume queue 的位點來讀取消息,通過送出 Consume queue 的位點來維護消費進度。Concume queue 每個條目長度固定(8個位元組CommitLog實體偏移量、4位元組消息長度、8位元組tag哈希碼),單個 ConsumeQueue 檔案預設最多包括 30 萬個條目。這樣做的好處是隊列非常輕量級,Consume Queue 非常小,且在消費過程中都是順序讀取,其速度幾乎能與記憶體讀寫相比,而在 page cache 和良好的空間局部性作用下,CommitLog 的通路也非常快速。

一文總結 MetaQ/RocketMQ 原理

MetaQ 會啟動一個定時服務 ReputMessageService 定時調用(間隔 1ms)來生成 Consume queue 和 其它索引檔案。

Consume queue 解決了順序消費的問題,但如果需要根據屬性進行篩選,就必須用到 index 索引。

一文總結 MetaQ/RocketMQ 原理

index 索引支援根據 key 值進行篩選,查找時,可以根據消息的 key 計算 hash 槽的位置,hash 槽中存儲着 Index 條目的位置,可以根據這個 index 條目獲得一個連結清單(尾),每個 index 條目包含在 CommitLog 上的消息主體的實體偏移量。

消息鍊路

MetaQ 的消息可以根據 topic-queue 劃分出确定的從生産者到消費者路由指向。

一文總結 MetaQ/RocketMQ 原理

1.producer 指定 broker 和 queue 發送消息 msg ;

2.broker 接收消息,并完成緩存、刷盤和生成摘要(同時根據 tag 和 user properties 對 msg 進行打标)等操作;

3.consumer 每隔一段時間( pullInterval )從 broker 端的(根據服務端消息過濾模式 tag 或 sql 過濾後)擷取一定量的消息到本地消息隊列中(單線程)

4.consumer 按照配置并發配置設定上述隊列消息并執行消費方法;

5.consumer 傳回 broker 消費結果并重置消費位點;

生産者

Topic 是消息的主題,每個 topic 對應多個隊列,多個隊列會均勻的分布在多個 broker 上,Producer 發送的消息在 broker 上會均衡的分布在多個隊列中,Producer 發送消息時在多個隊列間輪詢確定消息的均衡。

一文總結 MetaQ/RocketMQ 原理

發送消息的具體操作如下:

1、查詢本地緩存是否存儲了 TopicPublishInfo ,否則從 NameServer 擷取

2、根據負載均衡選擇政策擷取待發送隊列并輪訓通路

3、擷取消息隊列對應的 broker 實際 IP

4、設定消息 Unique ID ,zip 壓縮消息

5、消息校驗(長度等),發送消息

Producer 發送的每條消息都包含一個 Topic,表示一類消息的集合。同時還有一個 Tag,用于區分同一Topic 下不同類型的消息。一個 Topic 包括多個 Queue,每個 Queue 中存放該 Topic 對應消息的位置。一個 Topic 的 Queue 相當于該 Topic 中消息的分區,Queue 可以存儲在不同的 Broker 上。發送消息時,Producer 通過負載均衡子產品選擇相應的 Broker 叢集隊列進行消息投遞。

消息發送時如果出現失敗,預設會重試 2 次,在重試時會盡量避開剛剛接收失敗的 Broker,而是選擇其它 Broker 上的隊列進行發送,進而提高消息發送的成功率。

消費者

消費方式

  • 廣播消費:Producer 向一些隊列輪流發送消息,隊列集合稱為 Topic,每一個 Consumer 執行個體消費這個 Topic 對應的所有隊列。
  • 叢集消費:多個 Consumer 執行個體平均消費這個 Topic 對應的隊列集合。

MetaQ 消費者端有多套負載均衡算法的實作,比較常見的是平均配置設定和平均循環配置設定,預設使用平均配置設定算法,給每個 Consumer 配置設定均等的隊列。一個 Consumer 可以對應多個隊列,而一個隊列隻能給一個 Consumer 進行消費,Consumer 和隊列之間是一對多的關系。

叢集模式下有一點需要注意:消費隊列負載機制遵循一個通用的思想,一個消息隊列同時隻允許被一個消費者消費,一個消費者可以消費多個消費隊列。是以當 Consumer 的數量大于隊列的數量,會有部分 Consumer 配置設定不到隊列,這些配置設定不到隊列的 Consumer 機器不會有消息到達。

平均配置設定算法舉例:

  • 如果有 5 個隊列,2 個 consumer,consumer1 會配置設定 3 個隊列,consumer2 配置設定 2 個隊列;
  • 如果有 6 個隊列,2 個 consumer,consumer1 會配置設定 3 個隊列,consumer2 也會配置設定 3 個隊列;
  • 如果 10 個隊列,11 個 consumer,consumer1~consumer10 各配置設定一個隊列,consumer11 無隊列配置設定;

如果消費叢集規模較大:例如 topic 隊列資源是 128 個,而消費機器數有 160 台,按照一個隊列隻會被一個消費叢集中一台機器處理的原則,會有 32 台機器不會收到消息,此種情況需要聯系 MetaQ 人員進行擴容評估。

消費重試:當出現消費失敗的消息時,Broker 會為每個消費者組設定一個重試隊列。當一條消息初次消費失敗,消息隊列會自動進行消費重試。達到最大重試次數後,若消費仍然失敗,此時會将該消息發送到死信隊列。對于死信消息,通常需要開發人員進行手動處理。

一文總結 MetaQ/RocketMQ 原理

在消費時間過程中可能會遇到消息消費隊列增加和減少、消息消費者增加或減少,此時需要對消息消費隊列進行重新平衡,既重新配置設定 (rebalance),這就是所謂的重平衡機制。在 RocketMQ 中,每隔 20s 會根據目前隊列數量、消費者數量重新進行隊列負載計算,如果計算出來的結果與目前不一樣,則觸發消息消費隊列的重配置設定。

Consumer 啟動時會啟動定時器,還執行一些定時同步任務,包括:同步 nameServer 位址,從 nameServer 同步 topic 的路由資訊,清理 offline 的 broker,并向所有 broker 發送心跳,配置設定給目前 consumer 的每個隊列将最新消費的 offset 同步給 broker。

消息消費過程淺析

三個關鍵服務: RebalanceService、PullMessageService、MessageConsumeService

RebalanceService 負載均衡服務

定時執行一次負載均衡(20 s)配置設定消息隊列給消費者。負載均衡針對每個 topic 獨立進行,具體如下:

private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);//廣播模式下每個消費者要消費所有 queue 的消息
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);//找到該topic下的消息隊列集合
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);//找到給消費者組下的所有消費者id
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }
                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }
                
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);


                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    
                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);// 根據配置設定政策進行配置設定
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }
                    
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);// 更新處理隊清單
                    
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }           

這裡主要做了幾件事:

  • 判斷消費模式
  • 廣播模式

i.找到 topic 下的消息隊列(queue)集合

ii.更新處理隊清單

  • 叢集模式

i.找到 topic 下的消息隊列集合

ii.找到消費者組下所有消費者 id

iii.根據配置設定政策進行配置設定

iv.更新處理隊清單,開始真正拉取消息請求

消費者會将消費位點更新到 NameServer 上,Rebalance 發生時,讀取消費者的消費位點資訊,需要注意在消費者數量大于隊列數量的情況下,如果消費者不及時更新消費位點資訊,可能會導緻消息被重複消費。是以,消費者需要及時更新消費位點資訊,確定消費進度正确。

Consumer 建立的時候 Rebalance 會被執行。整個 rebalanceService 的作用就是不斷的通過負載均衡,重新配置設定隊列的過程。根據配置設定好的隊列建構拉取消息的請求,然後放到 pullRequestQueue 中。

PullMessageService 拉取消息服務

首先拉取消息時最重要的是确定偏移量 offset,這存儲在消費者端的 OffsetStore 對象中。

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
          this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
        } else {
          switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
              this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
              break;
            case CLUSTERING:
              this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
              break;
            default:
              break;
          }
}
this.offsetStore.load();           

可以看到廣播模式和叢集模式的對象類型不同,這是因為對 offset 的維護的方式不一樣,在 load 的時候 LocalFileOffsetStore 會從本地檔案加載這個 offset,而 RemoteBrokerOffsetStore 的 load 函數是空的。

兩種對象類型分别有 readOffset 函數支援從記憶體中擷取 offset 值,以及分别從本地檔案存儲和 broker 擷取 offset。需要注意叢集模式下消費者隻需要關心 broker 上維護的消費進度,因為不論 queue 和 消費者的映射關系如何切換, 隻有 offset 之後的未消費消息是消費者需要關心的。

一文總結 MetaQ/RocketMQ 原理

消息的拉取過程是一個不斷循環的生産者消費者模型,一個 PullRequest 就對應一個拉取任務,并和一對MessageQueue(儲存 Consume queue 的資訊)和 ProcessQueue 關聯,消息拉取的過程中,PullMessageService 拉取線程不停的讀取 PullRequestQueue 根據 PullRequest 拉取消息。拉取到消息時,消息送出到 ProcessQueue 中并建立 ConsumeRequest 送出到 ConsumeService 處理, 然後生成下一批的 PullRequest 丢到 PullRequestQueue。如果沒有拉取到消息或出現異常,則會重新将請求放回拉取隊列。ProcessQueue 中以 TreeMap 形式儲存待處理的消息, key 為消息對應的 offset ,并自動進行排序。

一文總結 MetaQ/RocketMQ 原理

消息拉取過程:

1.PullMessageService 不斷循環周遊,從 PullRequestQueue 中提取 PullRequest,根據 nextOffset 去 broker 拉取消息,若該隊列 已經 dropped 則更新 offset 到 broker 并丢棄此拉消息請求。

2.PullMessageService 異步拉取消息,同時将 PullRequest 封裝在 PullCallback 中,PullCallback 封裝在 ResponseFuture中,并以自增的請求 id 為鍵,ResponseFuture 為值放入 ResponseTable 中。

3.Broker 收到請求,如果 offset 之後有新的消息會立即發送異步響應;否則等待直到 producer 有新的消息發送後傳回或者逾時。如果通信異常或者 Broker 逾時未傳回響應,nettyClient 會定時清理逾時的請求,釋放 PullRequest 回到 PullRequestQueue。

4.用最新的 offset 更新 ResponseFuture 裡的 PullRequest 并推送給 PullRequestQueue 裡以進行下一次拉取。批量拉取到的消息分批送出給 consumeExecutor 線程處理。

消費控速

MetaQ 為消費者端拉取消息提供了消費控速的能力:

  • 主動控速,在整個消費過程中我們可以發現,如果想要做到流控,一個是控制生成 PullRequest 的時間間隔,一個是控制生成新一批的請求數量,是以 MetaQ 提供了兩個參數給我們 pullInterval、pullBatchSize ,主動控速的邏輯是通過控制消息的拉取速度來達到降低速率的效果。
  • 被動控速,這種流量控制的方式要複雜得多,需要使用者在消費消息時控制流量 (sentinel),由于消費線程池的待消費隊列的消息達到一定門檻值之後,MetaQ 會被動降低 PullRequest 的産生的速率,是以當采用流量控制手段通過埋點降低消費速度時,待消費隊列會逐漸占滿,觸發降速機制;為什麼不直接用 sentinel ?因為 sentinel 快速失敗等政策觸發限流後會産生大量重試,重試消息會進入重試隊列,當重試的量逐漸增大,broker 上重試隊列中消息量也越來越多,并且重試消息再次投遞時還可能再次發生重試,又重新進入重試隊列,同一條消息反複進出隊列,這種無意義的重複動作會增加 broker 的壓力。

消息種類

普通消息

可選擇同步、異步或單向發送。同步:Producer 發出一條消息後,會在收到 MQ 傳回的 ACK 之後再發送下一條消息。異步:Producer 發出消息後無需等待 MQ 傳回 ACK ,直接發送下一條消息。單向: Producer 僅負責發送消息,不等待,MQ 也不傳回 ACK。

順序消息

消息的順序性分為兩種:

  • 全局順序:對于指定的一個 Topic ,所有消息按照嚴格的先入先出的順序進行釋出和消費 (同一個 queue)。
  • 分區順序:對于一個指定的 Topic ,所有消息根據 sharding key 進行分區,同一個分區内的消息按照嚴格的 FIFO 順序進行釋出和消費,分區之間彼此獨立。

MetaQ 隻支援同一個 queue 的順序消息,且同一個 queue 隻能被一台機器的一個線程消費,如果想要支援全局消息,那需要将該 topic 的 queue 的數量設定為 1,犧牲了可用性。

消息事務

一文總結 MetaQ/RocketMQ 原理

1.發送方向 MQ 服務端發送消息。

2.MQ Server 将消息持久化成功之後,向發送方 ACK 确認消息已經發送成功,此時消息為半消息。

3.發送方開始執行本地事務邏輯。

4.發送方根據本地事務執行結果向 MQ Server 送出二次确認(Commit 或是 Rollback),MQ Server 收到 Commit 狀态則将半消息标記為可投遞,訂閱方最終将收到該消息;MQ Server 收到 Rollback 狀态則删除半消息,訂閱方将不會接受該消息。

5.在斷網或者是應用重新開機的特殊情況下,上述步驟4送出的二次确認最終未到達 MQ Server,經過固定時間後 MQ Server 将對該消息發起消息回查。

6.發送方收到消息回查後,需要檢查對應消息的本地事務執行的最終結果。

7.發送方根據檢查得到的本地事務的最終狀态再次送出二次确認,MQ Server 仍按照步驟 4 對半消息進行操作。

MetaQ 3.0 以後,新的版本提供更加豐富的功能,支援消息屬性、無序消息、延遲消息、廣播消息、長輪詢消費、高可用特性,這些功能基本上覆寫了大部分應用對消息中間件的需求。除了功能豐富之外,MetaQ 基于順序寫,大機率順序讀的隊列存儲結構和 pull 模式的消費方式,使得 MetaQ 具備了最快的消息寫入速度和百億級的堆積能力,特别适合用來削峰填谷。在 MetaQ 3.0 版本的基礎上,衍生了開源版本 RocketMQ。

高可用

如何做到不重複消費也不丢失消息?

重複消費問題

  • 發送時消息重複【消息 Message ID 不同】:MQ Producer 發送消息時,消息已成功發送到服務端并完成持久化,此時網絡閃斷或者用戶端當機導緻服務端應答給用戶端失敗。如果此時 MQ Producer 意識到消息發送失敗并嘗試再次發送消息,MQ 消費者後續會收到兩條内容相同但是 Message ID 不同的消息。
  • 投遞時消息重複【消息 Message ID 相同】:MQ Consumer 消費消息場景下,消息已投遞到消費者并完成業務處理,當用戶端給服務端回報應答的時候網絡閃斷。為了保證消息至少被消費一次,MQ 服務端将在網絡恢複後再次嘗試投遞之前已被處理過的消息,MQ 消費者後續會收到兩條内容相同并且 Message ID 也相同的消息。

MetaQ 不能保證消息不重複,是以對于重複消費情況,需要業務自定義唯一辨別作為幂等處理的依據。

消息丢失問題

MetaQ 避免消息丢失的機制主要包括:重試、備援消息存儲。在生産者的消息投遞失敗時,預設會重試兩次。消費者消費失敗時,在廣播模式下,消費失敗僅會傳回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,而不會重試。在未指定順序消息的叢集模式下,消費失敗的消息會進入重試隊列自動重試,預設最大重試次數為 16 。在順序消費的叢集模式下,消費失敗會使得目前隊列暫停消費,并重試到成功為止。

主從同步

RocketMQ/MetaQ 為每個存儲資料的 Broker 節點配置 ClusterName,BrokerName 辨別來更好的進行資源管理。多個 BrokerName 相同的節點構成一個副本組。每個副本還擁有一個從 0 開始編号,不重複也不一定連續的 BrokerId 用來表示身份,編号為 0 的節點是這個副本組的 Leader / Primary / Master,故障時通過選舉來重新對 Broker 編号辨別新的身份。例如 BrokerId = {0, 1, 3},則 0 為主,其他兩個為備。

從模型的角度來看,RocketMQ /MetaQ 單節點上 Topic 數量較多,如果像 kafka 以 topic 粒度維護狀态機,節點當機會導緻上萬個狀态機切換,這種驚群效應會帶來很多潛在風險,是以新版本的 RocketMQ/MetaQ 選擇以單個 Broker 作為切換的最小粒度來管理,相比于其他更細粒度的實作,副本身份切換時隻需要重配置設定 Broker 編号,對中繼資料節點壓力最小。由于通信的資料量少,可以加快主備切換的速度,單個副本下線的影響被限制在副本組内,減少管理和運維成本。這種實作也存在一些缺點,例如存儲節點的負載無法以最佳狀态在叢集上進行負載均衡。

一文總結 MetaQ/RocketMQ 原理

RocketMQ/MetaQ 采用實體複制的方法,存儲層的 CommitLog 通過連結清單和核心的 MappedFile 機制抽象出一條 append only 的資料流。主副本将未送出的消息按序傳輸給其他副本(相當于 redo log),并根據一定規則計算确認位點(confirm offset)判斷日志流是否被送出。最終一緻性通過資料水位對齊的方式來實作(越近期的消息價值越高):

一文總結 MetaQ/RocketMQ 原理
  • 1-1 情況下滿足備 Max <= 主 Min,一般是備新上線或下線較久,備跳過存量日志,從主的 Min 開始複制。
  • 1-2,2-2 兩種情況下滿足 主 Min < 備 Max <= 主 Max,一般是由于備網絡閃斷導緻日志水位落後,通過 HA 連接配接追随主即可。
  • 1-3,2-3 兩種情況下備 Max > 主 Max,可能由于主異步寫磁盤當機後又成為主,或者網絡分區時雙主寫入造成 CommitLog 分叉。由于新主落後于備,在确認位點對齊後少量未确認的消息丢失,這種非正常模式的選舉是應該盡量避免的。
  • 3-3 理論上不會出現,備的資料長于主,原因可能是主節點資料丢失又疊加了非正常選舉,是以這種情況需要人工介入處理。

副本組的消息複制也支援同步和異步的模式。

複制方式 優點 缺點
同步複制 成功寫入的消息不會丢失,可靠性高 寫入延遲更高
異步複制 slave 當機不影響 master 性能更高 可能丢失消息

slave broker 會定時(60 s)從 master 同步資訊

public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
        this.syncMessageRequestMode();
        if (brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
            this.syncTimerMetrics();
        }
    }           

主從切換

RocketMQ 衍生出了很多不同的主從切換架構。

無切換架構

最早的時候,RocketMQ 基于 Master-Slave 模式提供了主備部署的架構,這種模式提供了一定的高可用能力,在 Master 節點負載較高情況下,讀流量可以被重定向到備機。由于沒有選主機制,在 Master 節點不可用時,這個副本組的消息發送将會完全中斷,還會出現延遲消息、事務消息等無法消費或者延遲。此外,備機在正常工作場景下資源使用率較低,造成一定的資源浪費。為了解決這些問題,社群提出了在一個 Broker 程序内運作多個 BrokerContainer,這個設計類似于 Flink 的 slot,讓一個 Broker 程序上可以以 Container 的形式運作多個節點,複用傳輸層的連接配接,業務線程池等資源,通過單節點主備交叉部署來同時承擔多份流量,無外部依賴,自愈能力強。這種方式下隔離性弱于使用原生容器方式進行隔離,同時由于架構的複雜度增加導緻了自愈流程較為複雜。

切換架構

另一條演進路線則是基于可切換的,RocketMQ 也嘗試過依托于 Zookeeper 的分布式鎖和通知機制進行 HA 狀态的管理。引入外部依賴的同時給架構帶來了複雜性,不容易做小型化部署,部署運維和診斷的成本較高。另一種方式就是基于 Raft 在叢集内自動選主,Raft 中的副本身份被透出和複用到 Broker Role 層面去除外部依賴,然而強一緻的 Raft 版本并未支援靈活的降級政策,無法在 C(Consistency)和 A (Availability)之間靈活調整。兩種切換方案都是 CP 設計,犧牲高可用優先保證一緻性。主副本下線時選主和路由定時更新政策導緻整個故障轉移時間依然較長,Raft 本身對三副本的要求也會面臨較大的成本壓力。

RocketMQ DLedger 融合模式

RocketMQ DLedger (基于 Raft 的分布式日志存儲)融合模式是 RocketMQ 5.0 演進中結合上述兩條路線後的一個系統的解決方案。

模式 優點 缺點
無切換 Master-Slave 模式 實作簡單,适用于中小型使用者,人工管控力強 故障需要人工處理,故障時寫入消息失敗,導緻消息消費暫停
Broker Container 模式 無需選主,無外部依賴,故障轉移非常快 (< 3 秒) 增加單節點運維的複雜度,機器故障的風險增加,自愈流程複雜
切換架構 Raft 自動選主模式 自動主備切換 故障轉移時間較長,強一緻無法靈活降級,三副本成本壓力較大
融合架構 基于 Dledger Controller 的可切換模式 可支援無切換和切換架構之間的轉換,複制協定更簡單,靈活降級 提高了部署和系統的複雜度

總結

相比較于 RocketMQ/MetaQ,Kafka 具有更高的吞吐量。Kafka 預設采用異步發送的機制,并且還擁有消息收集和批量發送的機制,這樣的設定可以顯著提高其吞吐量。由于 Kafka 的高吞吐量,是以通常被用于日志采集、大資料等領域。

RocketMQ/MetaQ 不采用異步的方式發送消息。因為當采用異步的方式發送消息時,Producer 發送的消息到達 Broker 就會傳回成功。此時如果 Producer 當機,而消息在 Broker 刷盤失敗時,就會導緻消息丢失,進而降低系統的可靠性。

RocketMQ/MetaQ 單機可以支援更多的 topic 數量。因為 Kafka 在 Broker 端是将一個分區存儲在一個檔案中的,當 topic 增加時,分區的數量也會增加,就會産生過多的檔案。當消息刷盤時,就會出現性能下降的情況。而 RocketMQ/MetaQ 是将所有消息順序寫入檔案的,是以不會出現這種情況。

當 Kafka 單機的 topic 數量從幾十到幾百個時,就會出現吞吐量大幅度下降、load 增高、響應時間變長等現象。而 RocketMQ/MetaQ 的 topic 數量達到幾千,甚至上萬時,也隻是會出現小幅度的性能下降。

綜上所述,Kafka 具有更高的吞吐量,适合應用于日志采集、大資料等領域。而 RocketMQ/MetaQ 單機支援更多的 topic,且具有更高的可靠性(一緻性支援),是以适用于淘寶這樣複雜的業務處理。

繼續閱讀