天天看點

RocketMQ(七)消息消費

消費者從Broker中擷取消息的方式有兩種:pull拉取方式和push推動方式。消費者組對于消息消費的模式又分為兩種:叢集消費Clustering和廣播消費Broadcasting。

1 擷取消費類型

1.1 拉取式消費

Consumer主動從Broker中拉取消息,主動權由Consumer控制。一旦擷取了批量消息,就會啟動消費過程。不過,該方式的實時性較弱,即Broker中有了新的消息時消費者并不能及時發現并消費。

由于拉取時間間隔是由使用者指定的,是以在設定該間隔時需要注意平穩:間隔太短,空請求比例會增加;間隔太長,消息的實時性太差

1.2 推送式消費

該模式下Broker收到資料後會主動推送給Consumer。該擷取方式一般實時性較高。

該擷取方式是典型的釋出-訂閱模式,即Consumer向其關聯的Queue注冊了監聽器,一旦發現有新的消息到來就會觸發回調的執行,回調方法是Consumer去Queue中拉取消息。而這些都是基于Consumer與Broker間的長連接配接的。長連接配接的維護是需要消耗系統資源的。

1.3 對比

  • pull:需要應用去實作對關聯Queue的周遊,實時性差;但便于應用控制消息的拉取
  • push:封裝了對關聯Queue的周遊,實時性強,但會占用較多的系統資

2 消費模式

2.1 廣播消費

RocketMQ(七)消息消費

廣播消費模式下,相同Consumer Group的每個Consumer執行個體都接收同一個Topic的全量消息。即每條消息都會被發送到Consumer Group中的每個Consumer。

2.2 叢集消費

RocketMQ(七)消息消費

叢集消費模式下,相同Consumer Group的每個Consumer執行個體平均分攤同一個Topic的消息。即每條消息隻會被發送到Consumer Group中的某個Consumer。

2.3 消息進度儲存

  • 廣播模式:消費進度儲存在consumer端。因為廣播模式下consumer group中每個consumer都會消費所有消息,但它們的消費進度是不同。是以consumer各自儲存各自的消費進度。
  • 叢集模式:消費進度儲存在broker中。consumer group中的所有consumer共同消費同一個Topic中的消息,同一條消息隻會被消費一次。消費進度會參與到了消費的負載均衡中,故消費進度是需要共享的。下圖是broker中存放的各個Topic的各個Queue的消費進度。
RocketMQ(七)消息消費

3 Rebalance機制

Rebalance機制讨論的前提是:叢集消費。

3.1 什麼是Rebalance

Rebalance即再均衡,指的是,将⼀個Topic下的多個Queue在同⼀個Consumer Group中的多個Consumer間進行重新配置設定的過程。

RocketMQ(七)消息消費

Rebalance機制的本意是為了提升消息的并行消費能力。例如,⼀個Topic下5個隊列,在隻有1個消費者的情況下,這個消費者将負責消費這5個隊列的消息。如果此時我們增加⼀個消費者,那麼就可以給其中⼀個消費者配置設定2個隊列,給另⼀個配置設定3個隊列,進而提升消息的并行消費能力。

3.2 Rebalance限制

由于⼀個隊列最多配置設定給⼀個消費者,是以當某個消費者組下的消費者執行個體數量大于隊列的數量時,多餘的消費者執行個體将配置設定不到任何隊列。

3.3 Rebalance危害

Rebalance的在提升消費能力的同時,也帶來一些問題:

消費暫停:在隻有一個Consumer時,其負責消費所有隊列;在新增了一個Consumer後會觸發Rebalance的發生。此時原Consumer就需要暫停部分隊列的消費,等到這些隊列配置設定給新的Consumer後,這些暫停消費的隊列才能繼續被消費。

消費重複:Consumer 在消費新配置設定給自己的隊列時,必須接着之前Consumer 送出的消費進度的offset繼續消費。然而預設情況下,offset是異步送出的,這個異步性導緻送出到Broker的offset與Consumer實際消費的消息并不一緻。這個不一緻的內插補點就是可能會重複消費的消息。

同步送出:consumer送出了其消費完畢的一批消息的offset給broker後,需要等待broker的成功ACK。當收到ACK後,consumer才會繼續擷取并消費下一批消息。在等待ACK期間,consumer是阻塞的。

異步送出:consumer送出了其消費完畢的一批消息的offset給broker後, 不需要等待broker的成功ACK。consumer可以直接擷取并消費下一批消息。

對于一次性讀取消息的數量,需要根據具體業務場景選擇一個相對均衡的是很有必要的。因為數量過大,系統性能提升了,但産生重複消費的消息數量可能會增加;數量過小,系統性能會下降,但被重複消費的消息數量可能會減少。

消費突刺:由于Rebalance可能導緻重複消費,如果需要重複消費的消息過多,或者因為Rebalance暫停時間過長進而導緻積壓了部分消息。那麼有可能會導緻在Rebalance結束之後瞬間需要消費很多消息。

