天天看點

MQ系列4:NameServer 原了解析

MQ系列1:消息中間件執行原理

MQ系列2:消息中間件的技術選型

MQ系列3:RocketMQ 架構分析

1 關于NameServer

上一節的 MQ系列3:RocketMQ 架構分析,我們大緻介紹了 RocketMQ的基本元件構成,包括 NameServer、Broker、Producer以及Consumer四部分。

NameServer,指的是服務可以根據給定的名字來進行資源或對象的位址定位,并擷取有關的屬性資訊。在Rocket中也一樣,NameServer是 RocketMQ 的服務注冊中心(類似于 Kafka 叢集 後面的 Zookeeper 叢集一樣, 對叢集中繼資料進行管理),根據中繼資料(ip、port和router資訊)來唯一定位服務。RocketMQ 需要先啟動 NameServer ,再啟動 Rocket 中的 Broker。

2 NameServer運作流程

2.1 注冊

注冊發生在Broker啟動之後,啟動後快速與NameServer建立長連接配接,并每30s對NameService發送一次心跳包,Broker會将自己的IP Address、Port、Router 等資訊随着心跳一并注冊到 NameServer中。

MQ系列4:NameServer 原了解析

這裡的RouterInfo 主要指Broker下包含哪些Topic資訊,這種映射關系友善後面消息的生産和消費的時候進行尋址。

注冊使用到的核心資料結構如下:

HashMap<String BrokerName, BrokerData> brokerAddrTable

  • HashMap 的 Key 是 Broker 的名稱,存儲了一個Broker服務所對應的屬性資訊。
  • Value 是個對象,資料結構如下:
字段 類型 說明
cluster String 所屬的叢集名稱
broker String broker的名稱
brokerAddress HashMap Broker的IP位址清單,包含一個Master IP位址清單 和 多個Slave IP位址清單
" Broker-A":{
	"cluster":"Broker-Cluster",
	"brokerName":"Broker-A",
	"cluster":{  // 1主2從
	   "0":"192.168.0.1:1234",
	   "1":"192.168.0.2:1234",
	   "2":"192.168.0.3:1234"
	}
}
           

2.2 注冊資訊更新

當你對你的Broker中的Topic資訊進行更新了(增、删、改)怎麼辦,你才需要重新将資訊注冊到NameServer中。

  • 如果你建立了新的 Topic,Broker會向 NameServer 發送注冊資訊,接收到資訊後會對每個Master 角色的Broker ,建立一個新的QueueData對象。
  • 如果你修改了Topic,則NameServer 會先把舊的 QueueData 删除,在加一個新的 QueueData。
  • 如果你删除了Topic,則NameServer 會将對應的 QueueData 删除。
MQ系列4:NameServer 原了解析

使用到的核心資料結構如下:

HashMap<String topic, List<QueueData>> topicQueueTable

  • HashMap 的 Key 是 Topic 的名稱,裡面存儲了Topic的所有屬性資訊。
  • Value 是個清單,清單的資料類型是 QueueData,清單的length就是Topic中的 Master角色的 Broker 個數。
  • QueueData的結構如下
字段 類型 說明
brokerName String broker名稱
readQueueNums Long 讀Queue的數量
writeQueueNums Long 寫Queue的數量
perm Integer 權限 PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0
topicSyncFlag Long 同步的位置辨別
{
  "topic-test":[ // topic名稱,注意下面會用到
   { 
    "brokerName":"Broker-A",
	"readQueueNums":37,
	"writeQueueNums":37,
	"perm":6,  // 讀寫權限
	"topicSynFlag":12
   },
   { 
    "brokerName":"Broker-B",
	"readQueueNums":37,
	"writeQueueNums":37,
	"perm":6,  // 讀寫權限
	"topicSynFlag":12
   } 
 ]
}
           

參考RocketMQ源碼如下,這邊加了注釋,友善了解:

/**
     * 建立或者更新 MessageQueue 的資料
     * @param brokerName
     * @param topicConfig
     */
    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName); // broker 名稱
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());   // 讀Queue的數量
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());  // 寫Queue的數量
        queueData.setPerm(topicConfig.getPerm());  // 權限: PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataList) {  // 新增
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
        } else {   // 更新
            boolean addNewOne = true;   
            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
                                queueData);
                        it.remove();   // 先删除
                    }
                }
            }
            if (addNewOne) {
                queueDataList.add(queueData);   // 再添加
            }
        }
    }
           

2.3 異常清理

