天天看點

深入剖析RocketMQ源碼-NameServer

RocketMQ是基于主題的釋出與訂閱模式設計的一款高性能、高吞吐量的消息隊列,抛棄了業界常用的Zookeeper,而是使用自研的NameServer來實作中繼資料的管理。本文基于NameServer的源碼來深入剖析NameServer的實作原理以及其中的一些程式設計技巧。

深入剖析RocketMQ源碼-NameServer

(圖檔來自網絡)

通過上圖可以看到,RocketMQ的核心元件主要包括4個,分别是NameServer、Broker、Producer和Consumer,下面我們先依次簡單說明下這四個核心元件:

NameServer:NameServer充當路由資訊的提供者。生産者或消費者能夠通過NameServer查找各Topic相應的Broker IP清單。多個Namesrver執行個體組成叢集,但互相獨立,沒有資訊交換。

Broker:消息中轉角色,負責存儲消息、轉發消息。Broker伺服器在RocketMQ系統中負責接收從生産者發送來的消息并存儲、同時為消費者的拉取請求作準備。Broker伺服器也存儲消息相關的中繼資料,包括消費者組、消費進度偏移和主題和隊列消息等。

Producer:負責生産消息,一般由業務系統負責生産消息。一個消息生産者會把業務應用系統裡産生的消息發送到Broker伺服器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker傳回确認資訊,單向發送不需要。

Consumer:負責消費消息,一般是背景系統負責異步消費。一個消息消費者會從Broker伺服器拉取消息、并将其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。

除了上面說的三個核心元件外,還有Topic這個概念下面也會多次提到:

Topic:表示一類消息的集合,每個Topic包含若幹條消息,每條消息隻能屬于一個Topic,是RocketMQ進行消息訂閱的基本機關。一個Topic可以分片在多個Broker叢集上,每一個Topic分片包含多個queue,具體結構可以參考下圖:

深入剖析RocketMQ源碼-NameServer

RocketMQ是基于主題的釋出與訂閱模式,核心功能包括消息發送、消息存儲、消息消費,整體設計追求簡單與性能第一,歸納來說主要是下面三種:

NameServer取代ZK充當注冊中心,NameServer叢集間互不通信,容忍路由資訊在叢集内分鐘級不一緻,更加輕量級;

使用記憶體映射機制實作高效的IO存儲,達到高吞吐量;

容忍設計缺陷,通過ACK確定消息至少消費一次,但是如果ACK丢失,可能消息重複消費,這種情況設計上允許,交給使用者自己保證。

本文重點介紹的就是NameServer,我們下面一起來看下NameServer是如何啟動以及如何進行路由管理的。

在第一章已經簡單介紹了NameServer取代zk作為一種更輕量級的注冊中心充當路由資訊的提供者。那麼具體是如何來實作路由資訊管理的呢?我們先看下圖:

深入剖析RocketMQ源碼-NameServer

上面的圖描述了NameServer進行路由注冊、路由剔除和路由發現的核心原理。

路由注冊:Broker伺服器在啟動的時候會想NameServer叢集中所有的NameServer發送心跳信号進行注冊,并會每隔30秒向nameserver發送心跳,告訴NameServer自己活着。NameServer接收到Broker發送的心跳包之後,會記錄該broker資訊,并儲存最近一次收到心跳包的時間。

路由剔除:NameServer和每個Broker保持長連接配接,每隔30秒接收Broker發送的心跳包,同時自身每個10秒掃描BrokerLiveTable,比較上次收到心跳時間和目前時間比較是否大于120秒,如果超過,那麼認為Broker不可用,剔除路由表中該Broker相關資訊。

路由發現:路由發現不是實時的,路由變化後,NameServer不主動推給用戶端,等待producer定期拉取最新路由資訊。這樣的設計方式降低了NameServer實作的複雜性,當路由發生變化時通過在消息發送端的容錯機制來保證消息發送的高可用(這塊内容會在後續介紹producer消息發送時介紹,本文不展開講解)。

高可用:NameServer通過部署多台NameServer伺服器來保證自身的高可用,同時多個NameServer伺服器之間不進行通信,這樣路由資訊發生變化時,各個NameServer伺服器之間資料可能不是完全相同的,但是通過發送端的容錯機制保證消息發送的高可用。這個也正是NameServer追求簡單高效的目的所在。

在整理了解了NameServer的架構設計之後,我們先來看下NameServer到底是如何啟動的呢?

