雲栖号資訊:【 點選檢視更多行業資訊】
在這裡您可以找到不同行業的第一手的上雲資訊,還在等什麼,快來!
簡述
消息中間件作為分布式系統的重要成員,各大公司及開源均有許多解決方案。目前主流的開源解決方案包括RabbitMQ、RocketMQ、Kafka、ActiveMQ等。消息這個東西說簡單也簡單,說難也難。簡單之處在于好用友善,接入簡單使用簡單,異步操作能夠解耦系統間的依賴,同時失敗後也能夠追溯重試。難的地方在于,設計一套可以支撐業務的消息機制,并提供高可用架構,解決消息存儲、消息重試、消息隊列的負載均衡等一系列問題。然而難也不代表沒有方法或者“套路”,熟悉一下原理與實作,多看幾個架構的源碼後多總結勢必能找出一些共性。
消息架構大同小異,熟練掌握其原理、工作機制是必要的。就拿用的比較多的RocketMQ為引,來說說消息引擎的設計與實作。阿裡的消息引擎經過了從Notify到Napoli、再到MetaQ三代的發展,現在已經非常成熟,在不同部門的代碼中現在沒準都還可以從代碼裡看到這一系列演進過程。目前的Apache RocketMQ 就是阿裡将MetaQ項目捐贈給了Apache基金會,而内部還是沿用MetaQ的名稱。
首先诠釋幾個消息相關的基本概念
- 每個消息隊列都必須建立一個Topic。
- 消息可以分組,每個消息隊列都至少需要一個生産者Producer和一個消費者Consumer。生産者生産發送消息,消費者接收消費消息。
- 每個消費者和生産者都會配置設定一個ID。
RocketMQ 系統架構

