天天看點

RocketMQ生産者消費者部分源碼分析總結

前言:這是在檢視源碼時生産者消費者初始化的部分源碼的思路的記錄

1. 生産者初始化

1、mq開啟的時候會檢查相關配置:生産組是否配置,必須配置且不能為null也不能為DEFAULT_PRODUCER

2、生産者執行個體名的設定,若未主動設定,則采用預設的配置,生成的執行個體名(格式):[email protected]的@符号前的數字使用字元串格式化

3、獲得生産者的真正意義上的執行個體,并且以組名為key值,将其緩存在緩存中(此處,生産者與組應是一一對應),生産組在緩存已存在,不能重複放入,否則失敗。

4、topicPublishInfoTable會初始化一個預設的topic資訊,TBW102

5、MQ用戶端執行個體開啟

6、向所有的broker發送心跳

2. 普通消息發送

1、從緩存擷取目前要發送消息的topic的TopicPublishInfo,沒有則從nameserver更新

2、緩存的每個生産者都要更新下最新的TopicPublishInfo

3、更新topic的訂閱資訊

4、在小于重試次數且消息未發送成功的條件下,選取一條消息隊列,第一次随機選取,隊列=(随機數%(總隊列數=各個broker隊列數之和)),下次輪詢為目前隊列數加1下

3. 事物消息發送

1、事物消息發送需要傳入一個LocalTransactionExecuter本地回調的函數對象(一階段開始)

2、校驗該函數對象不能為null

3、普通消息發送開始發送消息

4、函數回調,本地事務函數對象執行(一階段結束,二階段開始)

5、判斷事務是否處理成功,傳回類型為[COMMIT_MESSAGE,ROLLBACK_MESSAGE,UNKNOW]

6、設定好請求頭,根據傳回類型進行相應的響應

事務處理結束(二階段結束)

commitLog中會存有消息的(topic就叫ThirdTopic),普通的消息描述

ThirdTopic vPGROUPTXPRODUCER_THIRDTOPIC_GROUPTAGSluoyKEYS13951818259UNIQ_KEYC0A85C0B220408FD9B4D62BDB0F00000TRAN_MSGtrue ڣ w_

發送失敗還是成功,隻剩帶事務标記的消息

ThirdTopic vPGROUPTXPRODUCER_THIRDTOPIC_GROUPTAGSluoyKEYS13951818259TRAN_MSGtrueUNIQ_KEYC0A85C0B220408FD9B4D62BDB0F00000

4. consumer高可用

consumer啟動時,如果name server或broker挂掉或連接配接失敗,依然可以成功啟動,不會報異常,等到連接配接恢複,consumer會定時發送心跳感覺到,之後會繼續正常進行消費。

5. topic訂閱

1、初始化訂閱的topic的相關資訊

2、将該topic及其訂閱資訊放入負載實作類裡

3、若MQClientInstance執行個體未初始化(啟動時會初始化),則不發送心跳給所有broker

6. 消費者啟動

1、檢查相關配置(消費隊列配置設定政策、訂閱資訊、回調的消息監聽器類型:順序or并發等等相關配置,不管什麼樣的消費者類型涉及的配置都會檢查)

2、從訂閱資訊将topic及tag所有周遊出來,建構SubscriptionData放入負載均衡實作類内

3、判斷消息模型,廣播不處理,叢集:增加重試topic(%RETRY%消費組名字)相關訂閱資訊

4、初始化MQClientInstance

5、RebalanceImpl(負載均衡)類執行個體相應設定(消費組、消費模型、配置設定消息隊列政策、MQClientInstance)

6、建構消費進度存儲對象:廣播:本地檔案消費進度;叢集:遠端broker消費進度

7、加載消費進度,廣播:加載處理,叢集:不處理

8、順序or并發初始化相應消息消費服務并啟動

9、消費者注冊到MQClientInstance并啟動MQClientInstance

10、遠端用戶端通信啟動

11、MQClientInstance啟動時會啟動相關定時任務

  • 定時擷取name server位址(兩分鐘)
  • 定時從name server更新topic路由資訊(選舉namer server的時間間隔)
  • 清除下線的broker及發送心跳到所有的broker(與broker的心跳時間)
  • 持久化所有的消費進度(持久化消費進度的間隔時間)
  • 定時調整線程池(1分鐘)

12、拉消息服務啟動(一個自旋線程,不停的pull消息)

13、負載均衡服務啟動

14、生産者啟動(消費者也需要這個,内部使用(TBW102,PRODUCER_INNER_GROUP)

15、當訂閱資訊改變更新訂閱資訊

16、發送心跳給所有的broker

17、負載均衡服務線程喚醒

7. 負載均衡隊列平均配置設定算法

以下計算針對一個具體的topic

1、消費端索引計算:目前消費端在所有消息端的索引

2、求模:隊列數%消費端數

3、平均大小:隊列數是否不大于于消費端數,是:1,不是:(模>0并且索引<模,是:隊列數除以消費端數并加1,不是:隊列數除以消費端數)

4、開始索引:模>0并且索引<模,是:索引*平均大小,不是:索引乘以平均大小再加模

5、隊列範圍:平均大小與(隊列總數送去開始索引)的最小值

8. 負載均衡的結果

負載均衡根據配置設定算法(有多個配置設定算法,平均配置設定隻是其中這一),會将指定範圍的消息隊列配置設定給消費端,比如隻有一個消費端,那它就會獲得所有隊列,4個消費隊列,2個消費端,每個消費端會消費其中的兩個隊列(不交叉),另外,非目前topic的隊列不會被這些消費端給消費。

繼續閱讀