1.RocketMQ存儲概要設計
RocketMQ主要存儲的檔案包括Comitlog檔案、ConsumeQueue檔案、IndexFIle檔案。RocketMQ将所有主題的消息存儲在同一個檔案中,確定消息發送時順序寫檔案,盡最大的能力確定消息發送的高性能與吞吐量。但由于消息中間件一般是訂閱機制,這樣便給按照消息主題檢索帶來了極大的不便。為了提高效率,RocketMQ引入了ConsumeQueu消息隊列檔案,每個消息主題包含多個消息消費隊列,每一個消息隊列有一個消息檔案。IndexFile索引檔案,主要設計理念就是加速消息的檢索性能,根據消息的屬性快速從CommitLog檔案中檢索消息

1.1: commitLog:消息存儲檔案,所有消息主題的消息都存儲在CommitLog檔案中。
1.2 ConsumeQueue:消息消費隊列,消息到達CommitLog檔案後,将異步轉發到消息消費隊列,供消息消費者消費。
1.3 IndexFile:消息索引檔案,主要存儲消息key與Offset的對應關系
1.4 事物狀态服務:存儲每條消息的事物狀态
1.5 定時消息服務:每一個延遲級别對應一個消息消費隊列,存儲延遲隊列的消息拉取進度。
2.功能描述
Broker是處理消息存儲,轉發等處理的伺服器。
1.Broker以group分開,每個group隻允許一個master,若幹個slave。
2.隻有master才能進行寫入操作,slave不允許。
3.slave從master中同步資料,同步政策取決于master的配置,可以采用同步雙寫,異步複制兩種。
4.用戶端消費可以從master和slave消費。在預設情況下,消費者都從master消費,在master挂後,用戶端由于從Name Server
中感覺到Broker挂掉,就會從salve消費。
5.Broker向所有的Name Server節點建立長連結,注冊Topic 資訊。
3.消息發送存儲流程
代碼結構:
消息存儲實作類:
在store子產品中:
org.apache.rocketmq.store.DefaultMessageStore
這個類在存儲子產品中最重要的一個類:
下面列舉一下核心屬性和方法
1. MessageStoreConfig messageStoreConfig :消息存儲配置屬性
2.CommitLog commitLog:CommitLog檔案的存儲實作類
3.ConcurrentMap<String ,ConcurrentMap<Integer,Conssume-Queue>> consumeQueueTable:消息隊列存儲緩存表,按消息主體分組。
4.FlusConsumeQueueService flushConsumeQueueService :消息隊列檔案ConsumeQueue刷盤線程。
5. CleanCommitLogService cleanCommitLogService:清除CommitLog檔案服務
6.CleanConsumeQueueService cleanConsumeQueueService :清除ConsumeQueue檔案服務
7.IndexService indexService :索引檔案實作類
8.AllocateMappedFileSErvice allocateMApppedFileService:MappedFile配置設定服務。
9.ReputMessageServie reputMessageService: CommitLog消息分發,根據CommitLog檔案建構ConsumeQueue、IndexFile檔案
10.HAService haService:存儲HA機制
11.TransientStorePool transientStorePool: 消息堆記憶體緩存
12.MessageArrivingListener messagArrivingListener: 消息拉取長輪訓模式消息達到監聽器。
13.BrokerConfig brokerConfig :Broker配置屬性。
14.StoreCheckpoint storeCheckPoint:檔案刷盤監測點
15.LinkedList<CommitLogDispacher> dispatcherList:CommitLog檔案轉發請求。
消息存儲追蹤入口:
org.apache.rocketmq.store.DefaultMessageStore#putMessage
注意事項:
master節點允許寫入,消息主體長度不能超過256個字元,消息屬性成都不能超過65536個字元。
第二步:
如果消息的延遲級别大于0,将消息的原主題名稱與原消息隊列ID存入消息屬性中,用延遲消息主體SCHEDULE_TOPIC,消息隊列ID更新原消息的主題與隊列,這個是并發消息消費重試關鍵的一步。
代碼中有關于延遲時間的判斷
第三步:
擷取目前可以寫入Commitlog檔案,RocketMQ實體檔案如下圖:
Commitlog檔案存儲目錄為${ROCKETMQ_HOME}/store/commitlog目錄,mac系統下面可以在使用者根目錄下面找store檔案夾
每個檔案預設1G,一個檔案寫滿後,會再建立另外一個,以該檔案中第一個偏移量為檔案名稱,偏移量小于20位用0補齊。如上圖第一個檔案的偏移量為0。MappedFileQueue可以看做是${ROCKETMQ_HOME}/store/commitlog檔案夾,而MappedFile則對應該檔案夾下一個個的檔案。
第四步:
在寫入Commitlog之前,先申請鎖,消息存儲到CommitLog檔案是串行的。
第五步:
設定消息的存儲時間,如果mappedFile為空,表明${ROCKETMQ_HOME}/store/commitlog目錄下不存在任何檔案,說明本次消息是第一次消息發送,用偏移量0建立第一個commit檔案,。檔案名:00000000000000000000
第六步:
将消息追加到MappedFile中。
第七步:
建立全局唯一消息ID,消息ID有16位元組
4位元組IP:4位元組端口号:8位元組消息偏移量
第八步:
擷取改消息在消息隊列的偏移量。CommitLog中儲存了目前所有消息隊列的目前待寫入偏移量。
第九步:
根據消息體的長度、主體的長度、屬性的長度結合消息存儲格式計算消息的總長度。
第十步:
如果消息長度+END_FILE_MIN_BLAK_LENGTH大于CommitLog檔案的空閑空間,則傳回AppendMessageStatus.END_OF_FILE,Broker會重新建立一個新的CommitLog檔案來存儲該消息。從這裡可以看出,每個CommitLog檔案最少會空閑8個位元組,高4位元組存儲目前檔案剩餘空間,低4位元組存儲魔數:CommitLog.BLANK_MAGIC_CODE
十一步:
将消息内容存儲到ByteBuffer中,然後建立AppendMessageResult。這一步隻是将消息存儲在MappedFile對應的記憶體映射Buffer中,并沒有刷寫到磁盤。
十二步:
更新消息隊列邏輯偏移量
十三步:
處理完消息追加邏輯後将釋放putMessageLock鎖。
後續會分析:存儲檔案組織與記憶體映射機制
參考資料:《RocketMQ技術内幕》