天天看點

升空,RocketMQ

文章目錄

  • 一、前言
  • 二、 RocketMQ簡介
    • 2.1 RocketMQ是阿裡的開源消息中間件,現為Apache頂級開源項目
    • 2.2 RocketMQ處理高并發做了兩件事情
    • 2.3 RocketMQ:優點 + 缺點 + 業務用途(可以作為一個面試問題)
    • 2.4 RocketMQ項目結構
  • 三、RocketMQ面試知識
    • 3.1 Rocket的分布式架構(高并發、高效率、高可用的保證)
    • 3.2 詳細四個部分
      • 3.2.1 第一,NameServer
      • 3.2.2 第二,Producer
      • 3.2.3 第三,Broker
      • 3.2.4 第四,Consumer
  • 四、RocketMQ消息領域模型
  • 五、其他問題
  • 六、面試金手指
  • 七、小結

一、前言

RocketMQ是一個純Java、分布式、隊列模型的開源消息中間件,前身是MetaQ,是阿裡參考Kafka特點研發的一個隊列模型的消息中間件(RocketMQ是阿裡開源其自研的第三代分布式消息中間件),後開源給apache基金會成為了apache的頂級開源項目,具有高性能、高可靠、高實時、分布式特點。

RocketMQ英文直譯:Rocket火箭、MQ message queue 消息隊列

Apache基金會中的342個項目中,暫時還隻有Kylin、CarbonData、Eagle 、Dubbo和 RocketMQ 共計五個中國技術人主導的項目,我們比較熟悉的是Dubbo和RocketMQ,都是阿裡的,難怪國内這麼多公司舔阿裡、

RocketMQ可以實用于電商領域,金融領域,大資料領域,這些領域正好是阿裡的專長。

阿裡巴巴内部圍繞着RocketMQ核心打造了三款産品,分别是MetaQ、Notify和Aliware MQ,這三者分别采用了不同的模型,

MetaQ主要使用了拉模型,解決了順序消息和海量堆積問題;

Notify主要使用了推模型,解決了事務消息;

而雲産品Aliware MQ則是提供了商業化的版本。

說到高并發,就是雙11洗禮,為了在高并發下保護資料庫,RocketMQ團隊重點做了兩件事情,優化慢請求與統一存儲引擎。

(1)優化慢請求:這裡主要是解決在海量高并發場景下降低慢請求對整個叢集帶來的抖動,毛刺問題。這是一個極具挑戰的技術活,團隊同學經過長達1個多月的跟進調優,從雙十一的複盤情況來看,99.996%的延遲落在了10ms以内,而99.6%的延遲在1ms以内。優化主要集中在RocketMQ存儲層算法優化、JVM與作業系統調優。更多的細節大家可以參考《萬億級資料洪峰下的分布式消息引擎》。

(2)統一存儲引擎:主要解決的消息引擎的高可用,成本問題。在多代消息引擎共存的前提下,我們對Notify的存儲子產品進行了全面移植與替換。

RocketMQ天生為金融網際網路領域而生,追求高可用、高并發、低延遲,是一個阿裡巴巴由内而外成功孕育的典範,RocketMQ在阿裡集團也被廣泛應用在訂單,交易,充值,流計算,消息推送,日志流式處理,binglog分發等場景。

RocketMQ優點:

單機吞吐量:十萬級

可用性:非常高,分布式架構

消息可靠性:經過參數優化配置,消息可以做到0丢失

功能支援:MQ功能較為完善,還是分布式的,擴充性好

支援10億級别的消息堆積,不會因為堆積導緻性能下降

源碼是java,我們可以自己閱讀源碼,定制自己公司的MQ,可以掌控

天生為金融網際網路領域而生,對于可靠性要求很高的場景,尤其是電商裡面的訂單扣款,以及業務削峰,在大量交易湧入時,後端可能無法及時處理的情況

RoketMQ在穩定性上可能更值得信賴,這些業務場景在阿裡雙11已經經曆了多次考驗,如果你的業務有上述并發場景,建議可以選擇RocketMQ

RocketMQ缺點:

