天天看點

RocketMQ-筆記 1:namesrv1. namesrv的來源2. 解析namesrv的啟動流程3. 兩主兩從的部署結構圖,下面的解析都是按照這個圖進行解析的4. 解析路由中繼資料的資料結構4. 路由資訊的注冊5. 路由資訊的删除6. 路由資訊的發現7.總結

1. namesrv的來源

  • namesrv整體來說和zookeeper的類似,為了保證消息的高可用,我們通常會布置多台broker,那麼用戶端在發送消息的時候怎麼去選擇,或者某一台當機了,用戶端怎麼去感覺到,namesrv就是為了解決這種問題,負責管理叢集,broker,topic等中繼資料,動态的維護broker清單,剔除已經當機的broker,根據負載均衡算法可以選擇到一台broker。

2. 解析namesrv的啟動流程

  • 這個我們需要參考一下源碼看一下啟動方法和重要的參數資訊:.NamesrvStartup
    • 1,解析配置檔案填充屬性到NamesrvConfig和NettyServerConfig兩個對象中,配置檔案的來源是主要是在啟動的時候通過-c 指定檔案的。
    • 2,根據啟動的屬性建立namesrvController對象,并且進行初始化
    • 3,加載kv配置,并且建立netty網絡傳輸對象,然後啟動兩個定時任務。
      • namesrv每隔10s中掃描一次broker,根據最後一次的心跳檢測時間剔除已經挂掉的broker;
      • namesrv每隔10分鐘列印一次kv配置資訊
    • 4,注冊JVM 鈎子函數并啟動伺服器,以便監聽Broker、消息生産者的網絡請求。

3. 兩主兩從的部署結構圖,下面的解析都是按照這個圖進行解析的

RocketMQ-筆記 1:namesrv1. namesrv的來源2. 解析namesrv的啟動流程3. 兩主兩從的部署結構圖,下面的解析都是按照這個圖進行解析的4. 解析路由中繼資料的資料結構4. 路由資訊的注冊5. 路由資訊的删除6. 路由資訊的發現7.總結

4. 解析路由中繼資料的資料結構

  • 下面主要是namesrv主要記錄了哪些中繼資料,其主要是在RouteInfoManager中的5個hashMap中
    • HashMap> topicQueueTable : topic 消息隊列路由資訊
{
    "topicTest": [
        {
            "brokerName": "brokerName-a",
            "readQueueNums": 4,
            "writeQueueNums": 4,
            "perm": 6,
            "topicSynFlag": 0
        }
    ]
}           
  • HashMap brokerAddrTable:broker的叢集資訊,包含叢集,name,位址
{
    "broker-a": {
        "cluster": "c1",
        "brokerName": "broker-a",
        "brokerAddrs": [
            "0:192.168.8.101:10000",
            "1:192.168.8.102:10000"
        ]
    },

    "broker-b": {
        "cluster": "c1",
        "brokerName": "broker-b",
        "brokerAddrs": [
            "0:192.168.8.103:10000",
            "1:192.168.8.104:10000"
        ]
    }
}           
  • HashMap> clusterAddrTable : Broker 叢集資訊
{
    "c1": [
        "broker-a",
        "broker-b"
    ]
}           
  • HashMap brokerLiveTable : broker狀态資訊
{
    "192.168.8.101:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": "192.168.8.102:10000"
    },

    "192.168.8.102:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": ""
    },
    "192.168.8.103:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": "192.168.8.104:10000"
    },

    "192.168.8.104:10000": {
        "lastUpdateTimestamp": 20190909099999,
        "dataVersion": {
            "timestamp": 20190909099999,
            "counter": 0
        },
        "Channel": "channel-a",
        "haServerAddr": ""
    }
}           
  • HashMap> filterServerTable : broker上的FilterServer清單。
{
    "192.168.8.101:10000": ["192.168.9.101:10000"],

    "192.168.8.102:10000": ["192.168.9.102:10000"]
}           