3.4 Rebalance産生的原因

導緻Rebalance産生的原因,無非就兩個:消費者所訂閱Topic的Queue數量發生變化,或消費者組中消費者的數量發生變化

1)Queue數量發生變化的場景:
  • Broker擴容或縮容
  • Broker更新運維
  • Broker與NameServer間的網絡異常
  • Queue擴容或縮容
2)消費者數量發生變化的場景:
  • Consumer Group擴容或縮容
  • Consumer更新運維
  • Consumer與NameServer間網絡異常

3.5 Rebalance過程

在Broker中維護着多個Map集合,這些集合中動态存放着目前Topic中Queue的資訊、Consumer Group中Consumer執行個體的資訊。一旦發現消費者所訂閱的Queue數量發生變化,或消費者組中消費者的數量發生變化,立即向Consumer Group中的每個執行個體發出Rebalance通知。

TopicConfigManager:key是topic名稱,value是TopicConfig。TopicConfig中維護着該Topic中所有Queue的資料。

ConsumerManager:key是Consumser Group Id,value是ConsumerGroupInfo。

ConsumerGroupInfo中維護着該Group中所有Consumer執行個體資料。

ConsumerOffsetManager:key為 Topic與訂閱該Topic的Group的組合,即topic@group,value是一個内層Map。内層Map的key為QueueId,内層Map的value為該Queue的消費進度offset。

Consumer執行個體在接收到通知後會采用Queue配置設定算法自己擷取到相應的Queue,即由Consumer執行個體自主進行Rebalance。

3.5 與Kafka對比

在Kafka中,一旦發現出現了Rebalance條件,Broker會調用Group Coordinator來完成Rebalance。

Coordinator是Broker中的一個程序。Coordinator會在Consumer Group中選出一個Group Leader。由這個Leader根據自己本身組情況完成Partition分區的再配置設定。這個再配置設定結果會上報給Coordinator,并由Coordinator同步給Group中的所有Consumer執行個體。

Kafka中的Rebalance是由Consumer Leader完成的。而RocketMQ中的Rebalance是由每個Consumer自身完成的,Group中不存在Leader。

4 Queue配置設定算法

一個Topic中的Queue隻能由Consumer Group中的一個Consumer進行消費,而一個Consumer可以同時消費多個Queue中的消息。那麼Queue與Consumer間的配對關系是如何确定的,即Queue要配置設定給哪個Consumer進行消費,也是有算法政策的。常見的有四種政策。這些政策是通過在建立Consumer時的構造器傳進去的。

4.1 平均配置設定政策

RocketMQ(七)消息消費

該算法是要根據avg = QueueCount / ConsumerCount 的計算結果進行配置設定的。如果能夠整除,則按順序将avg個Queue逐個配置設定Consumer;如果不能整除,則将多餘出的Queue按照Consumer順序逐個配置設定。

該算法即,先計算好每個Consumer應該分得幾個Queue,然後再依次将這些數量的Queue逐個配置設定個Consumer。

4.2 環形平均政策

RocketMQ(七)消息消費

環形平均算法是指,根據消費者的順序,依次在由queue隊列組成的環形圖中逐個配置設定。

該算法不用事先計算每個Consumer需要配置設定幾個Queue,直接一個一個分即可。

4.3 一緻性hash政策

RocketMQ(七)消息消費

該算法會将consumer的hash值作為Node節點存放到hash環上,然後将queue的hash值也放到hash環上,通過順時針方向,距離queue最近的那個consumer就是該queue要配置設定的consumer。

該算法存在的問題:配置設定不均。

4.4 同機房政策

RocketMQ(七)消息消費

該算法會根據queue的部署機房位置和consumer的位置,過濾出目前consumer相同機房的queue。然後按照平均配置設定政策或環形平均政策對同機房queue進行配置設定。如果沒有同機房queue,則按照平均配置設定政策或環形平均政策對所有queue進行配置設定。

4.5 對比

一緻性hash算法存在的問題:

兩種平均配置設定政策的配置設定效率較高,一緻性hash政策的較低。因為一緻性hash算法較複雜。另外,一緻性hash政策配置設定的結果也很大可能上存在不平均的情況。

一緻性hash算法存在的意義:

其可以有效減少由于消費者組擴容或縮容所帶來的大量的Rebalance。

RocketMQ(七)消息消費
RocketMQ(七)消息消費

一緻性hash算法的應用場景:Consumer數量變化較頻繁的場景。

5 至少一次原則

RocketMQ有一個原則:每條消息必須要被成功消費一次。

那麼什麼是成功消費呢?Consumer在消費完消息後會向其消費進度記錄器送出其消費消息的offset,offset被成功記錄到記錄器中,那麼這條消費就被成功消費了。

什麼是消費進度記錄器?