支援的用戶端語言不多,目前是java及c++,其中c++不成熟

社群活躍度不是特别活躍那種

沒有在 mq 核心中去實作JMS等接口,有些系統要遷移需要修改大量代碼

RocketMQ的業務用途(重點):

釋出/訂閱消息傳遞模型

财務級交易消息

各種跨語言用戶端,例如Java,C / C ++,Python,Go

可插拔的傳輸協定,例如TCP,SSL,AIO

内置的消息跟蹤功能,還支援開放式跟蹤

多功能的大資料和流生态系統內建

按時間或偏移量追溯消息

可靠的FIFO和嚴格的有序消息傳遞在同一隊列中

高效的推拉消費模型

單個隊列中的百萬級消息累積容量

多種消息傳遞協定,例如JMS和OpenMessaging

靈活的分布式橫向擴充部署架構

快如閃電的批量消息交換系統

各種消息過濾器機制,例如SQL和Tag

用于隔離測試和雲隔離群集的Docker映像

功能豐富的管理儀表闆,用于配置,名額和監視

認證與授權

RocketMQ項目結構

GitHub位址:https://github.com/apache/rocketmq

RocketMQ核心子產品(下載下傳源碼,idea打開):

rocketmq-broker:接受生産者發來的消息并存儲(通過調用rocketmq-store),消費者從這裡取得消息。

rocketmq-client:提供發送、接受消息的用戶端API。

rocketmq-namesrv:NameServer,類似于Zookeeper,這裡儲存着消息的TopicName,隊列等運作時的元資訊。

rocketmq-common:通用的一些類,方法,資料結構等。

rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定義二進制協定。

rocketmq-store:消息、索引存儲等。

rocketmq-filtersrv:消息過濾器Server,需要注意的是,要實作這種過濾,需要上傳代碼到MQ!(一般而言,我們利用Tag足以滿足大部分的過濾需求,如果更靈活更複雜的過濾需求,可以考慮filtersrv元件)。

rocketmq-tools:指令行工具。

他主要有四大核心組成部分:NameServer、Broker、Producer以及Consumer四部分。

升空,RocketMQ

RocketMQ的四個部分(NameServer、Broker、Producer以及Consumer)都是采用分布式叢集,這是他高并發(吞吐量大),高可用的原因之一,叢集的模式有多種,包括多master 模式、多master多slave異步複制模式、多 master多slave同步雙寫模式。

這個模式很像Kafka,因為就是阿裡參考Kafka特點研發的一個隊列模型的消息中間件。

NameServer定義:主要負責對于源資料的管理,包括了對于Topic和路由資訊的管理。

NameServer的地位:NameServer在RocketMQ中是注冊中心,Zookeeper在Dubbo也是注冊中心

NameServer是一個功能齊全的伺服器,從功能上看,類似Dubbo中的Zookeeper,但NameServer與Zookeeper相比更輕量。主要是因為每個NameServer節點互相之間是獨立的,沒有任何資訊互動。

NameServer心跳

NameServer壓力不會太大,平時主要開銷是在維持心跳和提供Topic-Broker的關系資料。但是,每個Broker向NameServer發心跳時, 會帶上目前自己負責的所有Topic資訊,如果一個Broker中的Topic個數太多(萬級别),會導緻一次心跳中,就Topic的資料就幾十M,網絡情況差的話, 網絡傳輸失敗,心跳失敗,導緻NameServer誤認為Broker心跳失敗。

NameServer 是無狀态的,可以橫向擴充,成為一個注冊中心叢集

NameServer 被設計成幾乎無狀态的,可以橫向擴充,節點之間互相之間無通信,通過部署多台機器來标記自己是一個僞叢集。

每個 Broker 在啟動的時候會到 NameServer 注冊:

每個 Broker 在啟動的時候會到 NameServer 注冊,然後,各個Broker将資訊注冊到NameServer,然後Producer和Consumer就可以來NameServer取Broker的資訊了,即每個 Producer 在發送消息前會根據 Topic 到 NameServer 擷取到 Broker 的路由資訊,每個Consumer 也會定時擷取 Topic 的路由資訊。

