天天看點

RocketMQ進階篇

RocketMQ學習之旅三

本文對RocketMQ消息的存儲模型、高可用性機制、負載均衡、消息重試、死信隊列和消費幂等等知識點做一些總結與分享。

一、存儲媒體

分布式隊列因為有高可靠性的要求,是以資料要進行持久化存儲,目前有如下兩種存儲媒體:

  1. 關系型資料庫DB
  • Apache下開源的另一款MQ【ActiveMQ(預設采用的KahaDB做消息存儲)】可選用JDBC的方式來做消息持久化,通過建黨的xml配置資訊即可實作JDBC消息存儲。
  • 由于普通關系型資料庫(如MySQL)在單表資料量達到千萬級别的情況下,其IO讀寫性能往往會出現瓶頸。在可靠性方面,該方案非常依賴DB,如果一旦DB出現故障,則MQ的消息就無法落盤存儲會導緻線上故障。
  1. 檔案系統
  • 目前業界比較常用的幾款産品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盤至所部署虛拟機/實體機的檔案系統來做持久化(刷盤一般可以分為異步刷盤和同步刷盤兩種模式)。消息刷盤為消息存儲提供了一種高效率、高性能的資料持久化方式。除非部署MQ機器或是本地磁盤挂了,否則一般是不會出現無法持久化的故障問題。

性能對比: 檔案系統 > 關系型資料庫

二、RocketMQ 消息的存儲和發送

消息存儲

  • 磁盤如果使用得當,磁盤的速度完全可以比對上網絡的資料傳輸速度。目前的高性能磁盤,順序寫速度可以達到600MB/s,超過了一般網卡的傳輸速度。但是磁盤随機寫的速度隻有大概100KB/s,和順序寫的性能相差6000倍。因為有如此巨大的速度差别,好的消息隊列系統會比普通的消息隊列系統速度快多個數量級。
  • RocketMQ的消息用順序寫,保證了消息存儲的速度。

消息發送

linux作業系統分為【使用者态】和【核心态】,檔案操作、網絡操作需要涉及這兩種形态的切換,免不了進行資料複制。

一台伺服器把把本機磁盤檔案的内容發送到用戶端,一般分為連個步驟:

1)read;讀取本地檔案内容;

2)write;将讀取的内容通過網絡發送出去。

這兩個看似簡單的操作,實際上進行了4次資料複制,分别是:

1)從磁盤複制資料到核心态記憶體;

2)從核心态記憶體複制到使用者态記憶體;

3)然後從使用者太記憶體複制到網絡驅動的核心态記憶體;

4)從網絡驅動的核心态記憶體複制到網卡中進行傳輸。

資料 -1⃣️-》核心态 -2⃣️-》 使用者态 -3⃣️-》 網絡啟動核心 -4⃣️-》 網卡 --》 資料

通過使用mmap的方式,可以省去向使用者态的記憶體複制(省略步驟2),提高速度。這種機制在java中是通過MappedByteBuffer實作的RocketMQ充分利用了上述特性,也就是所謂的“零拷貝”技術,提供消息存盤和網絡發送的速度。

注意:采用MappedByteBuffer這種記憶體映射的方式有幾個限制,其中之一是一次隻能映射1.5~2G的檔案至使用者态記憶體,這也就是為何RocketMQ預設設定單個CommitLog日志資料檔案為1G的原因。
           

三、RocketMQ 消息存儲結構

rocketMQ消息的存儲是由ConsumeQueue和CommitLog配合完成的,消息真正的實體存儲檔案是CommitLog,ConsumeQueue是消息的邏輯隊列,類似資料庫的索引檔案,存儲的是指向實體存儲的位址。每個Topic下的每個Message Queue 都有一個對應的ConsumeQueue檔案。如下圖所示:

RocketMQ進階篇

CommitLog:存消息的中繼資料

ConsumerQueue:存儲消息在CommitLog的索引(如果ConsumerQueue丢失,不要緊張,通過CommitLog可以将其還原)

IndexFile:為了消息查詢提供了一種通過key或者時間來查詢消息的方法,這種通過IndexFile來查找消息的方法不影響發送與消費消息的主流程。