既然是源碼解讀,那麼我們先來看下代碼入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args),實際調用的是main0()方法,

代碼如下:

通過main方法啟動NameServer,主要分為兩大步,先建立NamesrvController,然後再初始化并啟動NamesrvController。我們分别展開來分析。

具體展開閱讀代碼之前,我們先通過一個序列圖對整體流程有個了解,如下圖:

深入剖析RocketMQ源碼-NameServer

先來看核心代碼,如下:

通過上面對每一行代碼的注釋,可以看出來,建立NamesrvController的過程主要分為兩步:

Step1:通過指令行中擷取配置。指派給NamesrvConfig和NettyServerConfig類。 Step2:根據配置類NamesrvConfig和NettyServerConfig構造一個NamesrvController執行個體。

可見NamesrvConfig和NettyServerConfig是想當重要的,這兩個類分别是NameServer的業務參數和網絡參數,我們分别看下這兩個類裡面有哪些屬性:

NamesrvConfig

深入剖析RocketMQ源碼-NameServer

NettyServerConfig

深入剖析RocketMQ源碼-NameServer
注:Apache Commons CLI是開源的指令行解析工具,它可以幫助開發者快速建構啟動指令,并且幫助你組織指令的參數、以及輸出清單等。

建立了NamesrvController執行個體之後,開始初始化并啟動NameServer。

首先進行初始化,代碼入口是NamesrvController#initialize。

上面的代碼是NameServer初始化流程,通過每行代碼的注釋,可以看出來,主要有5步驟操作:

Step1:加載KV配置,并寫入到KVConfigManager的configTable屬性中;

Step2:初始化netty伺服器;

Step3:初始化處理netty網絡互動資料的線程池;

Step4:注冊心跳機制線程池,啟動5秒後每隔10秒檢測一次Broker的存活情況;

Step5:注冊列印KV配置的線程池,啟動1分鐘後,每隔10分鐘列印一次KV配置。

RocketMQ的開發團隊還使用了一個常用的程式設計技巧,就是使用JVM鈎子函數對NameServer進行優雅停機。這樣在JVM程序關閉前,會先執行shutdown操作。

執行start函數,啟動NameServer。代碼比較簡單,就是将第一步中建立的netty server進行啟動。其中remotingServer.start()方法不展開詳細說明了,需要對netty比較熟悉,不是本篇文章重點,有興趣的同學可以自行下載下傳源碼閱讀。

我們在第二章開篇有了解到NameServer作為一個輕量級的注冊中心,主要是為消息生産者和消費者提供Topic的路由資訊,并對這些路由資訊和Broker節點進行管理,主要包括路由注冊、路由剔除和路由發現。

本章将會通過源碼的角度來具體分析NameServer是如果進行路由資訊管理的。核心代碼主要都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager中實作。

在了解路由資訊管理之前,我們首先需要了解NameServer到底存儲了哪些路由元資訊,資料結構分别是什麼樣的。

檢視代碼我們可以看到主要通過5個屬性來維護路由元資訊,如下:

我們依次對這5個屬性進行展開說明。

說明:Topic消息隊列路由資訊,消息發送時根據路由表進行負載均衡。

資料結構:HashMap結構,key是Topic名字,value是一個類型是QueueData的隊列集合。在第一章就講過,一個Topic中有多個隊列。QueueData的資料結構如下:

深入剖析RocketMQ源碼-NameServer

資料結構示例:

說明:Broker基礎資訊,包含BrokerName、所屬叢集名稱、主備Broker位址。

資料結構:HashMap結構,key是BrokerName,value是一個類型是BrokerData的對象。BrokerData的資料結構如下(可以結合下面Broker主從結構邏輯圖來了解):

深入剖析RocketMQ源碼-NameServer

Broker主從結構邏輯圖:

深入剖析RocketMQ源碼-NameServer

說明:Broker叢集資訊,存儲叢集中所有Broker名稱。

資料結構:HashMap結構,key是ClusterName,value是存儲BrokerName的Set結構。

說明:Broker狀态資訊。NameServer每次收到心跳包時會替換該資訊

資料結構:HashMap結構,key是Broker的位址,value是BrokerLiveInfo結構的該Broker資訊對象。BrokerLiveInfo的資料結構如下:

深入剖析RocketMQ源碼-NameServer