NameServer作為RocketMQ的注冊中心,在互動邏輯上,和Dubbo中注冊中心的角色,幾乎一模一樣。

Producer定義:消息生産者,負責産生消息,一般由業務系統負責産生消息。

Producer分布式部署

Producer由使用者進行分布式部署,消息由Producer通過多種負載均衡模式發送到Broker叢集,發送低延時,支援快速失敗。

Producer擁有了三種方式發送消息:同步發送、異步發送和單向發送

同步雙向發送定義:同步發送指消息發送方Producer發出資料後,一定要收到接收方發回響應之後才發下一個資料包。

同步雙向發送特點:可靠性最好,耗時最長,注重可靠性高,适合發重要資訊。

同步雙向發送用途:用于重要通知消息,例如重要通知郵件、營銷短信。

異步雙向發送定義:異步發送指消息發送方Producer發出資料後,不等接收方發回響應,接着發送下個資料包。

異步雙向發送特點:可靠性稍差,耗時較長。

異步雙向發送用途:用于可能鍊路耗時較長而對響應時間敏感的業務場景,例如使用者視訊上傳後,消息隊列發送消息,通知啟動轉碼服務。

單向發送定義:單向發送是指隻負責發送消息而不等待伺服器回應且沒有回調函數觸發,單向,發送方自己發自己的,不管接收方。

單向發送特點:可靠性最差,但是耗時最短,注重耗時短。

單向發送用途:适用于某些耗時非常短但對可靠性要求并不高的場景,例如日志收集。

Broker定義:消息中轉角色,負責存儲消息,轉發消息。

Broker是具體提供業務的伺服器,單個Broker節點與所有的NameServer節點保持長連接配接及心跳,并會定時将Topic資訊注冊到NameServer,順帶一提底層的通信和連接配接都是基于Netty實作的((1)netty基于NIO,網絡通信高效率,(2)阿裡的兩種開源中間件,Dubbo和RocketMQ的結構相同,都需要網絡通信,網絡通信都是netty)。

Broker負責消息存儲,以Topic為緯度支援輕量級的隊列,單機可以支撐上萬隊列規模,支援消息推拉模型。官網上有資料顯示:具有上億級消息堆積能力,同時可嚴格保證消息的有序性。

Consumer定義:消息消費者,負責消費消息,一般是背景系統負責異步消費。

Consumer也由使用者部署,支援PUSH和PULL兩種類型的消費模式,支援叢集消費和廣播消息,提供實時的消息訂閱機制。

Pull:拉取型消費者(Pull Consumer)主動從消息伺服器拉取資訊,隻要批量拉取到消息,使用者應用就會啟動消費過程,是以 Pull 稱為主動消費型。

Push:推送型消費者(Push Consumer)封裝了消息的拉取、消費進度和其他的内部維護工作,将消息到達時執行的回調接口留給使用者應用程式來實作。是以 Push 稱為被動消費類型,但從實作上看還是從消息伺服器中拉取消息,不同于 Pull 的是 Push 首先要注冊消費監聽器,當監聽器處觸發後才開始消費消息。

升空,RocketMQ

第一,Message 消息

Message本質:就是消息,就是要傳輸的資訊。

Message與Topic關系:一條消息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的位址。

Message與Tag的關系:一條消息也可以擁有一個可選的标簽(Tag)和額處的鍵值對,它們可以用于設定一個業務 Key 并在 Broker 上查找此消息以便在開發期間查找問題。

第二,Topic 主題

Topic定義:Topic 可以看做消息的分類,它是消息的第一級類型。比如一個電商系統可以分為:交易消息、物流消息等,

Topic與Message的關系:一條消息必須有一個 Topic 。

Topic與Producer的關系:一個 Topic 可以有0個、1個、多個生産者向其發送消息,一個生産者也可以同時向不同的 Topic 發送消息。

Topic與Consumer的關系:一個 Topic 也可以被 0個、1個、多個消費者訂閱。

第三,Group 分組