4. 路由資訊的注冊

  • RocketMQ的路由注冊主要是broker與nameSrv的心跳功能檢測,當broker啟動的時候會向叢集中所有的namesrv發送心跳語句,每隔30s向namesrv發送心跳包,namesrv收到心跳包之後會更新brokerLiveTable緩存中的lastUpdateTimestamp時間,然後nameSrv的定時任務會每隔10s掃描brokerLiveTable,如果超過120s沒有收到心跳包則将其剔除并且關閉socket連接配接。
  • 源碼層次解析步驟
    • 1,brokerController的start方法中發送心跳包注冊registerBrokerAll,
    • 2,調用BrokerOuterAPI中的registerBrokerAll中主要周遊所有的nameSrv清單,BrokerSrv依次向nameSrv清單發送心跳包
    • 3,調用BrokerOuterAPI的registerBroker發送request資訊,RocketMQ是基于netty傳輸的,如果我們需要網絡跟蹤,rocketMQ會為每個消息生成一個requestCode,然後服務端會有對應的網絡處理器(processs包中),隻需整庫搜尋 questCode 即可找到相應的處理邏輯
      • 心跳包request的資訊
        • brokerAddr:位址資訊
        • brokerId:0代表master,大于0代表slave
        • brokerName:名稱
        • clusterName:叢集名稱
        • haServerAddr:master位址,初次請求時為空,slave向nameSrv注冊之後傳回
        • requestBody:
          • filterServerList:消息過濾清單
          • topicConfigWrapper:主體配置
        • body:消息體
    • nameSrv接收到網絡請求之後開始進行處理,DefaultRequestProcessor預設的處理器,如果請求類型為 RequestCode REGISTER_BROKER,那麼最終會被轉發到RoutelnfoManager#registerBroker。
      • 1,首先在方法開始加入一個寫鎖,防止并發的修改RouteInfoManager中的路由表,然後開始維護clusterAddrTable資訊,首先判斷broker所屬的叢集是不是已經存在,如果不存在則建立并且将broker名稱放入到集合中。
      • 2,維護BrokerData,首先從brokerAddrTable中根據brokerName擷取broker資訊,如果擷取不到則将建立的brokerData資訊放入并且将registerFirst設定為true,表示第一次注冊,如果已經存在則将其替換并且将registerFirst設定為false,
      • 3,如果broker為master,并且Broker Topic配置資訊發生變化或者是初次注冊,那麼則需要建立或者更新topicQueueTable的資訊,如果在發送消息的時候topic是不存在的,那麼如果設定了brokerConfig中的autoCreateTopicEnable為true,就會傳回一個rocketMQ的預設的路由資訊。
      • 4,更新brokerLiveInfo的最後更新時間,這個是路由删除的最重要的清單
      • 5,注冊Broker上的filter過濾清單
      • 6,完成路由的注冊

5. 路由資訊的删除

  • 路由删除的兩種方式
    • broker主動方式:當broker正常關機的時候,會發送一個unregisterBroker指令
    • nameSrv主動方式:nameSrv在啟動的時候,會開啟兩個定時任務,其中有一個是每隔10s掃描一次brokerLiveInfo清單,然後擷取清單中每一個元素的lastupdateTime,如果超過120s沒有收到broker的心跳包則會将其剔除清單關閉channel,
      • 判斷時間超過120s
      • 申請寫鎖
      • 根據brokerAddress删除brokerLiveTable和FilterServerTable
      • 維護brokerAddrTable,周遊brokerAddrTable擷取BrokeData,然後從brokeData中的屬性brokerAddr(map集合)中剔除,如果剔除之後集合為空則從brokerAddrTable一處brokerName
      • 根據 BrokerName,從 clusterAddrTable 中找到 Broker并從叢集中移除,如果移

        除後,叢集中不包含任何 Broker,則将改叢集從clusterAddrTable 中一處

      • 根據 brokerName周遊所有主題的隊列,如果隊列中包含了目前 Broker的隊列則移除,如果 topic 隻包含待移除 Broker 的隊列的話,從路由表中删除該 topic
      • 釋放寫鎖

        然後申請一個寫鎖(防止并發更新)删除brokerLiveTable,FilterServerTable資訊,然後維護brokerAddrTable,clusterAddrTable。

6. 路由資訊的發現

  • RocketMQ路由并非是實時的,當topic路由發生變化的時候,不會主動推送給用戶端,而是由用戶端定時的拉取最新的路由資訊。
  • 傳回給用戶端的對象是TopicRouteData,其結構是
orderTopicConf:順序消息配置内容,來自于kvConfig
List<QueueData> queueDatas:topic隊列中繼資料
List<BrokerData> brokerDatas:topic 分布的 broker 中繼資料
HashMap< String/ * brokerAdress*/,List<String> /* filt rServer* /> : broker 上過濾伺服器位址清單           
  • 路由發現實作類DefaultRequestProcessor的getRoutelnfoByTopic
    • 調用 RouterlnfoManager 的方法,從路由 topicQueueTable brokerAddrTable
  1. terServerTable 中分别填充 TopicRouteData 中的 List

filterServer 位址表

  • 如果找到主題對應的路由資訊并且該主題為順序消息,則從 NameServer

    KVconfig 中擷取關于順序消息相 的配置填充路由資訊

  • 如果找不到路由資訊 CODE 則使用 TOPIC NOT_EXISTS ,表示沒有找到對應的路由

7.總結

RocketMQ-筆記 1:namesrv1. namesrv的來源2. 解析namesrv的啟動流程3. 兩主兩從的部署結構圖,下面的解析都是按照這個圖進行解析的4. 解析路由中繼資料的資料結構4. 路由資訊的注冊5. 路由資訊的删除6. 路由資訊的發現7.總結