1、介紹
前面的章節我學習了 NameServer的原理,消息的生産發送,以及消息的消費的全過程。
我們來回顧一下:
RocketMQ 消息隊列架構主要包括NameServe、Broker(Master/Slave)、Producer、Consumer 4個核心部件,基本執行流程如下:
- NameServer 優先啟動。NameServer 是整個 RocketMQ 的“中央大腦” ,作為 RocketMQ 的服務注冊中心,是以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
- Broker 啟動後,需要将自己注冊至NameServer中,并 保持長連接配接,每 30s 發送一次發送心跳包,來確定Broker是否存活。并将 Broker 資訊 ( IP+、端口等資訊)以及Broker中存儲的Topic資訊上報。注冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的映射關系。
- NameServer 如果檢測到Broker 當機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無響應),則從路由系統資料庫中将其移除。
- 生産者在發送某個主題的消息之前先從 NamerServer 擷取 Broker 伺服器位址清單(Broker可能是Cluster模式),然後根據負載均衡算法從清單中選擇1台Broker ,建立連接配接通道,進行消息發送。
- 消費者在訂閱某個topic的消息之前從 NamerServer 擷取 Broker 伺服器位址清單(Broker可能是Cluster模式),包括關聯的全部Topic隊列資訊。進而擷取目前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接配接通道,開始消費資料。
- 生産者和消費者預設每30s 從 NamerServer 擷取 Broker 伺服器位址清單,以及關聯的所有Topic隊列資訊,更新到Client本地。
2 ~ 4 步驟實際上是 Producer、Broker 以及NameServer 之間整個進行資料通信的過程,面對複雜的消息隊列系統,一個性能優良,穩定性高的網絡通信子產品是非常重要的,它展現了RocketMQ叢集消息的整體吞吐和負載能力。也是RocketMQ保證高性能、高穩定性的基石。
2、網絡通信過程分析
2.1 通信類(rocketmq-remoting )的結構解析
通過上圖可以看到,在整個RocketMQ隊列系統中,rocketmq-remoting 這個module是專門用來負責網絡通信職能的。
并且從子產品依賴關系中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服務) 等子產品均依賴了它。
通信層是基于 Netty 進行擴充的,并自定義了通信協定,用于将消息傳遞給 Broker 進行存儲。實作Client與Server之間高效的資料請求與接收。
2.2 協定結構設計
因為是基于Netty進行擴充的,是以自定義了RocketMQ的消息協定,在傳輸過程的資料進行結構制定、封裝、編解碼的過程。
在RocketMQ中,負責這個工作的就是RemotingCommand類,我們來看看這個類的幾個重要屬性:
字段 | 類型 | Request次元 | Response次元 |
code | int | 請求操作碼,依據不同的請求碼做不同的業務處理 | 應答響應碼:0成功,非0辨別對應的錯誤 |
language | LanguageCode | 枚舉(JAVA、CPP、PYThON、GO等):請求方實作的編碼語言 | 應答方實作的編碼語言 |
version | int | 請求方程式的版本 | 應答方版本 |
opaque | int | 類似請求ID:reqeustId,唯一識别碼,區分每一個獨立的請求 | response的時候直接傳回 |
flag | int | 區分是普通還是oneway的RPC:RPC_ONEWAY = 1; RPC = 0。 | 區分是普通還是oneway RPC |
remark | String | 自定義備注資訊 | 自定義備注資訊 |
extFields | HashMap<String, String> | Request自定義擴充的字段屬性 | Response自定義擴充的字段屬性 |
2.3 消息内容的組成結構
傳輸的消息内容主要由一下幾個部分組成:
組成部分 | 說明 |
消息長度 | 消息的總長度,int類型,四個位元組存儲 |
序列化類型+消息頭length | int類型,位元組1表示序列化類型,位元組2~4表示消息頭長度 |
消息頭的資料 | 序列化後的消息頭資料 |
消息主體資料 | 消息主體資料内容,二進制位元組 |
2.4 RocketMQ 消息通信流程
在RocketMQ消息隊列中支援通信的模式主要有
- sync 同步發送模式
- async 異步發送模式
- oneway 單向模式,無需關注Response
2.4.1 通信流程說明
下圖從 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基于 NettyRemotingClient 的消息發送,以及Handler 處理過程來說明。
- Broker 和 NameServer 啟動時同步調用 NettyRemotingServer.start() 方法, 初始化 Netty 伺服器
- 配置 BossGroup/WorkerGroup NioEventLoopGroup 線程組
- 配置 Channel
- 添加 NettyServerHandler
- 調用 serverBootstrap.bind() 監聽端口,等待client的connection
- Producer 和 Consumer 同樣需要啟動 Netty 的用戶端,通過調用NettyRemotingClient.start() 初始化 Netty 用戶端
- 配置用戶端 NioEventLoopGroup 線程組
- 配置 Channel
- 添加 NettyClientHandler
- 發送同步消息時,調用 NettyRemoteClient.invokeSync(),從 channelTables 緩存中擷取或者建立用于通信的 Channel 通道。
- 建立完 Channel 後,生産者 Producer 調用 Channel.writeAndFlush() 發送資料
- NettyRemotingServer 服務端線程組 處理可讀事件,調用 NettyServerHandler 處理資料。
- 下一步,NettyServerHandler 調用 processMessageReceived方法,接收并處理傳送過來的資料。
- 根據請求碼 RequestCode 差別不同的請求,來執行不同的 Processor。
- 說明:Processor 在服務端初始化的時候,将 RequestCode 添加到 Processor 緩存中。消息的存、查、拉取都是不同的請求碼。
- processMessageReceived 從ResponseTables(key 為 opaque) 緩存中取出 ResponseFuture,并将将傳回結果設定到 ResponseFuture。同步模式下執行 responseFuture.putResponse()方法,異步調用執行回調方法。
- NettyRemotingClient 收到可讀事件,調用 NettyClientHandler 讀取并處理傳回事件。
2.4.2 Reactor多線程設計
上面我們說過了,RocketMQ的通信是采用Netty元件作為底層通信庫。同樣的,它也遵循Reactor多線程模型,并在此基礎上做了一些優化。
上面圖中四個圖形可以大緻說明NettyRemotingServer的Reactor 多線程模型,在RocketMQ中的存在形式。
- M:1個 Reactor 主線程:eventLoopGroupBoss,它的職能是負責監聽 TCP網絡連接配接請求,有連接配接請求過來時候,建立SocketChannel,并注冊到selector上。
- S:RocketMQ的源碼中會選擇NIO或Epoll,來監聽網絡資料,當監聽到網絡資料過來時,讀取資料并丢給Worker線程池:eventLoopGroupSelector,Rocket源碼中預設設定線程數為3。
- M1:執行業務之前的各種雜事(SSL認證、空閑檢查、網絡連接配接檢查、編解碼、序列化反序列化 等等),傳遞給 這些工作交給defaultEventExecutorGroup 去處理,RocketMQ源碼中預設線程數設定為8。
-
M2:剩下處理業務的操作,就直接放在業務線程池中執行了。按照之前說的,依據RequestCode去processorTable 本地緩存中找到對應的 processor,并封裝成task任務,在丢給對應的業務processor線程池來處理。
|線程數辨別|線程名|說明|
|-|-|-|
|1|NettyBoss|Reactor 主線程,預設1|
|N|NettyServerEPOLLSelector|Reactor 線程池,預設3|
|M1|NettyServerCodecThread|Worker 線程池,預設8|
|M2|RemotingExecutorThread|Processor線程池,處理業務邏輯|
完整的可以參照官網的這張圖(圖檔來自于網上):
總結
上面介紹了 RocketMQ 消息通信的主要内容,我們總結以下幾點:
- 整個RocketMQ隊列系統中,rocketmq-remoting Module是專門用來負責網絡通信職能的。
- 網絡通信子產品基于Netty進行擴充的,是以自定義了RocketMQ的消息協定,在傳輸過程的資料進行結構制定、封裝、編解碼的過程。
- 了解 NettyRemotingServer/NettyRemotingClient 的初始化過程,以及調用 NettyServerHandler/NettyClienthandler 進行處理的執行流程。
- 同步異步:同步和異步消核心差別是 同步消息通過 Netty 發送請求後會執行 ResponseFuture.waitResponse() 阻塞等待,異步的請求則 SendCallback 相應的方法進行回調處理。
- 多線程模式下會通過1個Reactor 主線程(監聽連接配接),以及Reactor 線程池(監聽資料)、Worker 線程池(處理前置工作)、Processor線程池(處理業務邏輯) 來處理通信過程。
為幫助開發者們提升面試技能、有機會入職BATJ等大廠公司,特别制作了這個專輯——這一次整體放出。
大緻内容包括了: Java 集合、JVM、多線程、并發程式設計、設計模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat等大廠面試題等、等技術棧!
歡迎大家關注公衆号【Java爛豬皮】,回複【666】,擷取以上最新Java後端架構VIP學習資料以及視訊學習教程,然後一起學習,一文在手,面試我有。
每一個專欄都是大家非常關心,和非常有價值的話題,如果我的文章對你有所幫助,還請幫忙點贊、好評、轉發一下,你的支援會激勵我輸出更高品質的文章,非常感謝!