Group定義:分為ProducerGroup,ConsumerGroup,代表某一類的生産者和消費者,一般來說同一個服務可以作為Group,同一個Group一般來說發送和消費的消息都是一樣的

Group與Topic的關系:一個組可以訂閱多個Topic。

第四,Queue 隊列 和 Message Queue 消息隊列

4.1 Queue 隊列

Queue定義:在Kafka中叫Partition,每個Queue内部是有序的,在RocketMQ中分為讀和寫兩種隊列,一般來說讀寫隊列數量一緻,如果不一緻就會出現很多問題。

4.2 Message Queue 消息隊列

Message Queue(消息隊列),主題被劃分為一個或多個子主題,即子主題就是消息隊列。

消息隊列和Topic的關系:一個 Topic 下可以設定多個消息隊列,發送消息時執行該消息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有隊列将消息發出去。

消息隊列是消息的實體管理機關:一個Topic下可以有多個Queue,Queue的引入使得消息的存儲可以分布式叢集化,具有了水準擴充能力。

第五,Offset 偏移量

在RocketMQ 中,所有消息隊列都是持久化,長度無限的資料結構,所謂長度無限是指隊列中的每個存儲單元都是定長,通路其中的存儲單元使用Offset 來通路,Offset 為 java long 類型,64 位,理論上在 100年内不會溢出,是以認為是長度無限。

也可以認為 Message Queue 是一個長度無限的數組,Offset 就是下标。

第六,Tag 标簽

Tag定義:Tag 可以看作子分類/子主題,它是消息的第二級類型,用于為使用者提供額外的靈活性。使用标簽,同一業務子產品不同目的的消息就可以用相同 Topic 而不同的 Tag 來辨別。比如交易消息又可以分為:交易建立消息、交易完成消息等,

Tag意義:标簽有助于保持您的代碼幹淨和連貫,并且還可以為 RocketMQ 提供的查詢系統提供幫助。

Tag與Message的關系:一條消息可以沒有 Tag 。

第七,消息消費模式

消息消費模式有兩種:Clustering(叢集消費)和Broadcasting(廣播消費)。

第一,叢集消費模式:預設情況下就是叢集消費,該模式下一個消費者叢集共同消費一個主題的多個隊列,一個隊列隻會被一個消費者消費,如果某個消費者挂掉,分組内其它消費者會接替挂掉的消費者繼續消費。

第二,廣播消費模式:廣播消費消息會發給消費者組中的每一個消費者進行消費。

第八,Message Order

Message Order(消息順序)有兩種:Orderly(順序消費)和Concurrently(并行消費)。

第一,順序消費:順序消費表示消息消費的順序同生産者為每個消息隊列發送的順序一緻,是以如果正在處理全局順序是強制性的場景,需要確定使用的主題隻有一個消息隊列。

第二,并行消費:并行消費不再保證消息順序,消費的最大并行數量受每個消費者用戶端指定的線程池限制。

面試問題:如何保證 消息的可用性 ?刷盤 + 主從同步

消息可用性1-記憶體中的消息持久化到磁盤(同步刷盤+異步刷盤):當我們選擇好了叢集模式之後,那麼我們需要關心的就是怎麼去存儲和複制這個資料,隻有講broker記憶體中的消息持久化到磁盤,才能保證broker當機,消息不丢失,RocketMQ對消息的持久化到磁盤,提供了同步和異步的政策來滿足我們的,

情況1:選擇同步刷盤,如果刷盤逾時會給傳回 刷盤逾時 FLUSH_DISK_TIMEOUT,

情況2:選擇異步刷盤,如果是異步刷盤不會傳回刷盤成功與否的任何資訊,

是以,選擇同步刷盤可以盡最大程度保證刷盤的時候消息不會丢失。

消息可用性2-主從partition複制(同步複制+異步複制):RocketMQ的主從同步提供了同步複制和異步複制兩種模式,當然選擇同步複制可以提升可用性,但是消息的發送RT時間會下降10%左右。

RockteMQ刷盤

RocketMQ刷盤的最終實作都是使用NIO中的 MappedByteBuffer.force() 将映射區的資料寫入到磁盤,

