天天看點

RocketMQ 架構簡析,看這篇就夠了!

Apache RocketMQ是阿裡開源的一款高性能、高吞吐量的分布式消息中間件。

1

整體結構

RocketMQ 架構簡析,看這篇就夠了!

RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生産消息,Consumer 負責消費消息,Broker 負責存儲消息。每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于叢集中的不同的Broker Group。

Namesrv

說道Namesrv首先會想到服務注冊與發現。分布式服務SOA架構體系中會有服務注冊與發現中心。主要作用是指導服務調用方找到服務提供者提供的服務執行個體。

RocketMQ體系中Namesrv主要作用是:

為producer和consumer提供關于topic的路由資訊。

管理broker節點:監控更新broker的實時狀态。路由注冊、路由删除(故障剔除)。

Namesrv充當路由消息的提供者。Namesrv是一個幾乎無狀态節點,多個Namesrv執行個體組成叢集,但互相獨立,沒有資訊交換。另外,歡迎關注我們,公号終碼一生,背景回複“資料”,可以擷取相關視訊教程和最新面試資料。

路由元資訊

  • topicQueueTable:topic 消息隊列路由資訊。
  • brokerAddrTable:broker基礎資訊。包含broker name,所屬叢集名稱,主broker位址等。
  • clusterAddrTable:broker叢集資訊,存儲叢集中所有broker的名稱。
  • brokerLiveTable:broker狀态資訊。
  • filterServerTable:broker上的filterServer清單。filterServer用于消息過濾。

路由注冊

RocketMQ路由注冊是通過broker與Namesrv的心跳功能實作的。broker啟動時向叢集中所有Namesrv發送心跳包,之後每隔30秒向叢集中所有Namesrv發送心跳包。

心跳包中包含:broker叢集資訊、broker資訊、topic配置資訊、broker關聯的FilterServer清單等。

如果brokerA為Master。并且brokerA上的topic1的配置資訊發生變化或初次注冊,Namesrv會根據封包建立或更新Topic路由中繼資料,填充topicQueueTable。

路由删除

Namesrv收到brokerA的心跳包會更新brokerLiveTable中的brokerA對應的BrokerLiveInfo中的lastUpdateTimestamp。Namesrv每隔10秒掃描brokerLiveTable一次。如果brokerA對應的BrokerLiveInfo 中 lastUpdateTimestamp距目前時間超過 120秒,Namesrv認為brokerA失效,會将brokerA的路由資訊移除并關閉與broker的socket連接配接。更新:topicQueueInfo、brokerAddrTable、brokerLiveTable、filterServerTable等。

路由發現

RocketMQ路由發現是非實時的。當Topic路由資訊發生變化是,Namesrv不會主動推送給用戶端(Producer、Consumer)。而是由用戶端定時到Namesrv拉去最新的路由資訊并緩存(包含Topic路由資訊)。

與kafka對比 

kafka 由zookeeper叢集提供命名服務(Naming Service)。 

Kafka通過 ZooKeeper 管理叢集配置、選舉 Leader 以及在 consumer group 發生變化時進行 Rebalance

Broker

消息中轉角色,負責存儲消息、轉發消息。代理伺服器在RocketMQ系統中負責接收從生産者發送來的消息并存儲、同時為消費者的拉取請求作準備。代理伺服器也存儲消息相關的中繼資料,包括消費者組、消費進度偏移和主題和隊列消息等。另外,歡迎關注我們,公号終碼一生,背景回複“資料”,可以擷取相關視訊教程和最新面試資料。

Broker是以group為機關提供服務。一個group裡面分Master和Slave。Master和Slave存儲的資料一樣,slave從master同步資料(同步雙寫或異步複制看配置)。一個Master可以對應多個Slave,一個Slave隻能對應一個Master。Master與Slave的對應關系通過指定相同的BrokerName、不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。

broker不必須是實體機或虛拟機:

RocketMQ 架構簡析,看這篇就夠了!

每個Broker與Namesrv叢集中的所有節點建立長連接配接,定時發送心跳包到所有Namesrv,更新broker資訊、topic路由資訊等。

一個Topic的不同queue(分區)可分布到叢集中不同的broker group上。

與kafka對比:

kafka和RocketMQ的broker都可以容納多個一個或多個分區資料(kafka分區:partition;RocketMQ分區:queue)

kafka基于partition(分區) 做備份/高可用(partition follower)。

RocketMQ增加了broker group的概念,基于broker(可能包含多個分區)做備份/高可用(broker slave)。