說明:Broker上的FilterServer清單,消息過濾伺服器清單,後續介紹Consumer時會介紹,consumer拉取資料是通過filterServer拉取,consumer向Broker注冊。

資料結構:HashMap結構,key是Broker位址,value是記錄了filterServer位址的List集合。

路由注冊是通過Broker和NameServer之間的心跳功能來實作的。主要分為兩步:

Step1: Broker啟動時向叢集中所有NameServer發送心跳語句,每隔30秒(預設30s,時間間隔在10秒到60秒之間)再發一次。 Step2: NameServer收到心跳包更新topicQueueTable,brokerAddrTable,brokerLiveTable,clusterAddrTable,filterServerTable。

我們分别展開分析這兩步。

發送心跳包的核心邏輯是在Broker啟動邏輯裡,代碼入口是org.apache.rocketmq.broker.BrokerController#start,本篇文章重點關注的是發送心跳包的邏輯實作,隻列出發送心跳包的核心代碼,如下:

1)建立了一個線程池注冊Broker,程式啟動10秒後執行,每隔30秒(預設30s,時間間隔在10秒到60秒之間,BrokerConfig.getRegisterNameServerPeriod()的預設值是30秒)執行一次。

2)封裝Topic配置和版本号之後,進行實際的路由注冊(注:封裝Topic配置不是本篇重點,會在介紹Broker源碼時重點講解)。實際路由注冊是在org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll中實作,核心代碼如下:

從上面代碼來看,也比較簡單,首先需要封裝請求標頭和requestBody,然後開啟多線程到每個NameServer伺服器去注冊。

請求標頭類型為RegisterBrokerRequestHeader,主要包括如下字段:

深入剖析RocketMQ源碼-NameServer

requestBody類型是RegisterBrokerBody,主要包括如下字段:

深入剖析RocketMQ源碼-NameServer

1)實際的路由注冊是通過registerBroker方法實作,核心代碼如下:

borker和NameServer之間通過netty進行網絡傳輸,Broker向NameServer發起注冊時會在請求中添加注冊碼RequestCode.REGISTER_BROKER。這是一種網絡跟蹤方法,RocketMQ的每個請求都會定義一個requestCode,服務端的網絡處理器會根據不同的requestCode進行影響的業務處理。

Broker發出路由注冊的心跳包之後,NameServer會根據心跳包中的requestCode進行處理。NameServer的預設網絡處理器是DefaultRequestProcessor,具體代碼如下:

判斷requestCode,如果是RequestCode.REGISTER_BROKER,那麼确定業務處理邏輯是注冊Broker。根據Broker版本号選擇不同的方法,我們已V3_0_11以上為例,調用registerBrokerWithFilterServer方法進行注冊主要步驟分為三步:

解析requestHeader并驗簽(基于crc32),判斷資料是否正确; 解析Topic資訊; Step3: 調用RouteInfoManager#registerBroker來進行Broker注冊;

核心注冊邏輯是由RouteInfoManager#registerBroker來實作,核心代碼如下:

通過上面的源碼分析,可以分解出一個Broker的注冊主要分7步:

Step1:加寫鎖,防止并發寫RoutInfoManager中的路由表資訊;

Step2:判斷Broker所屬叢集是否存在,不存在需要建立,并将Broker名加入到叢集Broker集合中;

Step3:維護BrokerData;

Step4:如過Broker是Master,并且Broker的Topic配置資訊發生變化或者是首次注冊,需要建立或更新Topic路由中繼資料,填充TopicQueueTable;

Step5:更新BrokerLivelnfo;

Step6:注冊Broker的filterServer位址清單;

Step7:如果此Broker為從節點,則需要查找Broker Master的節點資訊,并更新對應masterAddr屬性,并傳回給Broker端。

路由剔除的觸發條件主要有兩個:

NameServer每隔10s掃描BrokerLiveTable,連續120s沒收到心跳包,則移除該Broker并關閉socket連接配接; Broker正常關閉時觸發路由删除。

上面描述的觸發點最終删除路由的邏輯是一樣的,統一在RouteInfoManager#onChannelDestroy

中實作,核心代碼如下:

路由删除整體邏輯主要分為6步:

Step1:加readlock,通過channel從BrokerLiveTable中找出對應的Broker位址,釋放readlock,若該Broker已經從存活的Broker位址清單中被清除,則直接使用remoteAddr。

Step2:申請寫鎖,根據BrokerAddress從BrokerLiveTable、filterServerTable移除。

