1、消息存儲
分布式隊列因為有高可靠性的要求,是以資料要進行持久化存儲。
- 消息生成者發送消息
- MQ收到消息,将消息進行持久化,在存儲中新增一條記錄
- 傳回ACK給生産者
- MQ push 消息給對應的消費者,然後等待消費者傳回ACK
- 如果消息消費者在指定時間内成功傳回ack,那麼MQ認為消息消費成功,在存儲中删除消息,即執行第6步;如果MQ在指定時間内沒有收到ACK,則認為消息消費失敗,會嘗試重新push消息,重複執行4、5、6步驟
- MQ删除消息
1.1、存儲媒體
- 關系型資料庫DB
Apache下開源的另外一款MQ—ActiveMQ(預設采用的KahaDB做消息存儲)可選用JDBC的方式來做消息持久化,通過簡單的xml配置資訊即可實作JDBC消息存儲。由于,普通關系型資料庫(如Mysql)在單表資料量達到千萬級别的情況下,其IO讀寫性能往往會出現瓶頸。在可靠性方面,該種方案非常依賴DB,如果一旦DB出現故障,則MQ的消息就無法落盤存儲會導緻線上故障
- 檔案系統
目前業界較為常用的幾款産品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盤至所部署虛拟機/實體機的檔案系統來做持久化(刷盤一般可以分為異步刷盤和同步刷盤兩種模式)。消息刷盤為消息存儲提供了一種高效率、高可靠性和高性能的資料持久化方式。除非部署MQ機器本身或是本地磁盤挂了,否則一般是不會出現無法持久化的故障問題。
1.2、性能對比
檔案系統>關系型資料庫DB
1.3、消息的存儲和發送
1)消息存儲
磁盤如果使用得當,磁盤的速度完全可以比對上網絡的資料傳輸速度。目前的高性能磁盤,順序寫速度可以達到600MB/s, 超過了一般網卡的傳輸速度。但是磁盤随機寫的速度隻有大概100KB/s,和順序寫的性能相差6000倍!因為有如此巨大的速度差别,好的消息隊列系統會比普通的消息隊列系統速度快多個數量級。RocketMQ的消息用順序寫,保證了消息存儲的速度。
2)消息發送
Linux作業系統分為【使用者态】和【核心态】,檔案操作、網絡操作需要涉及這兩種形态的切換,免不了進行資料複制。
一台伺服器 把本機磁盤檔案的内容發送到用戶端,一般分為兩個步驟:
1)read;讀取本地檔案内容;
2)write;将讀取的内容通過網絡發送出去。
這兩個看似簡單的操作,實際進行了4 次資料複制,分别是:
- 從磁盤複制資料到核心态記憶體;
- 從核心态記憶體複制到使用者态記憶體;
- 然後從使用者态記憶體複制到網絡驅動的核心态記憶體;
- 最後是從網絡驅動的核心态記憶體複制到網卡中進行傳輸。
RocketMQ充分利用了上述特性,也就是所謂的“零拷貝”技術,提高消息存盤和網絡發送的速度。
這裡需要注意的是,采用MappedByteBuffer這種記憶體映射的方式有幾個限制,其中之一是一次隻能映射1.5~2G 的檔案至使用者态的虛拟記憶體,這也是為何RocketMQ預設設定單個CommitLog日志資料檔案為1G的原因了
1.4、消息存儲結構
RocketMQ消息的存儲是由ConsumeQueue和CommitLog配合完成 的,消息真正的實體存儲檔案是CommitLog,ConsumeQueue是消息的邏輯隊列,類似資料庫的索引檔案,存儲的是指向實體存儲的位址。每 個Topic下的每個Message Queue都有一個對應的ConsumeQueue檔案。
- CommitLog:存儲消息的中繼資料
- ConsumerQueue:存儲消息在CommitLog的索引
- IndexFile:為了消息查詢提供了一種通過key或時間區間來查詢消息的方法,這種通過IndexFile來查找消息的方法不影響發送與消費消息的主流程
1.5、刷盤機制
RocketMQ的消息是存儲到磁盤上的,這樣既能保證斷電後恢複, 又可以讓存儲的消息量超出記憶體的限制。RocketMQ為了提高性能,會盡可能地保證磁盤的順序寫。消息在通過Producer寫入RocketMQ的時 候,有兩種寫磁盤方式,分布式同步刷盤和異步刷盤。
1)同步刷盤
在傳回寫成功狀态時,消息已經被寫入磁盤。具體流程是,消息寫入記憶體的PAGECACHE後,立刻通知刷盤線程刷盤, 然後等待刷盤完成,刷盤線程執行完成後喚醒等待的線程,傳回消息寫成功的狀态。
2)異步刷盤
在傳回寫成功狀态時,消息可能隻是被寫入了記憶體的PAGECACHE,寫操作的傳回快,吞吐量大;當記憶體裡的消息量積累到一定程度時,統一觸發寫磁盤動作,快速寫入。
3)配置
同步刷盤還是異步刷盤,都是通過Broker配置檔案裡的flushDiskType 參數設定的,這個參數被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一個。
2、高可用性機制
RocketMQ分布式叢集是通過Master和Slave的配合達到高可用性的。
Master和Slave的差別:在Broker的配置檔案中,參數 brokerId的值為0表明這個Broker是Master,大于0表明這個Broker是 Slave,同時brokerRole參數也會說明這個Broker是Master還是Slave。
Master角色的Broker支援讀和寫,Slave角色的Broker僅支援讀,也就是 Producer隻能和Master角色的Broker連接配接寫入消息;Consumer可以連接配接 Master角色的Broker,也可以連接配接Slave角色的Broker來讀取消息。
2.1、消息消費高可用
在Consumer的配置檔案中,并不需要設定是從Master讀還是從Slave 讀,當Master不可用或者繁忙的時候,Consumer會被自動切換到從Slave 讀。有了自動切換Consumer這種機制,當一個Master角色的機器出現故障後,Consumer仍然可以從Slave讀取消息,不影響Consumer程式。這就達到了消費端的高可用性。
2.2、消息發送高可用
在建立Topic的時候,把Topic的多個Message Queue建立在多個Broker組上(相同Broker名稱,不同 brokerId的機器組成一個Broker組),這樣當一個Broker組的Master不可 用後,其他組的Master仍然可用,Producer仍然可以發送消息。 RocketMQ目前還不支援把Slave自動轉成Master,如果機器資源不足, 需要把Slave轉成Master,則要手動停止Slave角色的Broker,更改配置文 件,用新的配置檔案啟動Broker。
2.3、消息主從複制
如果一個Broker組有Master和Slave,消息需要從Master複制到Slave 上,有同步和異步兩種複制方式。
1)同步複制
同步複制方式是等Master和Slave均寫 成功後才回報給用戶端寫成功狀态;
在同步複制方式下,如果Master出故障, Slave上有全部的備份資料,容易恢複,但是同步複制會增大資料寫入 延遲,降低系統吞吐量。
2)異步複制
異步複制方式是隻要Master寫成功 即可回報給用戶端寫成功狀态。
在異步複制方式下,系統擁有較低的延遲和較高的吞吐量,但是如果Master出了故障,有些資料因為沒有被寫 入Slave,有可能會丢失;
3)配置
同步複制和異步複制是通過Broker配置檔案裡的brokerRole參數進行設定的,這個參數可以被設定成ASYNC_MASTER、 SYNC_MASTER、SLAVE三個值中的一個。
4)總結
實際應用中要結合業務場景,合理設定刷盤方式和主從複制方式, 尤其是SYNC_FLUSH方式,由于頻繁地觸發磁盤寫動作,會明顯降低 性能。通常情況下,應該把Master和Save配置成ASYNC_FLUSH的刷盤 方式,主從之間配置成SYNC_MASTER的複制方式,這樣即使有一台 機器出故障,仍然能保證資料不丢,是個不錯的選擇。
3、負載均衡
3.1、Producer負載均衡
Producer端,每個執行個體在發消息的時候,預設會輪詢所有的message queue發送,以達到讓消息平均落在不同的queue上。而由于queue可以散落在不同的broker,是以消息就發送到不同的broker下,如下圖:
圖中箭頭線條上的标号代表順序,釋出方會把第一條消息發送至 Queue 0,然後第二條消息發送至 Queue 1,以此類推。
3.2、Consumer負載均衡
1)叢集模式
在叢集消費模式下,每條消息隻需要投遞到訂閱這個topic的Consumer Group下的一個執行個體即可。RocketMQ采用主動拉取的方式拉取并消費消息,在拉取的時候需要明确指定拉取哪一條message queue。
而每當執行個體的數量有變更,都會觸發一次所有執行個體的負載均衡,這時候會按照queue的數量和執行個體的數量平均配置設定queue給每個執行個體。
預設的配置設定算法是AllocateMessageQueueAveragely,如下圖:
還有另外一種平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分攤每一條queue,隻是以環狀輪流分queue的形式,如下圖:
需要注意的是,叢集模式下,queue都是隻允許配置設定隻一個執行個體,這是由于如果多個執行個體同時消費一個queue的消息,由于拉取哪些消息是consumer主動控制的,那樣會導緻同一個消息在不同的執行個體下被消費多次,是以算法上都是一個queue隻分給一個consumer執行個體,一個consumer執行個體可以允許同時分到不同的queue。
通過增加consumer執行個體去分攤queue的消費,可以起到水準擴充的消費能力的作用。而有執行個體下線的時候,會重新觸發負載均衡,這時候原來配置設定到的queue将配置設定到其他執行個體上繼續消費。
但是如果consumer執行個體的數量比message queue的總數量還多的話,多出來的consumer執行個體将無法分到queue,也就無法消費到消息,也就無法起到分攤負載的作用了。是以需要控制讓queue的總數量大于等于consumer的數量。
2)廣播模式
由于廣播模式下要求一條消息需要投遞到一個消費組下面所有的消費者執行個體,是以也就沒有消息被分攤消費的說法。
在實作上,其中一個不同就是在consumer配置設定queue的時候,所有consumer都分到所有的queue。
4、消息重試
4.1、順序消息的重試
對于順序消息,當消費者消費消息失敗後,消息隊列 RocketMQ 會自動不斷進行消息重試(每次間隔時間為 1 秒),這時,應用會出現消息消費被阻塞的情況。是以,在使用順序消息時,務必保證應用能夠及時監控并處理消費失敗的情況,避免阻塞現象的發生。
4.2、無序消息的重試
對于無序消息(普通、定時、延時、事務消息),當消費者消費消息失敗時,您可以通過設定傳回狀态達到消息重試的結果。
無序消息的重試隻針對叢集消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗消息不再重試,繼續消費新的消息。
1)重試次數
消息隊列 RocketMQ 預設允許每條消息最多重試 16 次,每次重試的間隔時間如下:
第幾次重試 | 與上次重試的間隔時間 | 第幾次重試 | 與上次重試的間隔時間 |
1 | 10 秒 | 9 | 7 分鐘 |
2 | 30 秒 | 10 | 8 分鐘 |
3 | 1 分鐘 | 11 | 9 分鐘 |
4 | 2 分鐘 | 12 | 10 分鐘 |
5 | 3 分鐘 | 13 | 20 分鐘 |
6 | 4 分鐘 | 14 | 30 分鐘 |
7 | 5 分鐘 | 15 | 1 小時 |
8 | 6 分鐘 | 16 | 2 小時 |
如果消息重試 16 次後仍然失敗,消息将不再投遞。如果嚴格按照上述重試時間間隔計算,某條消息在一直消費失敗的前提下,将會在接下來的 4 小時 46 分鐘之内進行 16 次重試,超過這個時間範圍消息将不再重試投遞。
注意: 一條消息無論重試多少次,這些重試消息的 Message ID 不會改變。
2)配置方式
消費失敗後,重試配置方式
叢集消費方式下,消息消費失敗後期望消息重試,需要在消息監聽器接口的實作中明确進行配置(三種方式任選一種):
- 傳回 Action.ReconsumeLater (推薦)
- 傳回 Null
- 抛出異常
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//處理消息
doConsumeMessage(message);
//方式1:傳回 Action.ReconsumeLater,消息将重試
return Action.ReconsumeLater;
//方式2:傳回 null,消息将重試
return null;
//方式3:直接抛出異常, 消息将重試
throw new RuntimeException("Consumer Message exceotion");
}
}
消費失敗後,不重試配置方式
叢集消費方式下,消息失敗後期望消息不重試,需要捕獲消費邏輯中可能抛出的異常,最終傳回 Action.CommitMessage,此後這條消息将不會再重試。
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕獲消費邏輯中的所有異常,并傳回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息處理正常,直接傳回 Action.CommitMessage;
return Action.CommitMessage;
}
}
自定義消息最大重試次數
消息隊列 RocketMQ 允許 Consumer 啟動的時候設定最大重試次數,重試時間間隔将按照如下政策:
- 最大重試次數小于等于 16 次,則重試時間間隔同上表描述。
- 最大重試次數大于 16 次,超過 16 次的重試時間間隔均為每次 2 小時。
Properties properties = new Properties();
//配置對應 Group ID 的最大消息重試次數為 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);
注意:
- 消息最大重試次數的設定對相同 Group ID 下的所有 Consumer 執行個體有效。
- 如果隻對相同 Group ID 下兩個 Consumer 執行個體中的其中一個設定了 MaxReconsumeTimes,那麼該配置對兩個 Consumer 執行個體均生效。
- 配置采用覆寫的方式生效,即最後啟動的 Consumer 執行個體會覆寫之前的啟動執行個體的配置
擷取消息重試次數
消費者收到消息後,可按照如下方式擷取消息的重試次數:
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//擷取消息的重試次數
System.out.println(message.getReconsumeTimes());
return Action.CommitMessage;
}
}
5、死信隊列
當一條消息初次消費失敗,消息隊列 RocketMQ 會自動進行消息重試;達到最大重試次數後,若消費依然失敗,則表明消費者在正常情況下無法正确地消費該消息,此時,消息隊列 RocketMQ 不會立刻将消息丢棄,而是将其發送到該消費者對應的特殊隊列中。
在消息隊列 RocketMQ 中,這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。
1.5.1 死信特性
死信消息具有以下特性
- 不會再被消費者正常消費。
- 有效期與正常消息相同,均為 3 天,3 天後會被自動删除。是以,請在死信消息産生後的 3 天内及時處理。
死信隊列具有以下特性:
- 一個死信隊列對應一個 Group ID, 而不是對應單個消費者執行個體。
- 如果一個 Group ID 未産生死信消息,消息隊列 RocketMQ 不會為其建立相應的死信隊列。
- 一個死信隊列包含了對應 Group ID 産生的所有死信消息,不論該消息屬于哪個 Topic。
1.5.2 檢視死信資訊
- 在控制台查詢出現死信隊列的主題資訊
- 在消息界面根據主題查詢死信消息
- 選擇重新發送消息
一條消息進入死信隊列,意味着某些因素導緻消費者無法正常消費該消息,是以,通常需要您對其進行特殊處理。排查可疑因素并解決問題後,可以在消息隊列 RocketMQ 控制台重新發送該消息,讓消費者重新消費一次。
6、消費幂等
消息隊列 RocketMQ 消費者在接收到消息以後,有必要根據業務上的唯一 Key 對消息做幂等處理的必要性。
6.1、消費幂等的必要性
在網際網路應用中,尤其在網絡不穩定的情況下,消息隊列 RocketMQ 的消息有可能會出現重複,這個重複簡單可以概括為以下情況:
-
發送時消息重複
當一條消息已被成功發送到服務端并完成持久化,此時出現了網絡閃斷或者用戶端當機,導緻服務端對用戶端應答失敗。 如果此時生産者意識到消息發送失敗并嘗試再次發送消息,消費者後續會收到兩條内容相同并且 Message ID 也相同的消息。
-
投遞時消息重複
消息消費的場景下,消息已投遞到消費者并完成業務處理,當用戶端給服務端回報應答的時候網絡閃斷。 為了保證消息至少被消費一次,消息隊列 RocketMQ 的服務端将在網絡恢複後再次嘗試投遞之前已被處理過的消息,消費者後續會收到兩條内容相同并且 Message ID 也相同的消息。
-
負載均衡時消息重複(包括但不限于網絡抖動、Broker 重新開機以及訂閱方應用重新開機)
當消息隊列 RocketMQ 的 Broker 或用戶端重新開機、擴容或縮容時,會觸發 Rebalance,此時消費者可能會收到重複消息。
6.2、處理方式
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根據業務唯一辨別的 key 做幂等處理
}
});