1 關于NameServer
上一節的 MQ系列3:RocketMQ 架構分析,我們大緻介紹了 RocketMQ的基本元件構成,包括 NameServer、Broker、Producer以及Consumer四部分。
NameServer,指的是服務可以根據給定的名字來進行資源或對象的位址定位,并擷取有關的屬性資訊。在Rocket中也一樣,NameServer是 RocketMQ 的服務注冊中心(類似于 Kafka 叢集 後面的 Zookeeper 叢集一樣, 對叢集中繼資料進行管理),根據中繼資料(ip、port和router資訊)來唯一定位服務。RocketMQ 需要先啟動 NameServer ,再啟動 Rocket 中的 Broker。
2 NameServer運作流程
2.1 注冊
注冊發生在Broker啟動之後,啟動後快速與NameServer建立長連接配接,并每30s對NameService發送一次心跳包,Broker會将自己的IP Address、Port、Router 等資訊随着心跳一并注冊到 NameServer中。
這裡的RouterInfo 主要指Broker下包含哪些Topic資訊,這種映射關系友善後面消息的生産和消費的時候進行尋址。
注冊使用到的核心資料結構如下:
HashMap<String BrokerName, BrokerData> brokerAddrTable
HashMap 的 Key 是 Broker 的名稱,存儲了一個Broker服務所對應的屬性資訊。
Value 是個對象,資料結構如下:
字段 | 類型 | 說明 |
cluster | String | 所屬的叢集名稱 |
broker | String | broker的名稱 |
brokerAddress | HashMap | Broker的IP位址清單,包含一個Master IP位址清單 和 多個Slave IP位址清單 |
{
"Broker-A": {
"cluster": "Broker-Cluster",
"brokerName": "Broker-A",
"cluster": {
//1主2從
"0": "192.168.0.1: 1234",
"1": "192.168.0.2: 1234",
"2": "192.168.0.3: 1234"
}
}
}
2.2 注冊資訊更新
當你對你的Broker中的Topic資訊進行更新了(增、删、改)怎麼辦,你才需要重新将資訊注冊到NameServer中。
- 如果你建立了新的 Topic,Broker會向 NameServer 發送注冊資訊,接收到資訊後會對每個Master 角色的Broker ,建立一個新的QueueData對象。
- 如果你修改了Topic,則NameServer 會先把舊的 QueueData 删除,再加一個新的 QueueData。
- 如果你删除了Topic,則NameServer 會将對應的 QueueData 删除。
使用到的核心資料結構如下:
HashMap<String topic, List<QueueData>> topicQueueTable
HashMap 的 Key 是 Topic 的名稱,裡面存儲了Topic的所有屬性資訊。
Value 是個清單,清單的資料類型是 QueueData,清單的length就是Topic中的 Master角色的 Broker 個數。
QueueData的結構如下
字段 | 類型 | 說明 |
brokerName | String | broker名稱 |
readQueueNums | Long | 讀Queue的數量 |
writeQueueNums | Long | 寫Queue的數量 |
perm | Integer | PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0 |
topicSyncFlag | Long | 同步的位置辨別 |
{
"topic-test":[ // topic名稱,注意下面會用到
{
"brokerName":"Broker-A",
"readQueueNums":37,
"writeQueueNums":37,
"perm":6, // 讀寫權限
"topicSynFlag":12
},
{
"brokerName":"Broker-B",
"readQueueNums":37,
"writeQueueNums":37,
"perm":6, // 讀寫權限
"topicSynFlag":12
}
]}
參考RocketMQ源碼如下,這邊加了注釋,友善了解:
/**
* 建立或者更新 MessageQueue 的資料
* @param brokerName
* @param topicConfig
*/
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName); // broker 名稱
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); // 讀Queue的數量
queueData.setReadQueueNums(topicConfig.getReadQueueNums()); // 寫Queue的數量
queueData.setPerm(topicConfig.getPerm()); // 權限:PRIORITY = 3, READ = 2, WRITE = 1 , INHERIT = 0
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) { // 新增
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
} else { // 更新
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}",
topicConfig.getTopicName(), qd, queueData);
it.remove(); // 先删除
}
}
} if (addNewOne) {
queueDataList.add(queueData); // 再添加
}
}
}
2.3 異常清理
如果Broker挂掉,那麼再被消息的生産者和消費者使用就會有問題了。這時候需要對已經宕掉的Broker進行清理,確定NamServer中注冊的Broker服務資訊都是Alive的。它的做法是這樣的:
- 前面我們說了,Broker每30s對NameService發送一次心跳包給NameServer
- NameServer接收到心跳包的時候,會将目前時間戳更新到 brokerLiveTable 表的 lastUpdateTimestamp 字段中。
- NameServer中會啟動一個定時任務
- 每10s(記住這邊掃描是10s間隔,與上面心跳包區分開)掃描 一下 brokerLiveTable 表
- 檢查lastUpdateTimestamp字段,如果時間戳與目前時間相隔超過 120s(即兩分鐘),則認為 Broker 已經宕了,并會将broker清除出NameServer的系統資料庫。
使用到的核心資料結構如下:
HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable
HashMap 的 Key 是 Broker伺服器的位址資訊(IP+Port),裡面存儲了該Broker伺服器的基本資訊。
Value 是個對象,結構如下:
字段 | 類型 | 說明 |
lastUpdateTimestamp | Long | 最後一次收到心跳包的時間戳 |
dataVersion | DataVersion | 資料版本号對象 |
channel | Channel | netty的Channel,IO資料互動媒介 |
haServerAddr | String | master位址,初次請求的時候值為空,slave向NameServer注冊之後傳回 |
2.4 消息生産和消費
上面的步驟都完成之後,NameServer這個 “中央大腦” 正式開始投入使用。這時候 ,消息的生産和消費具體是怎麼做的呢?
- Producer 或者 Consumer 啟動之後會和 NameServer 建立長連接配接
- 定時(預設為每30s)從 NameServer 擷取Routers資訊,并将路由資訊儲存至Producer或者Consumer的本地。
- Producer發送一條消息 hello-brand 到 topic (topic-test) 中
- 因為名稱為 topic-test 的 topic 存在于多個 broker中,是以需要如下幾個步驟,才能找到具體的位址:
- 先 根據 topic 名稱 topic-test 查詢 topicQueueTable , 選擇一個并擷取它的broker資訊(包含brokerName)
- 再根據已經擷取到的brokerName 查詢 brokerAddressTable 擷取具體的Broker IP位址(一般包含1個Master和n個Slave的IP位址)
- 拿到IP位址之後,生産者與broker建立連接配接,并發送消息
- 消費者同理
3 總結
上述的流程圖比較清晰的描述如下運轉流程:
- NameServer 作為整個 RocketMQ 的“中央大腦” ,負責對叢集中繼資料進行管理,是以 RocketMQ 需要先啟動 NameServer 再啟動 Rocket 中的 Broker。
- Broker 啟動後,與 NameServer 保持長連接配接,每 30s 發送一次發送心跳包,來確定Broker是否存活。并将 Broker 資訊 ( IP+、端口等資訊)以及Broker中存儲的Topic資訊上報。注冊成功後,NameServer 叢集中就有 Topic 跟 Broker 的映射關系。
- NameServer有個定時任務,每10s掃描下brokerLiveTable表 , 如果檢測到某個Broker 當機(因為使用心跳機制, 如果檢測超120s(兩分鐘)無上報心跳),則從路由系統資料庫中将其移除。
- 生産者在發送某個主題的消息之前先從 NamerServer 擷取 Broker 伺服器位址清單(通過topic名稱查詢topicQueueTable獲得broker名稱,通過broker名稱查詢brokerAddressTable擷取具體的Broker IP位址),然後根據負載均衡算法從清單中選擇1台Broker ,建立連接配接通道,進行消息發送。
- 消費者在訂閱某個topic的消息之前從 NamerServer 擷取 Broker 伺服器位址清單(同上),包括關聯的全部Topic隊列資訊。進而擷取目前訂閱 Topic 存在哪些 Broker 上,然後直接跟 Broker 建立連接配接通道,開始消費資料。
- 生産者和消費者預設每30s 從 NamerServer 擷取 Broker 伺服器位址清單,以及關聯的所有Topic隊列資訊,更新到Client本地。
為幫助開發者們提升面試技能、有機會入職BATJ等大廠公司,特别制作了這個專輯——這一次整體放出。
大緻内容包括了: Java 集合、JVM、多線程、并發程式設計、設計模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat等大廠面試題等、等技術棧!
歡迎大家關注公衆号【Java爛豬皮】,回複【666】,擷取以上最新Java後端架構VIP學習資料以及視訊學習教程,然後一起學習,一文在手,面試我有。
每一個專欄都是大家非常關心,和非常有價值的話題,如果我的文章對你有所幫助,還請幫忙點贊、好評、轉發一下,你的支援會激勵我輸出更高品質的文章,非常感謝!