首先來看看RocketMQ的架構,如上圖所示,簡要描述一下幾種角色及作用。
NameServer
- NameServer是消息Topic的注冊中心,用于發現和管理消息生産者、消費者、維護路由關系。
Broker
- 消息存儲與轉發的中轉站,使用隊列機制管理資料存儲。Broker中會存儲多份消息資料進行容錯,以Master/Slave的架構保證系統的高可用,Broker中可以部署單個或多個Master。單個Master的場景,Master挂掉後,Producer新産生的消息無法被消費,但已經發送到Broker的消息,由于Slave節點的存在,還能繼續被Consumer所消費;如果部署多個Master則系統能能正常運轉。
- 另外,Broker中的Master和Slave不是像Zookeeper叢集中用選舉機制進行确定,而是固定的配置,這也是在高可用場景需要部署多個Master的原因。生産者将消息發送到Broker中後,Broker會将消息寫到本地的CommitLog檔案中,儲存消息。
Producer
生産者會和NameServer叢集中某一節點建立長連結,定時從NamerServeri擷取Topic路由資訊,并且和Broker建立心跳。
Consumer
消費者需要給生産者一個明确的消費成功的回應,MetaQ才會認為消費成功,否則失敗。失敗後,RocketMQ會将消息重新發回Broker,在指定的延遲時間内進行重試,當重試達到一定的次數後(預設16次),MetaQ則認為此消息不能被消費,消息會被投遞到死信隊列。
這個架構看其實是否很熟悉?好像接觸過的一些分布式系統的架構和這個長的都比較像是吧,甚至隻要裡面框圖的角色稍微換換就能變成另外一個架構的介紹,比如Dubbo/Redis...。
并且在RocketMQ架構設計中,要解決的問題與其他分布式架構也可以觸類旁通。Master/Slave機制,天然的讀寫分離方式都是分布式高可用系統的典型解決方案。
負載均衡
負載均衡是消息架構需要解決的又一個重要問題。當系統中生産者生産了大量消息,而消費者有多個或多台機器時,就需要平衡負載,讓消息均分地被消費者進行消費。目前RocketMQ中使用了多種負載均衡算法。主要有以下幾種,靜态配置由于比較簡單,就是直接為消費者指定需要消費的隊列是以直接忽略。
- 求平均數法
- 環形隊列法
- 一緻Hash算法
- Machine Room算法
- 靜态配置
來看一下源碼,RocketMQ内部對以上負載均衡算法均有實作,并定義了一個接口 AllocateMessageQueueStrategy,采用了政策模式,每種負載均衡算法都依靠實作這個接口實作。在運作态時,會擷取這個接口的執行個體,進而動态判斷到底采用的是哪種負載均衡算法。
1. 求平均數法
顧名思義,就是根據消息隊列的數量和消費者的數量,求出單個消費者上應該負擔的平均消費隊列數,然後根據消費者的ID,按照取模的方式将消息隊列配置設定到指定的consumer上。具體代碼可以去Github上找,截取核心算法代碼如下, mqAll就是消息隊列的結構,是一個MessageQueue的List,cidAll是消費者ID的清單,也是一個List。考慮mqAll和cidAll固定時以及變化時,目前消費者節點會從隊列中擷取到哪個隊列中的消息,比如當 averageSize 大于1時,這時每個消費者上的消息隊列就不止一個,而配置設定在每個消費者的上的隊列的ID是連續的。
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
2. 環形平均法
這種算法更為簡單。首先擷取目前消費者在整個清單中的下标index,直接用求餘方法得到目前消費者應該處理的消息隊列。注意mqAll的size和cidAll的size可以是任意的。
- 當ciAll.size() == mqAll.size() 時,該算法就是類似hashtable的求餘分桶。
- 當ciAll.size() > mqAll.size()時,那麼多出的消費者上并不能擷取到消費的隊列,隻有部分消費者能夠擷取到消息隊列并執行,相當于在消費者資源充足的情況下,由于隊列數少,是以使用其中一部分消費者就能滿足需求,不用額外的開銷。
- 當ciAll.size() < mqAll.size()時,這樣每個消費者上需要負載的隊列數就超過了1個,并且差別于直接求平均的方式,配置設定在每個消費者上的消費隊列不是連續的,而是有一定步長的間隔。
實作代碼
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
int index = cidAll.indexOf(currentCID);
for (int i = index; i < mqAll.size(); i++) {
if (i % cidAll.size() == index) {
result.add(mqAll.get(i));
}
}
return result;
}
3. 一緻Hash算法
循環所有需要消費的隊列,根據隊列toString後的hash值計算出處理目前隊列的最近節點并配置設定給該節點。routeNode中方法稍微複雜一些,有時間建議細看,這裡就隻說功能。
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
cidNodes.add(new ClientNode(cid));
}
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
ClientNode clientNode = router.routeNode(mq.toString());
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
results.add(mq);
}
}
return results;
}
4. Machine Room算法
基于機房的Hash算法。這個命名看起來很詐唬,其實和上面的普通求餘算法是一樣的,隻不過多了個配置和過濾,為了把這個說清楚就把源碼貼全一點。可以看到在這個算法的實作類中多了一個成員 consumeridcs,這個就是consumer id的一個集合,按照一定的約定,預先給broker命名,例如us@metaq4,然後給不同叢集配置不同的consumeridcs,進而實作不同機房處理不同消息隊列的能力。
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {
return result;
}
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
String[] temp = mq.getBrokerName().split("@");
if (temp.length == 2 && consumeridcs.contains(temp[0])) {
premqAll.add(mq);
}
}
int mod = premqAll.size() / cidAll.size();
int rem = premqAll.size() % cidAll.size();
int startIndex = mod * currentIndex;
int endIndex = startIndex + mod;
for (int i = startIndex; i < endIndex; i++) {
result.add(mqAll.get(i));
}
if (rem > currentIndex) {
result.add(premqAll.get(currentIndex + mod * cidAll.size()));
}
return result;
}
應用執行個體
由于近些年阿裡海外業務的擴充和投入,RocketMQ等中間件對常見的海外業務場景的支援也更加健全。典型的場景包括跨單元消費以及全球消息路由。
跨單元消費的應用是比較好實作的,就是在consumer中額外增加一個配置,指定接收消息的來源單元,RocketMQ内部會完成用戶端從指定單元拉取消息的工作。而全球消息路由則是需要一些公共資源,消息的發送方隻能将消息發送到一個指定單元/機房,然後将消息路由到另外指定的單元,consumer部署在指定單元。差別在于一個配置在用戶端,一個配置在服務端。
如上圖所示就是一個需要跨單元消費的場景,Producer在多個站點,均能夠發送消息,但由于機房、資源及政策限制等原因消息隻能被發送到一個統一的單元,例如新加坡SG單元。是以多個機房部署的消費者就必須進行跨單元收消息,然後過濾跟自身相關的消息,進行業務邏輯處理,消息會平均地配置設定在每一台consumer叢集的伺服器上。
總結
從RocketMQ的設計、原理以及用過的個人用過的其他分布式架構上看,典型的分布式系統在設計中無外乎要解決的就是以下幾點,RocketMQ全都用上了。
- 服務的注冊和發現。一般會有一個統一的注冊中心進行管理維護。
- 服務的提供方和使用方間的通信,可以是異步也可以是同步,例如dubbo服務同步服務,而消息類型就是異步通信。
- HA——高可用架構。八字決 ———— “主從同步,讀寫分離”。 要再加一句的話可以是“異地多活”。
- 負載均衡。典型的負載均衡算法在文章内容裡面已經列出好幾種了,常用的基本也就這些。
當然消息架構設計中用到的套路遠不止這些,包括如何保證消息消費的順序性、消費者和服務端通信、以及消息持久化等問題也是難點和重點,同樣,分布式緩存系統也需要解決這些問題,先寫到這裡,要完全了解并自己設計一個這樣的架構難度還是相當大的。
【雲栖号線上課堂】每天都有産品技術專家分享!
課程位址:
https://yqh.aliyun.com/live立即加入社群,與專家面對面,及時了解課程最新動态!
【雲栖号線上課堂 社群】
https://c.tb.cn/F3.Z8gvnK
原文釋出時間:2020-06-06
本文作者:Shadowfiendxiaoh
本文來自:“
掘金”,了解相關資訊可以關注“掘金”