天天看點

深入了解MQ:NameServer 原了解析

作者:JAVA後端架構
深入了解MQ:NameServer 原了解析

1 關于NameServer

上一節的 MQ系列3:RocketMQ 架構分析,我們大緻介紹了 RocketMQ的基本元件構成,包括 NameServer、Broker、Producer以及Consumer四部分。

NameServer,指的是服務可以根據給定的名字來進行資源或對象的位址定位,并擷取有關的屬性資訊。在Rocket中也一樣,NameServer是 RocketMQ 的服務注冊中心(類似于 Kafka 叢集 後面的 Zookeeper 叢集一樣, 對叢集中繼資料進行管理),根據中繼資料(ip、port和router資訊)來唯一定位服務。RocketMQ 需要先啟動 NameServer ,再啟動 Rocket 中的 Broker。

2 NameServer運作流程

2.1 注冊

注冊發生在Broker啟動之後,啟動後快速與NameServer建立長連接配接,并每30s對NameService發送一次心跳包,Broker會将自己的IP Address、Port、Router 等資訊随着心跳一并注冊到 NameServer中。

深入了解MQ:NameServer 原了解析

這裡的RouterInfo 主要指Broker下包含哪些Topic資訊,這種映射關系友善後面消息的生産和消費的時候進行尋址。

注冊使用到的核心資料結構如下:

HashMap<String BrokerName, BrokerData> brokerAddrTable           

HashMap 的 Key 是 Broker 的名稱,存儲了一個Broker服務所對應的屬性資訊。

Value 是個對象,資料結構如下:

字段 類型 說明
cluster String 所屬的叢集名稱
broker String broker的名稱
brokerAddress HashMap Broker的IP位址清單,包含一個Master IP位址清單 和 多個Slave IP位址清單
{
    "Broker-A": {
        "cluster": "Broker-Cluster",
        "brokerName": "Broker-A",
        "cluster": {
            //1主2從
            "0": "192.168.0.1: 1234",
            "1": "192.168.0.2: 1234",
            "2": "192.168.0.3: 1234"
        }
    }
}

           

2.2 注冊資訊更新

當你對你的Broker中的Topic資訊進行更新了(增、删、改)怎麼辦,你才需要重新将資訊注冊到NameServer中。

  • 如果你建立了新的 Topic,Broker會向 NameServer 發送注冊資訊,接收到資訊後會對每個Master 角色的Broker ,建立一個新的QueueData對象。
  • 如果你修改了Topic,則NameServer 會先把舊的 QueueData 删除,再加一個新的 QueueData。
  • 如果你删除了Topic,則NameServer 會将對應的 QueueData 删除。
深入了解MQ:NameServer 原了解析

使用到的核心資料結構如下:

HashMap<String topic, List<QueueData>> topicQueueTable           

HashMap 的 Key 是 Topic 的名稱,裡面存儲了Topic的所有屬性資訊。

Value 是個清單,清單的資料類型是 QueueData,清單的length就是Topic中的 Master角色的 Broker 個數。

QueueData的結構如下

字段 類型 說明
brokerName String broker名稱
readQueueNums Long 讀Queue的數量
writeQueueNums Long 寫Queue的數量
perm Integer PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0
topicSyncFlag Long 同步的位置辨別
{
  "topic-test":[ // topic名稱,注意下面會用到
   { 
    "brokerName":"Broker-A",
  "readQueueNums":37,
  "writeQueueNums":37,
  "perm":6,  // 讀寫權限
  "topicSynFlag":12
   },
   { 
    "brokerName":"Broker-B",
  "readQueueNums":37,
  "writeQueueNums":37,
  "perm":6,  // 讀寫權限
  "topicSynFlag":12
   }
]}

           

參考RocketMQ源碼如下,這邊加了注釋,友善了解:

/**
     * 建立或者更新 MessageQueue 的資料
     * @param brokerName
     * @param topicConfig
     */
    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName); // broker 名稱
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());   // 讀Queue的數量
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());  // 寫Queue的數量
        queueData.setPerm(topicConfig.getPerm());  // 權限:PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());        
        if (null == queueDataList) {  // 新增
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);            
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
        } else {   // 更新
            boolean addNewOne = true;   
            Iterator<QueueData> it = queueDataList.iterator();            
            while (it.hasNext()) {
                QueueData qd = it.next();                
                if (qd.getBrokerName().equals(brokerName)) {                    
                if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        log.info("topic changed, {} OLD: {} NEW: {}", 
                        topicConfig.getTopicName(), qd, queueData);
                        it.remove();   // 先删除
                    }
                }
            }            if (addNewOne) {
                queueDataList.add(queueData);   // 再添加
            }
        }
    }
           

2.3 異常清理