四、刷盤機制

RocketMQ的消息時存儲到磁盤上的,這樣即保證斷電後恢複,又可以讓存儲的消息量超出記憶體的限制。RocketMQ為了提高性能,會盡可能地保證磁盤的順序寫。消息在通過Producer寫入RocketMQ的時候,有兩種寫磁盤方式,分布式同步刷盤和異步刷盤。

RocketMQ進階篇
  1. 同步刷盤(保證量消息的可靠性)

    - 在傳回寫成功狀态時,消息已經被寫入磁盤

    - 具體流程是,消息寫入記憶體的PAGECACHE後,立刻通知刷盤線程刷盤,然後等待刷盤完成,刷盤線程執行完成後喚醒等待的線程,傳回消息寫成功的狀态。

  2. 異步刷盤(可以提高消息發送的吞吐量)

    - 在傳回寫成功狀态時,消息可能隻能被寫入來記憶體的PAGECACHE,寫操作的傳回快,吞吐量大;當記憶體裡的消息量積累搭配一定程度時,統一觸發寫磁盤動作,快速寫入。

配置:通過配置檔案中的 flushDiskType 參數進行配置 同步【SYNC_FLUSH】/ 異步【ASYNC_FLUSH】

五、RocketMQ 的高可用性機制

RocketMQ進階篇

1.消息消費高可用

在Consumer的配置檔案中,并不需要設定是從Master讀還是從Slave中讀,當MAster不可用或者繁忙的時候,Consumer會被自動切換到Slave讀。有了自動切換Consumer這種機制,當一個Master角色的機器出現故障後,Consumer仍然可以從Slave讀取消息,不影響Consumer程式。

2. 消息發送高可用

在建立Topic的時候,把Topic的多個Message Queue 建立在多個Broker組上,這樣當一個Broker組的Master不可用後,其他組的Master仍然可用,Producer仍然可以發送消息。

RocketMQ目前還不支援把Slave自動轉換成Master,如果機器資源不足,需要把Slave轉成Master,則要手動停止Slave角色的Broker,更改配置檔案,用新的配置檔案啟動Broker。

3.消息主從複制

1)同步複制

2)異步複制

通過 brokerRole 參數配置 SLAVE【從結點】/ SYNC_MASTER【同步複制Master】/ ASYNC_MASTRER【異步複制Master】

4. 推薦的模式

異步刷盤+同步複制

六、負載均衡

1.Producer負載均衡

Producer端,每個執行個體在發消息端時候,預設會輪詢所有端messgae queue 發送,以達到讓消息平均落在不同端queue上。

2.Consumer負載均衡(叢集模式)

在叢集消費模式下,每條消息隻需要投遞到訂閱這個topic到Consumer Group下的一個執行個體即可。RocketMQ采用主動拉取的方式拉取并消費消息,在拉取的時候需要明确指定拉取哪一條message queue,而每當執行個體的數量有變更,都會觸發一次所有執行個體的負載均衡,這個時候會按照queue的數量和執行個體的數量平均配置設定queue給每個執行個體。

預設的配置設定算法:

RocketMQ進階篇

AllocateQueueAveragely 每個consumer執行個體平均配置設定每個consume queue

另一種算法:

RocketMQ進階篇

AllocateMessageQueueAveragelyByCircle,平均分攤每一條queue,隻是以環狀輪流份queue的形式。

3.注意:

  • 叢集模式下,queue都是隻允許配置設定一個執行個體
  • 通過增加consumer執行個體去分攤queue的消費,可以起到水準擴張消費能力的作用。
  • 有執行個體下線的時候,會重新觸發負載均衡,這時候原來配置設定到的queue将配置設定到其他的執行個體上繼續消費。
  • 如果consumer執行個體的數量比message queue的總數量還多的話,多出來的consumer執行個體将無法分到queue,也就無法消費到消息,起到分攤負載的作用。
  • 是以需要空中queue的總數量大于等于consumer的數量

七、消息重試

1.順序消息的重試

