本文由阿裡雲釘群直播整理而來。
講師介紹:
丁威:中通科技技術平台部資深架構師。《RocketMQ技術内幕》作者,社群直播講師。開源愛好者,關注分布式、雲計算、大資料領域。目前主要負責消息中間件與全鍊路壓測的實施與落地。
本次分享将主要圍繞以下四個方面展開
1、如何學習RocketMQ之我所見。
2、路由注冊、發現、剔除設計模式。
3、消息發送高可用設計。
4、RocketMQ存儲設計。
5、RocketMQ消息消費。
6、RocketMQ HA(主從同步)。
一、如何學習RocketMQ之我所見

為大家介紹自身學習RocketMQ經驗,給大家學習提供了借鑒的思路。
首先是通讀RocketMQ官方文檔,特别是RocketMQ 3.x版本設計手冊,從全局了解RocketMQ的設計理念,需要解決的問題等。從官方文檔設計理念,大家會發現官方文檔中不僅囊括了RocketMQ,還包括了MQ中間件涉及的各個方面,比如MQ通用的角色如Prodcuer消息生産者,Consumer消息消費者,Push Consumner推模式,Pull Consumner拉模式,Producer Group生産者組,Consumner Group消費者組通過這些名詞介紹你就會對RocketMQ有一個整體的了解,了解RocketMQ要解決那些問題,如何訂閱和釋出的實作機制是什麼,還有RocketMQ存儲特點零拷貝原理,相信大家肯對會這些疑問産生好奇心,提出問題。通過反複閱讀官方文檔對RocketMQ的整體有大概的認識,同時也會給大家帶來一些思考,如果讓你來實作這些功能你會怎麼做,如果自己不會,是不是可以帶着這個問題看看RocketMQ是怎樣實作的。這時大家會發現學習新東西不是非常困難。
其次下載下傳RocketMQ源碼不要立馬檢視源碼,大家可以重點關注example包這是官方提供的使用示例。在閱讀RocketMQ源碼中,首先關注的是example官方示例,通過對官方提供示例的學習可以知曉RocketMQ的使用方式,注意事項,進而達到使用目的。但在大家學會使用之後要想駕馭RocketMQ并且能夠處理工作中遇到得各種問題,分析源碼是最好的方式。首先通過分析源碼的過程中大家通過他的實作細節,了解工作原理友善為以後生産實際工作中在出現問題時提供解決問題的思路和方法。另一方面是RocketMQ的代碼品質非常高,RocketMQ擁有高性能。為了實作高性能會涉及到很多方面,比如說RocketMQ在多線程方面的實踐,在高并發程式設計中基于檔案的設計模式,基于Nitty的網絡通信等待這些在分析源碼的過程中能夠提升大家的工作中處理問題的能力,對我們大家自身程式設計能力的提升是非常有幫助的。在這裡特别提示分析源碼首先要有一定給分布式的基礎,在大家會發現分析源碼有難度時,介紹了自身如何通過六個月的時間打下堅實的基礎,真正看懂源碼的過程。還介紹到個人部落格,大家可以通過逆序檢視進而了解講師在檢視RocketMQ源碼之前做了那些準備工作,比如集合,鎖,Netty基礎,對于學習分布式系統同學,這些基礎都是需要掌握的。
如果大家對源碼實在看不懂時又想全面的學習RocketMQ的知識體系,可以通過閱讀《RocketMQ技術内幕》一書來進行學習,書中對RocketMQ的原理,設計理念,實踐細節講的非常透徹,相信對大家的學習會起到一定的幫助。
官方位址:
http://rocketmq.apache.org/講師CSDN位址:
https://me.csdn.net/prestigeding二、路由注冊、發現、提出設計模式
首先介紹了業界我們接觸經常到的一些中間件的服務注冊-發現的實作原理。在此這列舉了Dubbo中注冊-發現機制中的實時推送模式。介紹Dubbo是如何提供服務的,首先服務提供者啟動時會向注冊中心發送消息注冊自己也就是告知注冊中心自己可以提供服務了,那如何注冊呢?注冊中心在收到服務提供者發送的消息後會建立節點名稱為的服務提供者的全路徑名例如com.springboot.dubbo.demo.Demoservice檔案,同時在對應節點目錄産生四個節點如下圖所示。
在服務提供者啟動時通過事件機制,發送事件消息的方式推送給注冊中心,注冊中心收到消息後在對應providers目錄增加一條記錄儲存該服務。對于服務消費者通過訂閱注冊中心消息知曉那些服務提供者可以提供服務進而服務被消費者遠端調用。如果注冊中心providers下面的節點增加或者是減少時,注冊中心都會通過事件機制發送消息及時的通知到服務消費者。同時為了保證服務高可用,服務提供者每30s向注冊中心發送消息告知自己的狀态正常。這種模式的優點是實時性非常高,隻要提供者有變動,消費者都能及時的知道,缺點就是服務注冊實作較複雜,至少每個服務都需要具備消息的釋出-訂閱能力,而且,像zookeeper内部實作複雜,并不适用于RocketMQ,有點大财小用。
RocketMQ核心概念
1)Topic:消息主題,一級消息類型,生産者向其發送消息。
2)生産者:也稱為消息釋出者,負責生産并發送消息至主題Topic。
3)消費者:也稱為消息訂閱者,負責從主題Topic 接收并消費消息。
4)消息:生産者向 主題Topic 發送并最終傳送給消費者的資料和(可選)屬性的組合。
5)消息屬性:生産者可以為消息定義的屬性,包含 Message Key 和 Tag。
6)Group:一類生産者或消費者,這類生産者或消費者通常生産或消費同一類消息,且消息釋出或訂閱的邏輯一緻。
7)生産者叢集:用來表示發送消息應用,一個生産者叢集下包含多個生産者執行個體,可以是多台機器,也可以是一台機器的多個程序,或者一個程序的多個生産者對象。
一個生産者叢集可以發送多個主題Topic 消息。發送分布式事務消息時,如果生産者中途意外當機,消息存儲者Broker 會主動回調生産者叢集的任意一台機器來确認事務狀态。
8)消費者叢集:用來表示消費消息應用,一個消費者叢集下包含多個消費者執行個體,可以是多台機器,也可以是多個程序,或者是一個程序的多個消費者對象。一個消費者叢集下的多個消費者以均攤方式消費消息。
如果設定的是廣播方式,那麼這個消費者叢集下的每個執行個體都消費全量資料。
RocketMQ如何事件服務發現-注冊-拉取模式
RocketMQ使用拉取模式實作主題Topic路由有什麼缺點呢?
1.主題Topic路由中心(NameServer)Topic是基于最終一緻性,極端情況下會出現資料不一緻。
2.用戶端無法實時感覺路由資訊的變化,例如某台消息存儲Brocker自身程序為關閉,但停止向NameServer發送心跳包,但生産者無法立即感覺該Brocker伺服器的異常,會對消息發送造成一定的可用性?
RocketMQ并不打算解決上述問題,因為基于上述的設計,RocketMQ NameServer的實作非常簡單高效,至于消息發送的高可用,則有消息發送用戶端自己保證。
RocketMQ的設計遵循的一個設計理念:崇尚“缺陷美”,簡單,高性能。
如果在知道定時拉取模式的不足時,有哪些方法方式去解決這些問題,帶着這些問題去研究RocketMQ源碼可以獲得更大的收獲,事半功倍。
三、消息發送高可用設計
問題:RocketMQ消息發送如何實作高可用?
答案:RocketMQ消息發送分三步首先是Topic路由尋址,其次是選擇消息隊列,最後是消息發送重試,Broker規避。比如存在主題Topic A有兩條路由存儲消息的Broker A 和存儲消息的Broker B 共8個隊列(Broker A q1, Broker A q2, Broker A q3,Broker A q4, Broker B q1, Broker B q2, Broker B q3,Broker B q4)。在RocketMQ用戶端向RocketMQ叢集發送消息的時候,首先要選擇隊列對于多個隊列的選擇系統預設使用輪詢機制。比如發送第一條消息時如果選擇Broker A收到了消息,那麼發送第二條消息則會選擇Broker B,發送第三條消息重新開始一輪選擇Broker A發送,依次不斷輪詢發送,這也是RocketMQ預設的負載均衡機制。
如果RocketMQ用戶端選擇Broker A q1發送一條消息後,Broker A因為一些其他的原因導緻Broker A不可用,RocketMQ用戶端嘗試進行重新發送,RocketMQ用戶端第一次選擇Broker A q2發送,第二次RocketMQ用戶端選擇Broker A q2發送,發現第一次和第二次都失敗,RocketMQ用戶端會重試兩次,共發送三次。當Broker A故障導緻不可用時,無論對Broker A重試多少次都會失敗,RocketMQ用戶端重試三次失敗,則該消息被告知發送失敗。RocketMQ采用規避機制解決次問題,首先RocketMQ用戶端第一次向Broker A發送消息失敗時在第二次選擇Broker時會規避掉Broker A 的所有隊列(Broker A q1, Broker A q2, Broker A q3,Broker A q4),也就是說Broker A的所有隊列不參加選擇,也就是第二次選擇會選擇Broker B 上的隊列,這樣可以保證第一次消息發送失敗後,第二次可以成功發送消息,進而實作高可用。RocketMQ為了保證高可用提供了另外一種機制設定一個時間在RocketMQ用戶端第一次向Broker A發送消息失敗後在設定時間内RocketMQ用戶端不再向不可用的Broker A發送消息,進一步保證高可用。
建議大家看發送消息的源碼可用重點關注上面提到的高可用的機制,進一步探尋RocketMQ高可用的設計思想重試加規避。
四、RocketMQ存儲設計設計
RocketMQ的存儲設計是RocketMQ的最重要的部分,采取了一種資料與索引分離的存儲方法。有效降低檔案資源、IO資源,記憶體資源的損耗。即便是阿裡這種海量資料,高并發場景也能夠有效降低端到端延遲,并具備較強的橫向擴充能力。作為一款高性能的MQ要具有一下特點。
1.吞度量Tps很高,能夠支援高并發。
2.響應延時要很短。
3.支援海量消息的堆積能力。
以上特點離不開RocketMQ存儲機制。
首先介紹RocketMQ存儲設計整體組織方式
其次是RocketMQ存儲設計之CommitLog檔案,CommitLog是消息存儲檔案,所有主題Topic消息到達Broker後按順序存儲在CommitLog檔案中的。每個CommitLog檔案預設大小為1GB,固定檔案大小友善記憶體映射 。通過對RocketMQ源碼分析,學習RocketMQ如何完成記憶體映射的實作方式給大家一些借鑒的思想。RocketMQ對CommitLog這樣的定長檔案了解為一個邏輯的實體檔案,巧妙的構造了檔案名,比如第一檔案名是00000000000000000000是以實體磁盤上檔案的偏移量為檔案名,對于第二個檔案名0000000000010733741824就是以第一檔案的偏移量作為檔案名的。RocketMQ對CommitLog這樣設計的優點能夠快速定位到一條消息到達Brocker後落在那個檔案中,擁有很高檢索的效率。
對于一條按順序寫入CommitLog檔案的消息,雖然極大的提高了檔案的寫入性能,但對于消息讀取消息就會很慢,為了解決讀取速度慢的問題RocketMQ引入ConsumeQueue檔案類似于kaffka的隊列檔案稱為消息消費隊列檔案。ConsumeQueue檔案是對于CommitLog檔案的基于Topic的索引檔案,主要用于消費者根據Topic消息消費時,其組織方式為/topic/queue,同一隊列存在多個檔案,ConsumeQueue設計及其巧妙,每個條目使用固定長度(8位元組CommitLog實體偏移量,4位元組消息長度,8位元組tag的 hashCode),這裡不是存儲tag的原始字元串,而是存儲hashCode,目的就是確定每個條目的長度固定,可以使用通路類似數組下标的方式快速定位條目,極大的提高CommitLog的讀取性能,試想一下,消息消費者根據Topic,消息消費進度(ConsumeQueue邏輯偏移量),即第幾個ConsumeQueue條目,這樣根據消費進度去通路消息的方法為使用邏輯偏移量logicOffset*20即可找到條目的起始偏移量(ConsumeQueue檔案中的偏移量),然後讀取該偏移量後20個位元組即得到了一個條目,無需周遊整個ConsumeQueue檔案。
相信大家在探究源碼的過程中深刻了解上面的設計理念
然後是RocketMQ基于檔案的Hash索引。類比mysql的Hash索引方式,基于檔案的HashMap方式。提供了通過消息屬性檢索消息的機制,使用定長的方式,可以像使用數組一樣去友善的檢索。比如想要把一個訂單編号的資料,首先要把訂單編号的HashCode放在,通過HashCode在500W個Hash槽中取出一個,再去判斷取出的hash槽中有沒有消息,如果hash槽位為-1則有資料,
37:24IndexFile檔案基于實體磁盤檔案按實作Hash索引。其檔案有40位元組的檔案,500W個Hash槽組成。每個hash槽為4個位元組,最後由2000W的Index條目組成,每個條目由20個位元組構成,分别為4位元組索引key的HashCode、8位元組消息實體偏移量、4位元組時間戳、4位元組的前一個Index條目(Hash沖突的連結清單結構)。
1、 記憶體映射。
2 、基于檔案定長設計,應用數組的結構,友善檢索。
3 基于HashCode的設計。
五、RocketMQ消息消費設計
首先介紹RocketMQ消息消費概要。消息消費通常需要考慮消息隊列負載、消費模式、拉去機制、消息過濾、消息消費(處理消息)、消費進度回報、消息消費限流等方面。
1.消息隊列負載模式:RocketMQ叢集内(同一消費組内)的消費者共同承擔主題Topic下所有消息的消費,即一條消息隻能被叢集中一個消費者消費。
RocketMQ的隊列負載原則是一個消費者可以共同承擔同一主題下的多個消息消費隊列,但同一個消息消費隊列同一時間隻允許被配置設定給一個消費者。
2.消息消費模式:RocketMQ執行叢集消費和廣播消費兩種模式。
3.消息拉取模式:RocketMQ消息拉取支援推、拉兩種模式,其本質為拉模式。
4.消息消費:RocketMQ支援順尋消息、并發消息兩種模式,每個消費組使用獨立的線程池來處理拉取到的消息。
5.RocketMQ的消息消費端的限流主要包含兩個次元:
1)消息堆積數量
如果消息消費處理隊列中的消息條數超過1000條會出發消費端的流控,其具體做法是放棄本次拉取動作,并且延遲50ms後将放入該拉取任務放入到pullRequestQueue中,每1000條流控會列印一次消費端流控日志。
2)消息堆積大小
如果處理隊列中堆積的消息總記憶體大小超過100M,同樣觸發一次流控。
并發消息拉取與消費流程
首先一個消費用戶端有兩個線程(PullMessageService線程和RebalanceService線程)工作,PullMessageService線程負責拉取消息,從阻塞隊列pullRequestQueue中通過take的方式擷取一條拉取消息的任務,如果隊列pullRequestQueue為空時,則PullMessageService線程阻塞。怎麼喚醒隊列,則需要RebalanceService線程每20s進行一次隊列重新負載,擷取主題Topic的所有消息隊列與目前訂閱該主題的所有消費者按照隊列負載算法配置設定隊列
并發消息拉取與消費的幾個核心要點:
1.PullMessageService線程與RebalanceService線程的互動。
2.每個消費組一個一個線程池,用來異步處理消息。
3.消息進度回報。
RocketMQ消息消費-消費進度回報機制
拉取流程:
1.PullMessageService從Brocker伺服器拉取一批消息,預設32條。
2.先存儲到本地處理隊列ProcessQueue。
3.送出到消費組線程池,異步執行。
4.送出到線程池後,繼續在從Brocker伺服器拉取下一批消息。
思考:由于是并發消息,例如thread-1線程在消費消息msg1,thread-2線程在消費消息msg2,thread-3線程在消費消息msg3,此時如果thread-3線程先消費完消息msg3,但thread-1線程,thread-2線程還沒處理完消息msg1,消息msg2,那thread-1線程是如何向消息存儲者Brocker回報消息msg3的偏移量?
在這裡提示重複消費的問題是由業務方處理。
RocketMQ主從同步
RocketMQ的主從同步主要是為了讀寫分離并不提供主從服務切換功能,當主節點服務當機後,RocketMQ隻提供讀取服務不提供寫入服務。
實作步驟
1.首先啟動Master服務并在指定端口進行監聽。
2.用戶端啟動,主動連接配接Master服務,連接配接TCP連接配接。
3.用戶端每5s(糾正不是5s隻要有消息就會拉取,讀不到消息時休眠5s再次拉取消息)的間隔時間向服務端拉取消息,如果第一次拉取的話,先擷取本地commitLog檔案中最大的偏移量,以該偏移量向服務端拉取資料。
4.服務端解析請求,并傳回一批資料給用戶端。
5.用戶端收到一批資料後,将消息寫入本地。
commitLog檔案中,然後向Master服務彙報拉取進度,并更新下一次待拉取偏移量。
6.然後重複第三步。
問題1:主,從伺服器都在運作中,消息消費者是從主節點拉取消息還是從從節點拉取?
答:預設情況下,RocketMQ消息消費者從主伺服器拉取,當主伺服器積壓的消息超過實體記憶體的40%,則建議從從伺服器拉取,但如果slaveReadEnable為false,表示從伺服器不可讀,從伺服器也不會接管消息拉取。
問題2:當消息消費者向從伺服器拉取消息後,會一直從從伺服器拉取?
答:不是的,分如下情況:
1.如果從伺服器的slaveReadEnable設定為false,則下次拉取,從主伺服器拉取。
2.如果從伺服器允許讀取并且從伺服器積壓的消息為超過其實體記憶體的30%,下次拉取使用的Brocker為訂閱組的Brocker指定的Brocker伺服器,改制預設為0,代表為主伺服器。
3.如果從伺服器允許讀取并且從伺服器積壓的消息超過了其實體記憶體的30%,下次拉取使用的Brocker為訂閱組的whichBrockerWhenConsumeSlowly指定的Brocker伺服器,該值預設為1,代表從伺服器。
問題3:主從服務消息消費進度是如何同步的?
答:消息消費進度的同步是單向的,從伺服器開啟一個定時任務,定時從主伺服器同步消息消費進度;無論消息消費者是從主伺服器拉取的消息還是從從伺服器拉取的消息,在向Broker回報消息消費進度時,優先向主伺服器彙報;消息消費者向主伺服器拉取消息時,如果消息消費者記憶體中存在消息消費進度時,主伺服器會嘗試更新消息消費進度。
如果主伺服器當機後恢複後,消息消費者是否會重複消費?
當主伺服器當機恢複後,從伺服器在同步消費進度時同步到的消息消費進度還是主伺服器當機前的進度,進而造成重複消費。隻要消息消費者不重新開機的情況下,消息消費進度還是實時的。還是之前的,如果在此期間消息消費者重新開機了,那麼重複消費就無法避免。在RocketMQ使用過程中很多地方會引發重複消費
大家可以通過以上講述内容為切入點更深入的了解RocketMQ。
直播答疑
問題1:如果發送的是順序消息,Brocker挂了,怎麼做規避政策?還是說就是無法發送?
丁慧:順序消息指的是消息消費方式兩種(順序消費和并發消費)RocketMQ能夠保證進入消費者的消息按順序消費,并不是消息發送者。例如訂單場景下,我們會使用訂單編号作為key,RocketMQ能夠保證同一個單号的所有消息發送到同一個隊列。如果Brocker當機後隊列數量會減少,消息會重新發送就無法保證發送的順序性。如果要保證發送的順序性,可以使用一個Topic一個隊列,這樣會犧牲你的高可用性。RocketMQ順序消息指的是消費的順序性,而不是發送的順序性。
問題2:那些場景會用到MQ?
丁威: MQ的使用場景1對流量進行削鋒填谷操作,使用他的消息堆積能力。例如雙十一期間,訂單量Tps是平時的幾倍或者幾十倍,如果通過服務來處理,無法抵擋訂單洪峰,可以使用MQ進行降為打擊,比如你的訂單到達系統後先将訂單放入MQ中,然後消費者的數量是有限的,可以平穩的通過異步的方式處理訂單,保證系統高可用,不會造成你的服務在關鍵時刻當機。使用MQ作為大量消息的擋箭牌,抵擋訂單洪峰。2解耦系統子產品,降低系統複雜性。比如當使用者登陸後,要送優惠券、送積分時,就可以在使用者登入後發送消息到MQ,你的優惠券系統,積分系統等都可以訂閱MQ接到消息後再去派發優惠券或是積分等其他業務。
問題3:業務端如何解決重複消費的問題?
開發者:可以借助key+redis去重。