前言:這是在檢視源碼時生産者消費者初始化的部分源碼的思路的記錄
1. 生産者初始化
1、mq開啟的時候會檢查相關配置:生産組是否配置,必須配置且不能為null也不能為DEFAULT_PRODUCER
2、生産者執行個體名的設定,若未主動設定,則采用預設的配置,生成的執行個體名(格式):[email protected]的@符号前的數字使用字元串格式化
3、獲得生産者的真正意義上的執行個體,并且以組名為key值,将其緩存在緩存中(此處,生産者與組應是一一對應),生産組在緩存已存在,不能重複放入,否則失敗。
4、topicPublishInfoTable會初始化一個預設的topic資訊,TBW102
5、MQ用戶端執行個體開啟
6、向所有的broker發送心跳
2. 普通消息發送
1、從緩存擷取目前要發送消息的topic的TopicPublishInfo,沒有則從nameserver更新
2、緩存的每個生産者都要更新下最新的TopicPublishInfo
3、更新topic的訂閱資訊
4、在小于重試次數且消息未發送成功的條件下,選取一條消息隊列,第一次随機選取,隊列=(随機數%(總隊列數=各個broker隊列數之和)),下次輪詢為目前隊列數加1下
3. 事物消息發送
1、事物消息發送需要傳入一個LocalTransactionExecuter本地回調的函數對象(一階段開始)
2、校驗該函數對象不能為null
3、普通消息發送開始發送消息
4、函數回調,本地事務函數對象執行(一階段結束,二階段開始)
5、判斷事務是否處理成功,傳回類型為[COMMIT_MESSAGE,ROLLBACK_MESSAGE,UNKNOW]
6、設定好請求頭,根據傳回類型進行相應的響應
事務處理結束(二階段結束)
commitLog中會存有消息的(topic就叫ThirdTopic),普通的消息描述
ThirdTopic vPGROUPTXPRODUCER_THIRDTOPIC_GROUPTAGSluoyKEYS13951818259UNIQ_KEYC0A85C0B220408FD9B4D62BDB0F00000TRAN_MSGtrue ڣ w_
發送失敗還是成功,隻剩帶事務标記的消息
ThirdTopic vPGROUPTXPRODUCER_THIRDTOPIC_GROUPTAGSluoyKEYS13951818259TRAN_MSGtrueUNIQ_KEYC0A85C0B220408FD9B4D62BDB0F00000
4. consumer高可用
consumer啟動時,如果name server或broker挂掉或連接配接失敗,依然可以成功啟動,不會報異常,等到連接配接恢複,consumer會定時發送心跳感覺到,之後會繼續正常進行消費。
5. topic訂閱
1、初始化訂閱的topic的相關資訊
2、将該topic及其訂閱資訊放入負載實作類裡
3、若MQClientInstance執行個體未初始化(啟動時會初始化),則不發送心跳給所有broker
6. 消費者啟動
1、檢查相關配置(消費隊列配置設定政策、訂閱資訊、回調的消息監聽器類型:順序or并發等等相關配置,不管什麼樣的消費者類型涉及的配置都會檢查)
2、從訂閱資訊将topic及tag所有周遊出來,建構SubscriptionData放入負載均衡實作類内
3、判斷消息模型,廣播不處理,叢集:增加重試topic(%RETRY%消費組名字)相關訂閱資訊
4、初始化MQClientInstance
5、RebalanceImpl(負載均衡)類執行個體相應設定(消費組、消費模型、配置設定消息隊列政策、MQClientInstance)
6、建構消費進度存儲對象:廣播:本地檔案消費進度;叢集:遠端broker消費進度
7、加載消費進度,廣播:加載處理,叢集:不處理
8、順序or并發初始化相應消息消費服務并啟動
9、消費者注冊到MQClientInstance并啟動MQClientInstance
10、遠端用戶端通信啟動
11、MQClientInstance啟動時會啟動相關定時任務
- 定時擷取name server位址(兩分鐘)
- 定時從name server更新topic路由資訊(選舉namer server的時間間隔)
- 清除下線的broker及發送心跳到所有的broker(與broker的心跳時間)
- 持久化所有的消費進度(持久化消費進度的間隔時間)
- 定時調整線程池(1分鐘)
12、拉消息服務啟動(一個自旋線程,不停的pull消息)
13、負載均衡服務啟動
14、生産者啟動(消費者也需要這個,内部使用(TBW102,PRODUCER_INNER_GROUP)
15、當訂閱資訊改變更新訂閱資訊
16、發送心跳給所有的broker
17、負載均衡服務線程喚醒
7. 負載均衡隊列平均配置設定算法
以下計算針對一個具體的topic
1、消費端索引計算:目前消費端在所有消息端的索引
2、求模:隊列數%消費端數
3、平均大小:隊列數是否不大于于消費端數,是:1,不是:(模>0并且索引<模,是:隊列數除以消費端數并加1,不是:隊列數除以消費端數)
4、開始索引:模>0并且索引<模,是:索引*平均大小,不是:索引乘以平均大小再加模
5、隊列範圍:平均大小與(隊列總數送去開始索引)的最小值
8. 負載均衡的結果
負載均衡根據配置設定算法(有多個配置設定算法,平均配置設定隻是其中這一),會将指定範圍的消息隊列配置設定給消費端,比如隻有一個消費端,那它就會獲得所有隊列,4個消費隊列,2個消費端,每個消費端會消費其中的兩個隊列(不交叉),另外,非目前topic的隊列不會被這些消費端給消費。