天天看點

【RocketMQ】源碼系列研究-消息存儲Broker(概要設計)

1.RocketMQ存儲概要設計

    RocketMQ主要存儲的檔案包括Comitlog檔案、ConsumeQueue檔案、IndexFIle檔案。RocketMQ将所有主題的消息存儲在同一個檔案中,確定消息發送時順序寫檔案,盡最大的能力確定消息發送的高性能與吞吐量。但由于消息中間件一般是訂閱機制,這樣便給按照消息主題檢索帶來了極大的不便。為了提高效率,RocketMQ引入了ConsumeQueu消息隊列檔案,每個消息主題包含多個消息消費隊列,每一個消息隊列有一個消息檔案。IndexFile索引檔案,主要設計理念就是加速消息的檢索性能,根據消息的屬性快速從CommitLog檔案中檢索消息

【RocketMQ】源碼系列研究-消息存儲Broker(概要設計)

     1.1: commitLog:消息存儲檔案,所有消息主題的消息都存儲在CommitLog檔案中。

     1.2     ConsumeQueue:消息消費隊列,消息到達CommitLog檔案後,将異步轉發到消息消費隊列,供消息消費者消費。

     1.3   IndexFile:消息索引檔案,主要存儲消息key與Offset的對應關系

     1.4 事物狀态服務:存儲每條消息的事物狀态

     1.5 定時消息服務:每一個延遲級别對應一個消息消費隊列,存儲延遲隊列的消息拉取進度。

2.功能描述

Broker是處理消息存儲,轉發等處理的伺服器。

 1.Broker以group分開,每個group隻允許一個master,若幹個slave。

 2.隻有master才能進行寫入操作,slave不允許。

3.slave從master中同步資料,同步政策取決于master的配置,可以采用同步雙寫,異步複制兩種。

4.用戶端消費可以從master和slave消費。在預設情況下,消費者都從master消費,在master挂後,用戶端由于從Name Server

中感覺到Broker挂掉,就會從salve消費。

5.Broker向所有的Name Server節點建立長連結,注冊Topic 資訊。

3.消息發送存儲流程

代碼結構:

【RocketMQ】源碼系列研究-消息存儲Broker(概要設計)

消息存儲實作類:

在store子產品中:

org.apache.rocketmq.store.DefaultMessageStore
           

這個類在存儲子產品中最重要的一個類:

下面列舉一下核心屬性和方法

1. MessageStoreConfig  messageStoreConfig :消息存儲配置屬性

2.CommitLog commitLog:CommitLog檔案的存儲實作類

3.ConcurrentMap<String ,ConcurrentMap<Integer,Conssume-Queue>> consumeQueueTable:消息隊列存儲緩存表,按消息主體分組。

4.FlusConsumeQueueService flushConsumeQueueService :消息隊列檔案ConsumeQueue刷盤線程。

5. CleanCommitLogService cleanCommitLogService:清除CommitLog檔案服務

6.CleanConsumeQueueService cleanConsumeQueueService :清除ConsumeQueue檔案服務

7.IndexService indexService :索引檔案實作類

8.AllocateMappedFileSErvice allocateMApppedFileService:MappedFile配置設定服務。

9.ReputMessageServie reputMessageService: CommitLog消息分發,根據CommitLog檔案建構ConsumeQueue、IndexFile檔案

10.HAService haService:存儲HA機制

11.TransientStorePool transientStorePool: 消息堆記憶體緩存

12.MessageArrivingListener messagArrivingListener: 消息拉取長輪訓模式消息達到監聽器。

13.BrokerConfig brokerConfig :Broker配置屬性。

14.StoreCheckpoint storeCheckPoint:檔案刷盤監測點

15.LinkedList<CommitLogDispacher> dispatcherList:CommitLog檔案轉發請求。

消息存儲追蹤入口:

org.apache.rocketmq.store.DefaultMessageStore#putMessage

注意事項:

master節點允許寫入,消息主體長度不能超過256個字元,消息屬性成都不能超過65536個字元。

第二步:

如果消息的延遲級别大于0,将消息的原主題名稱與原消息隊列ID存入消息屬性中,用延遲消息主體SCHEDULE_TOPIC,消息隊列ID更新原消息的主題與隊列,這個是并發消息消費重試關鍵的一步。

代碼中有關于延遲時間的判斷

第三步:

擷取目前可以寫入Commitlog檔案,RocketMQ實體檔案如下圖:

【RocketMQ】源碼系列研究-消息存儲Broker(概要設計)

 Commitlog檔案存儲目錄為${ROCKETMQ_HOME}/store/commitlog目錄,mac系統下面可以在使用者根目錄下面找store檔案夾

每個檔案預設1G,一個檔案寫滿後,會再建立另外一個,以該檔案中第一個偏移量為檔案名稱,偏移量小于20位用0補齊。如上圖第一個檔案的偏移量為0。MappedFileQueue可以看做是${ROCKETMQ_HOME}/store/commitlog檔案夾,而MappedFile則對應該檔案夾下一個個的檔案。

第四步:

在寫入Commitlog之前,先申請鎖,消息存儲到CommitLog檔案是串行的。

第五步:

設定消息的存儲時間,如果mappedFile為空,表明${ROCKETMQ_HOME}/store/commitlog目錄下不存在任何檔案,說明本次消息是第一次消息發送,用偏移量0建立第一個commit檔案,。檔案名:00000000000000000000

第六步:

将消息追加到MappedFile中。

第七步:

建立全局唯一消息ID,消息ID有16位元組

 4位元組IP:4位元組端口号:8位元組消息偏移量

第八步:

擷取改消息在消息隊列的偏移量。CommitLog中儲存了目前所有消息隊列的目前待寫入偏移量。

第九步:

根據消息體的長度、主體的長度、屬性的長度結合消息存儲格式計算消息的總長度。

第十步:

如果消息長度+END_FILE_MIN_BLAK_LENGTH大于CommitLog檔案的空閑空間,則傳回AppendMessageStatus.END_OF_FILE,Broker會重新建立一個新的CommitLog檔案來存儲該消息。從這裡可以看出,每個CommitLog檔案最少會空閑8個位元組,高4位元組存儲目前檔案剩餘空間,低4位元組存儲魔數:CommitLog.BLANK_MAGIC_CODE

十一步:

将消息内容存儲到ByteBuffer中,然後建立AppendMessageResult。這一步隻是将消息存儲在MappedFile對應的記憶體映射Buffer中,并沒有刷寫到磁盤。

十二步:

更新消息隊列邏輯偏移量

十三步:

處理完消息追加邏輯後将釋放putMessageLock鎖。

後續會分析:存儲檔案組織與記憶體映射機制

參考資料:《RocketMQ技術内幕》