天天看點

深入了解MQ:消息通信,高效和穩定性保障

作者:JAVA後端架構
深入了解MQ:消息通信,高效和穩定性保障

1、介紹

前面的章節我學習了 NameServer的原理,消息的生産發送,以及消息的消費的全過程。

我們來回顧一下:

RocketMQ 消息隊列架構主要包括NameServe、Broker(Master/Slave)、Producer、Consumer 4個核心部件,基本執行流程如下:

深入了解MQ:消息通信,高效和穩定性保障
  1. NameServer 優先啟動。NameServer 是整個 RocketMQ 的“中央大腦” ,作為 RocketMQ 的服務注冊中心,是以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
  2. Broker 啟動後,需要将自己注冊至NameServer中,并 保持長連接配接,每 30s 發送一次發送心跳包,來確定Broker是否存活。并将 Broker 資訊 ( IP+、端口等資訊)以及Broker中存儲的Topic資訊上報。注冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的映射關系。
  3. NameServer 如果檢測到Broker 當機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無響應),則從路由系統資料庫中将其移除。
  4. 生産者在發送某個主題的消息之前先從 NamerServer 擷取 Broker 伺服器位址清單(Broker可能是Cluster模式),然後根據負載均衡算法從清單中選擇1台Broker ,建立連接配接通道,進行消息發送。
  5. 消費者在訂閱某個topic的消息之前從 NamerServer 擷取 Broker 伺服器位址清單(Broker可能是Cluster模式),包括關聯的全部Topic隊列資訊。進而擷取目前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接配接通道,開始消費資料。
  6. 生産者和消費者預設每30s 從 NamerServer 擷取 Broker 伺服器位址清單,以及關聯的所有Topic隊列資訊,更新到Client本地。

2 ~ 4 步驟實際上是 Producer、Broker 以及NameServer 之間整個進行資料通信的過程,面對複雜的消息隊列系統,一個性能優良,穩定性高的網絡通信子產品是非常重要的,它展現了RocketMQ叢集消息的整體吞吐和負載能力。也是RocketMQ保證高性能、高穩定性的基石。

2、網絡通信過程分析

2.1 通信類(rocketmq-remoting )的結構解析

深入了解MQ:消息通信,高效和穩定性保障

通過上圖可以看到,在整個RocketMQ隊列系統中,rocketmq-remoting 這個module是專門用來負責網絡通信職能的。

并且從子產品依賴關系中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服務) 等子產品均依賴了它。

深入了解MQ:消息通信,高效和穩定性保障

通信層是基于 Netty 進行擴充的,并自定義了通信協定,用于将消息傳遞給 Broker 進行存儲。實作Client與Server之間高效的資料請求與接收。

2.2 協定結構設計

因為是基于Netty進行擴充的,是以自定義了RocketMQ的消息協定,在傳輸過程的資料進行結構制定、封裝、編解碼的過程。

在RocketMQ中,負責這個工作的就是RemotingCommand類,我們來看看這個類的幾個重要屬性:

字段 類型 Request次元 Response次元
code int 請求操作碼,依據不同的請求碼做不同的業務處理 應答響應碼:0成功,非0辨別對應的錯誤
language LanguageCode 枚舉(JAVA、CPP、PYThON、GO等):請求方實作的編碼語言 應答方實作的編碼語言
version int 請求方程式的版本 應答方版本
opaque int 類似請求ID:reqeustId,唯一識别碼,區分每一個獨立的請求 response的時候直接傳回
flag int 區分是普通還是oneway的RPC:RPC_ONEWAY = 1; RPC = 0。 區分是普通還是oneway RPC
remark String 自定義備注資訊 自定義備注資訊
extFields HashMap<String, String> Request自定義擴充的字段屬性 Response自定義擴充的字段屬性

2.3 消息内容的組成結構

傳輸的消息内容主要由一下幾個部分組成:

組成部分 說明
消息長度 消息的總長度,int類型,四個位元組存儲
序列化類型+消息頭length int類型,位元組1表示序列化類型,位元組2~4表示消息頭長度
消息頭的資料 序列化後的消息頭資料
消息主體資料 消息主體資料内容,二進制位元組
深入了解MQ:消息通信,高效和穩定性保障

2.4 RocketMQ 消息通信流程

在RocketMQ消息隊列中支援通信的模式主要有

  • sync 同步發送模式
  • async 異步發送模式
  • oneway 單向模式,無需關注Response

2.4.1 通信流程說明

下圖從 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基于 NettyRemotingClient 的消息發送,以及Handler 處理過程來說明。