Producer(消息)生産者

Producer與Namesrv叢集中的其中一個節點(随機選擇)建立長連接配接,定期從Name Server取Topic路由資訊,并向提供Topic服務的broker master建立長連接配接,且定時向broker master發送心跳。Producer完全無狀态,可叢集部署。

Producer負責生産消息,一般由業務系統負責生産消息。一個消息生産者會把業務應用系統裡産生的消息發送到broker伺服器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker傳回确認資訊,單向發送不需要。

Consumer(消息)消費者

Consumer與Namesrv叢集中的其中一個節點(随機選擇)建立長連接配接,定期從Name Server取Topic路由資訊,并向提供Topic服務的Master、Slave建立長連接配接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。另外,歡迎關注我們,公号終碼一生,背景回複“資料”,可以擷取相關視訊教程和最新面試資料。

Consumer負責消費消息,一般是背景系統負責異步消費。一個消息消費者會從Broker伺服器拉取消息、并将其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。

叢集模式下:相同Consumer Group的每個Consumer執行個體平均分攤消息。一個條消息僅能被一個Consumer Group消費一次。

Producer、Consumer都隻需要和叢集中一個Namesrv建立長連接配接。Broker需要向叢集中所有的Namesrv發送心跳包。

其實很好了解:

Namesrv叢集提供高可用的命名服務。

Producer、Consumer隻需要從其中一台定期同步路由資訊。

如果Broker隻随機調一台發送心跳包。那麼不同的Namesrv儲存的路由資訊會出現不一緻的情況。

消費者類型:

  • 拉取式消費(Pull Consumer)

Consumer消費的一種類型,應用通常主動調用Consumer的拉消息方法從Broker伺服器拉消息、主動權由應用控制。一旦擷取了批量消息,應用就會啟動消費過程。

Pull方式裡,取消息的過程需要使用者自己寫(包括送出offset等操作)。

  • 推動式消費(Push Consumer)

Consumer消費的一種類型,該模式下Broker收到資料後會主動推送給消費端,該消費模式一般實時性較高。

Push Consumer原理上也是采取pull模式。實際上就是長輪詢的pull模式。

一些概念

  • 主題(Topic)

表示一類消息的集合,每個主題包含若幹條消息,每條消息隻能屬于一個主題,是RocketMQ進行消息訂閱的基本機關。每個topic可分為若幹個分區(queue)

  • 生産者組(Producer Group)

同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一緻。如果發送的是事務消息且原始生産者在發送之後崩潰,則Broker伺服器會聯系同一生産者組的其他生産者執行個體以送出或回溯消費。

  • 消費者組(Consumer Group)

同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一緻。消費者組使得在消息消費方面,實作負載均衡和容錯的目标變得非常容易。要注意的是,消費者組的消費者執行個體必須訂閱完全相同的Topic。RocketMQ 支援兩種消息模式:叢集消費(Clustering)和廣播消費(Broadcasting)。

  • 普通順序消息(Normal Ordered Message)

普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。

  • 嚴格順序消息(Strictly Ordered Message)

嚴格順序消息模式下,消費者收到的所有消息均是有順序的。

  • 消息(Message)

消息系統所傳輸資訊的實體載體,生産和消費資料的最小機關,每條消息必須屬于一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務辨別的Key。系統提供了通過Message ID和Key查詢消息的功能。

  • 标簽(Tag)

為消息設定的标志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設定不同标簽。标簽能夠有效地保持代碼的清晰度和連貫性,并優化RocketMQ提供的查詢系統。消費者可以根據Tag實作對不同子主題的不同消費邏輯,實作更好的擴充性。

2

關于消息中間件

消息中間件需要解決的問題:異步化、削峰填谷。

消息中間件應具備的基礎能力是:消息釋出、訂閱、消費。概念相對簡單這裡不過多描述。

消息中間件的一些重要的機制:

1. 消息優先級(Message Priority;RocketMQ不支援)

優先級是指在一個消息隊列中,每條消息都有不同的優先級,一般用整數來描述,優先級高的消息先投遞,如果消息完全在一個記憶體隊列中,那麼在投遞前可以按照優先級排序,令優先級高的先投遞。

由于RocketMQ所有消息都是持久化的,是以如果按照優先級來排序,開銷會非常大,是以RocketMQ沒有特意支援消息優先級,但是可以通過變通的方式實作類似功能,即單獨配置一個優先級高的隊列,和一個普通優先級的隊列,将不同優先級發送到不同隊列即可。

