1. namesrv的來源
- namesrv整體來說和zookeeper的類似,為了保證消息的高可用,我們通常會布置多台broker,那麼用戶端在發送消息的時候怎麼去選擇,或者某一台當機了,用戶端怎麼去感覺到,namesrv就是為了解決這種問題,負責管理叢集,broker,topic等中繼資料,動态的維護broker清單,剔除已經當機的broker,根據負載均衡算法可以選擇到一台broker。
2. 解析namesrv的啟動流程
- 這個我們需要參考一下源碼看一下啟動方法和重要的參數資訊:.NamesrvStartup
- 1,解析配置檔案填充屬性到NamesrvConfig和NettyServerConfig兩個對象中,配置檔案的來源是主要是在啟動的時候通過-c 指定檔案的。
- 2,根據啟動的屬性建立namesrvController對象,并且進行初始化
- 3,加載kv配置,并且建立netty網絡傳輸對象,然後啟動兩個定時任務。
- namesrv每隔10s中掃描一次broker,根據最後一次的心跳檢測時間剔除已經挂掉的broker;
- namesrv每隔10分鐘列印一次kv配置資訊
- 4,注冊JVM 鈎子函數并啟動伺服器,以便監聽Broker、消息生産者的網絡請求。
3. 兩主兩從的部署結構圖,下面的解析都是按照這個圖進行解析的