深入了解MQ:消息通信,高效和穩定性保障
  • Broker 和 NameServer 啟動時同步調用 NettyRemotingServer.start() 方法, 初始化 Netty 伺服器
    • 配置 BossGroup/WorkerGroup NioEventLoopGroup 線程組
    • 配置 Channel
    • 添加 NettyServerHandler
    • 調用 serverBootstrap.bind() 監聽端口,等待client的connection
  • Producer 和 Consumer 同樣需要啟動 Netty 的用戶端,通過調用NettyRemotingClient.start() 初始化 Netty 用戶端
    • 配置用戶端 NioEventLoopGroup 線程組
    • 配置 Channel
    • 添加 NettyClientHandler
  • 發送同步消息時,調用 NettyRemoteClient.invokeSync(),從 channelTables 緩存中擷取或者建立用于通信的 Channel 通道。
  • 建立完 Channel 後,生産者 Producer 調用 Channel.writeAndFlush() 發送資料
  • NettyRemotingServer 服務端線程組 處理可讀事件,調用 NettyServerHandler 處理資料。
  • 下一步,NettyServerHandler 調用 processMessageReceived方法,接收并處理傳送過來的資料。
  • 根據請求碼 RequestCode 差別不同的請求,來執行不同的 Processor。
    • 說明:Processor 在服務端初始化的時候,将 RequestCode 添加到 Processor 緩存中。消息的存、查、拉取都是不同的請求碼。
  • processMessageReceived 從ResponseTables(key 為 opaque) 緩存中取出 ResponseFuture,并将将傳回結果設定到 ResponseFuture。同步模式下執行 responseFuture.putResponse()方法,異步調用執行回調方法。
  • NettyRemotingClient 收到可讀事件,調用 NettyClientHandler 讀取并處理傳回事件。

2.4.2 Reactor多線程設計

上面我們說過了,RocketMQ的通信是采用Netty元件作為底層通信庫。同樣的,它也遵循Reactor多線程模型,并在此基礎上做了一些優化。

深入了解MQ:消息通信,高效和穩定性保障

上面圖中四個圖形可以大緻說明NettyRemotingServer的Reactor 多線程模型,在RocketMQ中的存在形式。

  • M:1個 Reactor 主線程:eventLoopGroupBoss,它的職能是負責監聽 TCP網絡連接配接請求,有連接配接請求過來時候,建立SocketChannel,并注冊到selector上。
  • S:RocketMQ的源碼中會選擇NIO或Epoll,來監聽網絡資料,當監聽到網絡資料過來時,讀取資料并丢給Worker線程池:eventLoopGroupSelector,Rocket源碼中預設設定線程數為3。
  • M1:執行業務之前的各種雜事(SSL認證、空閑檢查、網絡連接配接檢查、編解碼、序列化反序列化 等等),傳遞給 這些工作交給defaultEventExecutorGroup 去處理,RocketMQ源碼中預設線程數設定為8。
  • M2:剩下處理業務的操作,就直接放在業務線程池中執行了。按照之前說的,依據RequestCode去processorTable 本地緩存中找到對應的 processor,并封裝成task任務,在丢給對應的業務processor線程池來處理。

    |線程數辨別|線程名|說明|

    |-|-|-|

    |1|NettyBoss|Reactor 主線程,預設1|

    |N|NettyServerEPOLLSelector|Reactor 線程池,預設3|

    |M1|NettyServerCodecThread|Worker 線程池,預設8|

    |M2|RemotingExecutorThread|Processor線程池,處理業務邏輯|

完整的可以參照官網的這張圖(圖檔來自于網上):

深入了解MQ:消息通信,高效和穩定性保障

總結

上面介紹了 RocketMQ 消息通信的主要内容,我們總結以下幾點:

  • 整個RocketMQ隊列系統中,rocketmq-remoting Module是專門用來負責網絡通信職能的。
  • 網絡通信子產品基于Netty進行擴充的,是以自定義了RocketMQ的消息協定,在傳輸過程的資料進行結構制定、封裝、編解碼的過程。
  • 了解 NettyRemotingServer/NettyRemotingClient 的初始化過程,以及調用 NettyServerHandler/NettyClienthandler 進行處理的執行流程。
  • 同步異步:同步和異步消核心差別是 同步消息通過 Netty 發送請求後會執行 ResponseFuture.waitResponse() 阻塞等待,異步的請求則 SendCallback 相應的方法進行回調處理。
  • 多線程模式下會通過1個Reactor 主線程(監聽連接配接),以及Reactor 線程池(監聽資料)、Worker 線程池(處理前置工作)、Processor線程池(處理業務邏輯) 來處理通信過程。

為幫助開發者們提升面試技能、有機會入職BATJ等大廠公司,特别制作了這個專輯——這一次整體放出。

大緻内容包括了: Java 集合、JVM、多線程、并發程式設計、設計模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat等大廠面試題等、等技術棧!

深入了解MQ:消息通信,高效和穩定性保障

歡迎大家關注公衆号【Java爛豬皮】,回複【666】,擷取以上最新Java後端架構VIP學習資料以及視訊學習教程,然後一起學習,一文在手,面試我有。

每一個專欄都是大家非常關心,和非常有價值的話題,如果我的文章對你有所幫助,還請幫忙點贊、好評、轉發一下,你的支援會激勵我輸出更高品質的文章,非常感謝!

深入了解MQ:消息通信,高效和穩定性保障

繼續閱讀