如果Broker挂掉,那麼再被消息的生産者和消費者使用就會有問題了。這時候需要對已經宕掉的Broker進行清理,確定NamServer中注冊的Broker服務資訊都是Alive的。它的做法是這樣的:

  • 前面我們說了,Broker每30s對NameService發送一次心跳包給NameServer
  • NameServer接收到心跳包的時候,會将目前時間戳更新到

    brokerLiveTable

    表的

    lastUpdateTimestamp

    字段中。
  • NameServer中會啟動一個定時任務
  • 每10s(記住這邊掃描是10s間隔,與上面心跳包區分開)掃描 一下

    brokerLiveTable

  • 檢查

    lastUpdateTimestamp

    字段,如果時間戳與目前時間相隔超過 120s(即兩分鐘),則認為 Broker 已經宕了,并會将broker清除出NameServer的系統資料庫。
    MQ系列4:NameServer 原了解析

使用到的核心資料結構如下:

HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable

  • HashMap 的 Key 是 Broker伺服器的位址資訊(IP+Port),裡面存儲了該Broker伺服器的基本資訊。
  • Value 是個對象,結構如下:
字段 類型 說明
lastUpdateTimestamp Long 最後一次收到心跳包的時間戳
dataVersion DataVersion 資料版本号對象
channel Channel netty的Channel,IO資料互動媒介
haServerAddr String master位址,初次請求的時候值為空,slave向NameServer注冊之後傳回

2.4 消息生産和消費

上面的步驟都完成之後,NameServer這個 "中央大腦" 正式開始投入使用。這時候 ,消息的生産和消費具體是怎麼做的呢?

  • Producer 或者 Consumer 啟動之後會和 NameServer 建立長連接配接
  • 定時(預設為每30s)從 NameServer 擷取Routers資訊,并将路由資訊儲存至Producer或者Consumer的本地。
  • Producer發送一條消息

    hello-brand

    到 topic (

    topic-test

    ) 中
  • 因為名稱為

    topic-test

    的 topic 存在于多個 broker中,是以需要如下幾個步驟,才能找到具體的位址:
    • 先 根據 topic 名稱

      topic-test

      查詢

      topicQueueTable

      , 選擇一個并擷取它的broker資訊(包含brokerName)
    • 再根據已經擷取到的brokerName 查詢

      brokerAddressTable

      擷取具體的Broker IP位址(一般包含1個Master和n個Slave的IP位址)
    • 拿到IP位址之後,生産者與broker建立連接配接,并發送消息
    • 消費者同理
      MQ系列4:NameServer 原了解析

3 總結

MQ系列4:NameServer 原了解析

上述的流程圖比較清晰的描述如下運轉流程:

  • NameServer 作為整個 RocketMQ 的“中央大腦” ,負責對叢集中繼資料進行管理,是以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
  • Broker 啟動後,與 NameServer 保持長連接配接,每 30s 發送一次發送心跳包,來確定Broker是否存活。并将 Broker 資訊 ( IP+、端口等資訊)以及Broker中存儲的Topic資訊上報。注冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的映射關系。
  • NameServer有個定時任務,每10s掃描下

    brokerLiveTable

    表 , 如果檢測到某個Broker 當機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無上報心跳),則從路由系統資料庫中将其移除。
  • 生産者在發送某個主題的消息之前先從 NamerServer 擷取 Broker 伺服器位址清單(通過topic名稱查詢

    topicQueueTable

    獲得broker名稱,通過broker名稱查詢

    brokerAddressTable

    擷取具體的Broker IP位址),然後根據負載均衡算法從清單中選擇1台Broker ,建立連接配接通道,進行消息發送。
  • 消費者在訂閱某個topic的消息之前從 NamerServer 擷取 Broker 伺服器位址清單(同上),包括關聯的全部Topic隊列資訊。進而擷取目前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接配接通道,開始消費資料。
  • 生産者和消費者預設每30s 從 NamerServer 擷取 Broker 伺服器位址清單,以及關聯的所有Topic隊列資訊,更新到Client本地。

參考:

https://zhuanlan.zhihu.com/p/388807516

架構與思維·公衆号:撰稿者為bat、位元組的幾位高階研發/架構。不做廣告、不賣課、不要打賞,隻分享優質技術

★ 加公衆号擷取學習資料和面試集錦

碼字不易,歡迎關注,歡迎轉載

作者:翁智華

出處:https://www.cnblogs.com/wzh2010/

本文采用「CC BY 4.0」知識共享協定進行許可,轉載請注明作者及出處。

繼續閱讀