4. 解析路由中繼資料的資料結構
- 下面主要是namesrv主要記錄了哪些中繼資料,其主要是在RouteInfoManager中的5個hashMap中
- HashMap> topicQueueTable : topic 消息隊列路由資訊
{
"topicTest": [
{
"brokerName": "brokerName-a",
"readQueueNums": 4,
"writeQueueNums": 4,
"perm": 6,
"topicSynFlag": 0
}
]
}
- HashMap brokerAddrTable:broker的叢集資訊,包含叢集,name,位址
{
"broker-a": {
"cluster": "c1",
"brokerName": "broker-a",
"brokerAddrs": [
"0:192.168.8.101:10000",
"1:192.168.8.102:10000"
]
},
"broker-b": {
"cluster": "c1",
"brokerName": "broker-b",
"brokerAddrs": [
"0:192.168.8.103:10000",
"1:192.168.8.104:10000"
]
}
}
- HashMap> clusterAddrTable : Broker 叢集資訊
{
"c1": [
"broker-a",
"broker-b"
]
}
- HashMap brokerLiveTable : broker狀态資訊
{
"192.168.8.101:10000": {
"lastUpdateTimestamp": 20190909099999,
"dataVersion": {
"timestamp": 20190909099999,
"counter": 0
},
"Channel": "channel-a",
"haServerAddr": "192.168.8.102:10000"
},
"192.168.8.102:10000": {
"lastUpdateTimestamp": 20190909099999,
"dataVersion": {
"timestamp": 20190909099999,
"counter": 0
},
"Channel": "channel-a",
"haServerAddr": ""
},
"192.168.8.103:10000": {
"lastUpdateTimestamp": 20190909099999,
"dataVersion": {
"timestamp": 20190909099999,
"counter": 0
},
"Channel": "channel-a",
"haServerAddr": "192.168.8.104:10000"
},
"192.168.8.104:10000": {
"lastUpdateTimestamp": 20190909099999,
"dataVersion": {
"timestamp": 20190909099999,
"counter": 0
},
"Channel": "channel-a",
"haServerAddr": ""
}
}
- HashMap> filterServerTable : broker上的FilterServer清單。
{
"192.168.8.101:10000": ["192.168.9.101:10000"],
"192.168.8.102:10000": ["192.168.9.102:10000"]
}
4. 路由資訊的注冊
- RocketMQ的路由注冊主要是broker與nameSrv的心跳功能檢測,當broker啟動的時候會向叢集中所有的namesrv發送心跳語句,每隔30s向namesrv發送心跳包,namesrv收到心跳包之後會更新brokerLiveTable緩存中的lastUpdateTimestamp時間,然後nameSrv的定時任務會每隔10s掃描brokerLiveTable,如果超過120s沒有收到心跳包則将其剔除并且關閉socket連接配接。
- 源碼層次解析步驟
- 1,brokerController的start方法中發送心跳包注冊registerBrokerAll,
- 2,調用BrokerOuterAPI中的registerBrokerAll中主要周遊所有的nameSrv清單,BrokerSrv依次向nameSrv清單發送心跳包
- 3,調用BrokerOuterAPI的registerBroker發送request資訊,RocketMQ是基于netty傳輸的,如果我們需要網絡跟蹤,rocketMQ會為每個消息生成一個requestCode,然後服務端會有對應的網絡處理器(processs包中),隻需整庫搜尋 questCode 即可找到相應的處理邏輯
- 心跳包request的資訊
- brokerAddr:位址資訊
- brokerId:0代表master,大于0代表slave
- brokerName:名稱
- clusterName:叢集名稱
- haServerAddr:master位址,初次請求時為空,slave向nameSrv注冊之後傳回
- requestBody:
- filterServerList:消息過濾清單
- topicConfigWrapper:主體配置
- body:消息體
- 心跳包request的資訊
- nameSrv接收到網絡請求之後開始進行處理,DefaultRequestProcessor預設的處理器,如果請求類型為 RequestCode REGISTER_BROKER,那麼最終會被轉發到RoutelnfoManager#registerBroker。
- 1,首先在方法開始加入一個寫鎖,防止并發的修改RouteInfoManager中的路由表,然後開始維護clusterAddrTable資訊,首先判斷broker所屬的叢集是不是已經存在,如果不存在則建立并且将broker名稱放入到集合中。
- 2,維護BrokerData,首先從brokerAddrTable中根據brokerName擷取broker資訊,如果擷取不到則将建立的brokerData資訊放入并且将registerFirst設定為true,表示第一次注冊,如果已經存在則将其替換并且将registerFirst設定為false,
- 3,如果broker為master,并且Broker Topic配置資訊發生變化或者是初次注冊,那麼則需要建立或者更新topicQueueTable的資訊,如果在發送消息的時候topic是不存在的,那麼如果設定了brokerConfig中的autoCreateTopicEnable為true,就會傳回一個rocketMQ的預設的路由資訊。
- 4,更新brokerLiveInfo的最後更新時間,這個是路由删除的最重要的清單
- 5,注冊Broker上的filter過濾清單
- 6,完成路由的注冊
5. 路由資訊的删除
- 路由删除的兩種方式
- broker主動方式:當broker正常關機的時候,會發送一個unregisterBroker指令
- nameSrv主動方式:nameSrv在啟動的時候,會開啟兩個定時任務,其中有一個是每隔10s掃描一次brokerLiveInfo清單,然後擷取清單中每一個元素的lastupdateTime,如果超過120s沒有收到broker的心跳包則會将其剔除清單關閉channel,
- 判斷時間超過120s
- 申請寫鎖
- 根據brokerAddress删除brokerLiveTable和FilterServerTable
- 維護brokerAddrTable,周遊brokerAddrTable擷取BrokeData,然後從brokeData中的屬性brokerAddr(map集合)中剔除,如果剔除之後集合為空則從brokerAddrTable一處brokerName
-
根據 BrokerName,從 clusterAddrTable 中找到 Broker并從叢集中移除,如果移
除後,叢集中不包含任何 Broker,則将改叢集從clusterAddrTable 中一處
- 根據 brokerName周遊所有主題的隊列,如果隊列中包含了目前 Broker的隊列則移除,如果 topic 隻包含待移除 Broker 的隊列的話,從路由表中删除該 topic
-
釋放寫鎖
然後申請一個寫鎖(防止并發更新)删除brokerLiveTable,FilterServerTable資訊,然後維護brokerAddrTable,clusterAddrTable。
6. 路由資訊的發現
- RocketMQ路由并非是實時的,當topic路由發生變化的時候,不會主動推送給用戶端,而是由用戶端定時的拉取最新的路由資訊。
- 傳回給用戶端的對象是TopicRouteData,其結構是
orderTopicConf:順序消息配置内容,來自于kvConfig
List<QueueData> queueDatas:topic隊列中繼資料
List<BrokerData> brokerDatas:topic 分布的 broker 中繼資料
HashMap< String/ * brokerAdress*/,List<String> /* filt rServer* /> : broker 上過濾伺服器位址清單
- 路由發現實作類DefaultRequestProcessor的getRoutelnfoByTopic
- 調用 RouterlnfoManager 的方法,從路由 topicQueueTable brokerAddrTable
- terServerTable 中分别填充 TopicRouteData 中的 List
filterServer 位址表
-
如果找到主題對應的路由資訊并且該主題為順序消息,則從 NameServer
KVconfig 中擷取關于順序消息相 的配置填充路由資訊
- 如果找不到路由資訊 CODE 則使用 TOPIC NOT_EXISTS ,表示沒有找到對應的路由