2. 順序消息(Message Order)

消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單産生了3條消息,分别是訂單建立,訂單付款,訂單完成。消費時,要按照這個順序消費才能有意義。但是同時訂單之間是可以并行消費的。

RocketMQ可以嚴格的保證消息有序。

  • 投遞消息的順序性:

投遞消息的順序性可通過将一組消息投遞到同一分區實作。例如:借助MessageQueueSelector将對相同訂單的操作消息投放到同一分區。

  • 消費消息的順序性:

RoctetMQ特性保障:特定分區(queue)中的消息不能同時被同一個消費者組中的多個Consumer消費,以避免重複消費。

通過自定義或使用預置的AllocateQueueStrategy可設定分區的配置設定政策(哪些分區配置設定給哪個消費者消費)。

3. 高可用、消息可靠性

3.1 消息持久化

RocketMQ、Kafka 以檔案記錄形式持久化。

RocketMQ采用了單一的日志檔案,即把同1個broker上面所有topic的所有queue的消息,存放在一個檔案裡面,進而避免了随機的磁盤寫入。

RocketMQ 架構簡析,看這篇就夠了!

如上圖所示,所有消息都存在一個單一的CommitLog檔案裡面,然後有背景線程異步的同步到ConsumeQueue,再由Consumer進行消費。

TODO 同步、異步刷盤。

RocketMQ 架構簡析,看這篇就夠了!

TODO RocketMQ充分利用Linux檔案系統記憶體cache來提高性能。

TODO CommitLog index Commitlog segment的大小與頁緩存一緻

RocketMQ消息存儲機制會在後面的文章詳細說明。

3.2 broker master/salve

TODO broker group master/salve

TODO Async/Sync Master;

4. 高并發、可擴充 ==> 分布式

提高并發效率 => 提高生産、消費并行度=>提高分區數量

RocketMQ、kafka都支援topic資料分區存放、動态擴充。

以RocketMQ為例:

topic建立的時候可以用叢集模式去建立(這樣叢集裡面每個broker的queue的數量相同),也可以用單個broker模式去建立(這樣每個broker的queue數量可以不一緻)。

4.1 生産并行度

RocketMQ的生産并行度是由其自身機制及broker的數量決定的。這塊後面的文章會詳細分析。

4.2 消費并行度

廣播模式下所有消費者會接受并消費目前topic下所有Queue的消息。

叢集模式下,一個queue隻配置設定給一個consumer執行個體:

這是由于拉取消息是consumer主動控制的,如果多個執行個體同時消費一個queue的消息,會導緻同一個消息在不同的執行個體下被消費多次,是以算法上都是一個queue隻分給一個consumer執行個體,一個consumer執行個體可以允許同時分到不同的queue。

Kafka的消費并行度依賴Topic配置的分區數,如分區數為10,那麼最多10台機器來并行消費(每台機器隻能開啟一個線程),或者一台機器消費(10個線程并行消費)。即消費并行度和分區數一緻。

RocketMQ消費并行度分兩種情況:

順序消費方式并行度同卡夫卡完全一緻;

亂序方式并行度取決于Consumer的線程數,如Topic配置10個隊列,10台機器消費,每台機器100個線程,那麼并行度為1000。

4.3 消息隊列配置設定政策

Producer使用MessageQueueSelector選擇将消息投放到哪個分區

使用AllocateMessageQueueStrategy将不同分區配置設定給Consumer Group中的不同Consumer。一個分區(queue)僅允許配置設定給同一個Consumer Group下的一個Consumer(防止重複消費)。

MessageQueueSelector

RocketMQ 架構簡析,看這篇就夠了!

内置實作類:

SelectMessageQueueByMachineRoom

SelectMessageQueueByHash

SelectMessageQueueByRandom

可以通過實作MessageQueueSelector接口,來自定義Producer投遞消息時選擇分區的算法。

AllocateMessageQueueStrategy

RocketMQ 架構簡析,看這篇就夠了!

内置實作類:

AllocateMessageQueueAveragely:平均配置設定算法

AllocateMessageQueueAveragelyByCircle:基于環形平均配置設定算法

AllocateMachineRoomNearby:基于機房臨近原則算法

AllocateMessageQueueByMachineRoom:基于機房配置設定算法

AllocateMessageQueueConsistentHash:基于一緻性hash算法

AllocateMessageQueueByConfig:基于配置配置設定算法

可以通過實作AllocateMessageQueueStrategy來自定義queue 配置設定給特定Consumer Group下不同Consumer的政策。