如果是同步刷盤,在Broker把消息寫到CommitLog映射區後,就會等待寫入完成。

如果是異步刷盤,隻是喚醒對應的線程,不保證執行的時機。

RocketMQ混合型存儲結構 + Kafka獨立型存儲結構

RocketMQ采用的是混合型的存儲結構,定義:為Broker單個執行個體下所有的隊列共用一個日志資料檔案(即為CommitLog)來存儲。缺點:會存在較多的随機讀操作,是以讀的效率偏低,同時消費消息需要依賴ConsumeQueue,建構該邏輯消費隊列需要一定開銷。

而Kafka采用的是獨立型的存儲結構,定義:每個隊列一個檔案。

RocketMQ的生産者如何保證消息順序發送?

生産者消費者一般需要保證順序消息的話,可能就是一個業務場景下的,比如訂單的建立、支付、發貨、收貨。

那這些東西是不是一個訂單号呢?一個訂單的肯定是一個訂單号的說,那簡單了呀。

一個topic下有多個隊列,為了保證發送有序,RocketMQ提供了MessageQueueSelector隊列選擇機制,他有三種實作:

SelectMessageQueueByHash

SelectMessageQueueByMachineRoom

SelectMessageQueueByRandom

SelectMessageQueueByHash:使用Hash取模法,讓同一個訂單發送到同一個隊列中,再使用同步發送,隻有同個訂單的建立消息發送成功,再發送支付消息。這樣,我們保證了發送有序。

RocketMQ的topic内的隊列機制,可以保證存儲滿足FIFO(First Input First Output 簡單說就是指先進先出),剩下的隻需要消費者順序消費即可。

RocketMQ僅保證順序發送,順序消費由消費者業務保證!!!(解釋:一個訂單你發送的時候放到一個隊列裡面去,你同一個的訂單号Hash一下是不是還是一樣的結果,那肯定是一個消費者消費,那順序是不是就保證了?)

真正的順序消費不同的中間件都有自己的不同實作我這裡就舉個例子,大家思路了解下。

考點:RocketMQ支援的分布式事務

步驟1:Producer發送半消息給Broker(對應下圖的中的 1 2 3 4)

是指暫不能被Consumer消費的消息。Producer 已經把消息成功發送到了 Broker 端,但此消息被标記為暫不能投遞狀态,處于該種狀态下的消息稱為半消息。需要 Producer對消息的二次确認後,Consumer才能去消費它。

在《分布式事務》單獨專欄中講。

步驟2:消息回查,Broker詢問Producer(對應下圖的中 5 6 7 8)

由于網絡閃段,生産者應用重新開機等原因。導緻 Producer 端一直沒有對 Half Message(半消息) 進行 二次确認。這是Brock伺服器會定時掃描長期處于半消息的消息,會主動詢問 Producer端 該消息的最終狀态(Commit或者Rollback),該消息即為 消息回查。

未使用消息回查的流程 1 2 3 4 8

已使用消息回查的流程 1 2 3 4 5 6 7 8

升空,RocketMQ

對于上圖解釋 1-8

1、A服務先發送個Half Message給Brock端,消息中攜帶 B服務 即将要+100元的資訊。

2、服務端發送響應,半消息接收成功,

3、執行本地事務(會有三種情況1、執行成功。2、執行失敗。3、網絡等原因導緻沒有響應)

4、服務A根據本地事務,發送 commit/rollback 給broker,轉到8.

5、如果因為網絡等原因遲遲沒有傳回失敗還是成功,那麼會執行RocketMQ的回調接口,來進行事務的回查。

6、服務A檢查本地事務;

7、服務A根據本地事務,發送 commit/rollback 給broker,轉到8.

8、如果本地事務成功,那麼Product像Brock伺服器發送Commit,這樣B服務就可以消費該message;如果本地事務失敗,那麼Product像Brock伺服器發送Rollback,那麼就會直接删除上面這條半消息。

考點:消息過濾

第一,Broker端消息過濾  

定義:在Broker中,按照Consumer的要求做過濾;