對于順序消息,當消費這消費消息失敗後,消息隊列RocketMQ會自動不斷進行消息重試(每次時間間隔為1秒),這時,應用會出現消息消費被阻塞的情況。是以,在使用順序消息時,務必保證應用能夠及時監控并并處理消息消費失敗的情況,避免阻塞現象的發生。

2.無序消息重試

對于無序消息(普通、定時、延時、事務消息),當消費者消費消息失敗時,可以通過設定傳回狀态達到消息重試的結果。無序消息的重試隻針對叢集消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗消息不再重試,繼續消費新的消息。

3.重試次數

消息隊列RocketMQ預設允許每條消息最多重試16次。如下圖所示:

RocketMQ進階篇

如果消息重試16次後仍然失敗,消息将不再投遞。

如果嚴格按照重試時間來計算時間間隔,某條消息在一直消費失敗的前提下,将會在接下來的4小時46分鐘内進行16次重試,超過這個時間範圍,消息将不再投遞。

一條消息無論重試多少次,這些重試消息的Message ID 不會改變。

4.重試的實作

叢集消費方式下,消息消費失敗後期望消息重試,需要在消息監聽器接口的實作中明确進行配置,有如下三種方式:

  1. 傳回 Action.ReconsumeLater
  2. 傳回 null
  3. 直接抛出異常,消息将重試

5.自定義重試的最大次數

Properties properties = new Properties();
//配置對應 Group ID 的最大消費重試次數
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer = ONSFactory.createConsumer(properties);
           

1.最大重試次數小于等于16次,則重試時間間隔同上圖所示

2.最大重試次數大于16次,超過16次的重試時間間隔均為每次2小時

6.注意事項

1.消息最大重試次數的設定相對相同Group ID 下的所有執行個體Consumer有效

2.采用覆寫對方式生效,最後啟動對Consumer執行個體會覆寫之前啟動對配置

八、死信隊列

當一條消息初次消費失敗,消息隊列RocketMQ會自動進行消息重試;達到最大重試次數後,若依舊消費失敗,則表明消費者在正常情況下無法正确的消費該消息,此時,消息隊列RocketMQ不會立即将消息丢棄,而是将其發送到該消費者對應對特殊隊列中。可以通過控制台檢視,并重新發送該消息。

死信消息的特征

1.不會再被消費者正常消費

2.有效期與正常消息相同,均為3天,3天以後會被自動删除。是以,請在死信消息産生後對3天内即使處理。

死信隊列的特征

1.一個死信隊列對應一個Group ID ,而不是對應單個消費者執行個體

2.如果一個Group ID 未産生死信消息,消息隊列RocketMQ 不會為其建立相應的死信隊列

3.一個死信隊列包含了對應Group ID産生的所有死信消息,不論該消息屬于哪個Topic

九、幂等消息

消息隊列RocketMQ消費者在接收到消息以後,有必要根據業務上的唯一Key對消息做幂等處理。

消息隊列RocketMQ可能會fas重複發送消息的情況:

1)發送時消息重複

當一條消息已經被成功發送到服務端并完成持久化,此時出現了網絡閃斷或者用戶端當機,導緻服務端對用戶端應答失敗。如果此時生産者意識到消息發送失敗并嘗試再次發送消息,消費者後續會收到兩條内容相同并且Message ID也相同對消息。

2)投遞時消息重複

消息消費場景下,消息已經投遞到消費者并且完成了業務處理,當用戶端給服務端回報應答時網絡閃斷。為了保證消息至少被消費一次,RocketMQ對服務端将在網絡恢複後再次嘗試投遞之前已被處理過對消息,消費zhe後續會收到兩條内容相同且Message ID也相同對消息。

3)負載均衡時消息重複(包括但不限于網絡抖動、broker重新開機以及訂閱方式應用重新開機)

當消息隊列RocketMQ對Broker或者用戶端重新開機、擴容或者縮容時,會觸發Rebalance,此時消費者也可能會收到重複對消息。

處理方式

Producter在發送消息時定義唯一對消息key

訂閱方收到消息時可以依據消息對Key進行幂等處理。

學習總結參考 黑馬程式員《RocketMQ系統精講,經受曆年雙十一狂歡節考驗的分布式消息中間件》,圖檔來源于網絡侵删~