Step3:周遊BrokerAddrTable,根據BrokerAddress找到對應的brokerData,并将brokerData中對應的brokerAddress移除,如果移除後,整個brokerData的brokerAddress空了,那麼将整個brokerData移除。

Step4:周遊clusterAddrTable,根據第三步中擷取的需要移除的BrokerName,将對應的brokerName移除了。如果移除後,該集合為空,那麼将整個叢集從clusterAddrTable中移除。

Step5:周遊TopicQueueTable,根據BrokerName,将Topic下對應的Broker移除掉,如果該Topic下隻有一個待移除的Broker,那麼該Topic也從table中移除。

Step6:釋放寫鎖。

從上面可以看出,路由剔除的整體邏輯比較簡單,就是單純地針對路由元資訊的資料結構進行操作。為了大家能夠更好地了解這塊代碼,建議大家對照4.1中介紹的路由元資訊的資料結構來進行代碼走讀。

當路由資訊發生變化之後,NameServer不會主動推送給用戶端,而是等待用戶端定期到nameserver主動拉取最新路由資訊。這種設計方式降低了NameServer實作的複雜性。

producer在啟動後會開啟一系列定時任務,其中有一個任務就是定期從NameServer擷取Topic路由資訊。代碼入口是MQClientInstance#start-ScheduledTask(),核心代碼如下:

producer和NameServer之間通過netty進行網絡傳輸,producer向NameServer發起的請求中添加注冊碼

RequestCode.GET_ROUTEINFO_BY_TOPIC。

NameServer收到producer發送的請求後,會根據請求中的requestCode進行處理。處理requestCode同樣是在預設的網絡處理器DefaultRequestProcessor中進行處理,最終通過RouteInfoManager#pickupTopicRouteData來實作。

TopicRouteData結構

在正式解析源碼前,我們先看下NameServer傳回給producer的資料結構。通過代碼可以看到,傳回的是一個TopicRouteData對象,具體結構如下:

深入剖析RocketMQ源碼-NameServer

其中QueueData,BrokerData,filterServerTable在4.1章節介紹路由元資訊時有介紹。

源碼分析

在了解了傳回給producer的TopicRouteData結構後,我們進入RouteInfoManager#pickupTopicRouteData方法來看下具體如何實作。

上面代碼封裝了TopicRouteData的queueDatas、BrokerDatas和filterServerTable,還有orderTopicConf字段沒封裝,我們再看下這個字段是在什麼時候封裝的,我們向上看RouteInfoManager#pickupTopicRouteData的調用方法DefaultRequestProcessor#getRouteInfoByTopic如下:

結合這兩個方法,我們可以總結出查找Topic路由主要分為3個步驟:

調用RouteInfoManager#pickupTopicRouteData,從topicQueueTable,brokerAddrTabl,filterServerTable中擷取資訊,分别填充queue-Datas、BrokerDatas、filterServerTable。 如果topic為順序消息,那麼從KVconfig中擷取關于順序消息先關的配置填充到orderTopicConf中。 如果找不到路由資訊,那麼傳回code為ResponseCode.TOPIC_NOT_EXIST。

本篇文章主要是從源碼的角度給大家介紹了RocketMQ的NameServer,包括NameServer的啟動流程、路由注冊、路由剔除和路由發現。我們在了解了NameServer的設計原理之後,也可以回過頭思考下在設計過程中一些值得我們學習的小技巧,在此我抛磚引玉提出兩點:

啟動流程注冊JVM鈎子用于優雅停機。這是一個程式設計技巧,我們在實際開發過程中,如果有使用線程池或者一些常駐線程任務時,可以考慮通過注冊JVM鈎子的方式,在JVM關閉前釋放資源或者完成一些事情來保證優雅停機。

更新路由表時需要通過加鎖防止并發操作,這裡使用的是鎖粒度較少的讀寫鎖,允許多個消息發送者并發讀,保證消息發送時的高并發,但同一時刻NameServer隻處理一個Broker心跳包,多個心跳包請求串行執行,這也是讀寫鎖經典使用場景。

1、《RocketMQ技術内幕》

2、《RocketMQ核心原理和實踐》

3、Apache RocketMQ開發者指南

作者:vivo網際網路伺服器團隊-Ye Wenhao

分享 vivo 網際網路技術幹貨與沙龍活動,推薦最新行業動态與熱門會議。

繼續閱讀