優點:減輕網絡傳輸負擔,減少了對于Consumer無用消息的網絡傳輸;

缺點:增加Broker的負擔,實作相對複雜。

第二,Consumer端消息過濾

定義:Consumer端,程式員完全自定義實作過濾規則來過濾;

優點:減輕Broker的負擔。

缺點:增加網絡傳輸負擔,很多無用的消息要傳輸到Consumer端。

考點:Broker的Buffer問題(RocketMQ,讀寫磁盤代替記憶體buffer)

定義:Broker的Buffer通常指的是Broker中一個隊列的記憶體緩沖Buffer大小,這個Buffer通常大小有限。

RocketMQ,讀寫磁盤代替記憶體buffer:注意,RocketMQ同其他MQ有非常顯著的差別,RocketMQ沒有記憶體Buffer概念,RocketMQ的隊列都是持久化磁盤。從這方面說,RocketMQ的記憶體Buffer抽象成一個無限長度的隊列(理由:磁盤可以不斷存入),不管有多少資料進來都能裝得下,這個無限是有前提的,Broker會定期删除過期的資料。例如Broker隻儲存3天的消息,那麼這個Buffer雖然長度無限,但是3天前的資料會被從隊尾删除。

為什麼RocketMQ沒有記憶體Buffer?

回答:不需要,RocketMQ的隊列都是持久化磁盤,不需要的時候讀盤,為了防止磁盤爆滿,資料定期清除。

考點:你知道消息隊列的 “回溯消費/重複消費” 嗎?

定義:回溯消費是指Consumer已經消費成功的消息,由于業務上的需求需要重新消費,要支援此功能,Broker在向Consumer投遞成功消息後,消息仍然需要保留。并且重新消費一般是按照時間次元,隔一段時間後回溯消費/重複消費。RocketMQ支援按照時間回溯消費,時間次元精确到毫秒,可以向前回溯,也可以向後回溯。

實踐:

例如由于Consumer系統故障,恢複後需要重新消費1小時前的資料,那麼Broker要提供一種機制,可以按照時間次元來回退消費進度。

考點:消息堆積

消息中間件三個功能:異步、解耦、流量控制,消息堆積主要是涉及流量控制功能,流量控制就是擋住前端的資料洪峰,保證後端系統的穩定性,這就要求消息中間件具有一定的消息堆積能力,消息堆積分以下兩種情況:

1、消息堆積在broker的記憶體緩沖Buffer,一旦超過記憶體緩沖Buffer,可以根據一定的丢棄政策來丢棄消息,如CORBA Notification規範中描述。适合能容忍丢棄消息的業務,這種情況消息的堆積能力主要在于記憶體Buffer大小,而且消息堆積後,性能下降不會太大,因為記憶體中資料多少對于對外提供的通路能力影響有限。

2、消息堆積到持久化存儲系統中,例如DB,KV存儲,檔案記錄形式。當消息不能在記憶體Cache命中時,要不可避免的通路磁盤,會産生大量讀IO,讀IO的吞吐量直接決定了消息堆積後的通路能力。

是以,評估消息堆積能力主要有以下四點:

(1)記憶體中,消息能堆積多少條,多少位元組?即消息的堆積容量。

(2)消息堆積後,發消息的吞吐量大小,是否會受堆積影響?消息堆積在記憶體中吞吐量不受影響,消息堆積在磁盤持久化(db kv存儲 檔案記錄存儲),吞吐量受影響。

(3)消息堆積後,正常消費的Consumer是否會受影響?

(4)消息堆積後,通路堆積在磁盤的消息時,吞吐量有多大?

考點:定時消息

定義:定時消息是指消息發到Broker後,不能立刻被Consumer消費,要到特定的時間點或者等待特定的時間後才能被消費。

RocketMQ支援定時消息,但是不支援任意時間精度,支援特定的level,例如定時5s,10s,1m等。

問題:為什麼不支援任意時間精度?

回答:如果要支援任意的時間精度,在Broker層面,必須要做消息排序,如果再涉及到持久化,那麼消息排序要不可避免的産生巨大性能開銷。

六、面試金手指

繼續閱讀