如果Broker挂掉,那麼再被消息的生産者和消費者使用就會有問題了。這時候需要對已經宕掉的Broker進行清理,確定NamServer中注冊的Broker服務資訊都是Alive的。它的做法是這樣的:

  • 前面我們說了,Broker每30s對NameService發送一次心跳包給NameServer
  • NameServer接收到心跳包的時候,會将目前時間戳更新到 brokerLiveTable 表的 lastUpdateTimestamp 字段中。
  • NameServer中會啟動一個定時任務
  • 每10s(記住這邊掃描是10s間隔,與上面心跳包區分開)掃描 一下 brokerLiveTable 表
  • 檢查lastUpdateTimestamp字段,如果時間戳與目前時間相隔超過 120s(即兩分鐘),則認為 Broker 已經宕了,并會将broker清除出NameServer的系統資料庫。
深入了解MQ:NameServer 原了解析

使用到的核心資料結構如下:

HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable           

HashMap 的 Key 是 Broker伺服器的位址資訊(IP+Port),裡面存儲了該Broker伺服器的基本資訊。

Value 是個對象,結構如下:

字段 類型 說明
lastUpdateTimestamp Long 最後一次收到心跳包的時間戳
dataVersion DataVersion 資料版本号對象
channel Channel netty的Channel,IO資料互動媒介
haServerAddr String master位址,初次請求的時候值為空,slave向NameServer注冊之後傳回

2.4 消息生産和消費

上面的步驟都完成之後,NameServer這個 “中央大腦” 正式開始投入使用。這時候 ,消息的生産和消費具體是怎麼做的呢?

  • Producer 或者 Consumer 啟動之後會和 NameServer 建立長連接配接
  • 定時(預設為每30s)從 NameServer 擷取Routers資訊,并将路由資訊儲存至Producer或者Consumer的本地。
  • Producer發送一條消息 hello-brand 到 topic (topic-test) 中
  • 因為名稱為 topic-test 的 topic 存在于多個 broker中,是以需要如下幾個步驟,才能找到具體的位址:
  1. 先 根據 topic 名稱 topic-test 查詢 topicQueueTable , 選擇一個并擷取它的broker資訊(包含brokerName)
  2. 再根據已經擷取到的brokerName 查詢 brokerAddressTable 擷取具體的Broker IP位址(一般包含1個Master和n個Slave的IP位址)
  3. 拿到IP位址之後,生産者與broker建立連接配接,并發送消息
  4. 消費者同理
深入了解MQ:NameServer 原了解析

3 總結

深入了解MQ:NameServer 原了解析

上述的流程圖比較清晰的描述如下運轉流程:

  • NameServer 作為整個 RocketMQ 的“中央大腦” ,負責對叢集中繼資料進行管理,是以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
  • Broker 啟動後,與 NameServer 保持長連接配接,每 30s 發送一次發送心跳包,來確定Broker是否存活。并将 Broker 資訊 ( IP+、端口等資訊)以及Broker中存儲的Topic資訊上報。注冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的映射關系。
  • NameServer有個定時任務,每10s掃描下brokerLiveTable表 , 如果檢測到某個Broker 當機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無上報心跳),則從路由系統資料庫中将其移除。
  • 生産者在發送某個主題的消息之前先從 NamerServer 擷取 Broker 伺服器位址清單(通過topic名稱查詢topicQueueTable獲得broker名稱,通過broker名稱查詢brokerAddressTable擷取具體的Broker IP位址),然後根據負載均衡算法從清單中選擇1台Broker ,建立連接配接通道,進行消息發送。
  • 消費者在訂閱某個topic的消息之前從 NamerServer 擷取 Broker 伺服器位址清單(同上),包括關聯的全部Topic隊列資訊。進而擷取目前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接配接通道,開始消費資料。
  • 生産者和消費者預設每30s 從 NamerServer 擷取 Broker 伺服器位址清單,以及關聯的所有Topic隊列資訊,更新到Client本地。

為幫助開發者們提升面試技能、有機會入職BATJ等大廠公司,特别制作了這個專輯——這一次整體放出。

大緻内容包括了: Java 集合、JVM、多線程、并發程式設計、設計模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat等大廠面試題等、等技術棧!

深入了解MQ:NameServer 原了解析

歡迎大家關注公衆号【Java爛豬皮】,回複【666】,擷取以上最新Java後端架構VIP學習資料以及視訊學習教程,然後一起學習,一文在手,面試我有。

每一個專欄都是大家非常關心,和非常有價值的話題,如果我的文章對你有所幫助,還請幫忙點贊、好評、轉發一下,你的支援會激勵我輸出更高品質的文章,非常感謝!

深入了解MQ:NameServer 原了解析

繼續閱讀