天天看點

Rocketmq原理&最佳實踐RocketMQ叢集概述

MQ背景&選型

消息隊列作為高并發系統的核心元件之一,能夠幫助業務系統解構提升開發效率和系統穩定性。主要具有以下優勢:

  • 削峰填谷(主要解決瞬時寫壓力大于應用服務能力導緻消息丢失、系統奔潰等問題)
  • 系統解耦(解決不同重要程度、不同能力級别系統之間依賴導緻一死全死)
  • 提升性能(當存在一對多調用時,可以發一條消息給消息系統,讓消息系統通知相關系統)
  • 蓄流壓測(線上有些鍊路不好壓測,可以通過堆積一定量消息再放開來壓測)

目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要優勢特性有:

• 支援事務型消息(消息發送和DB操作保持兩方的最終一緻性,rabbitmq和kafka不支援)

• 支援結合rocketmq的多個系統之間資料最終一緻性(多方事務,二方事務是前提)

• 支援18個級别的延遲消息(rabbitmq和kafka不支援)

• 支援指定次數和時間間隔的失敗消息重發(kafka不支援,rabbitmq需要手動确認)

• 支援consumer端tag過濾,減少不必要的網絡傳輸(rabbitmq和kafka不支援)

• 支援重複消費(rabbitmq不支援,kafka支援)

Rocketmq、kafka、Rabbitmq的詳細對比,請參照下表格:

Rocketmq原理&最佳實踐RocketMQ叢集概述

RocketMQ叢集概述

RocketMQ叢集部署結構

Rocketmq原理&最佳實踐RocketMQ叢集概述

image.png

Name Server

Name Server是一個幾乎無狀态節點,可叢集部署,節點之間無任何資訊同步。

 Broker

Broker部署相對複雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave隻能對應一個Master,Master與Slave的對應關系通過指定相同的Broker Name,不同的Broker Id來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。

每個Broker與Name Server叢集中的所有節點建立長連接配接,定時(每隔30s)注冊Topic資訊到所有Name Server。Name Server定時(每隔10s)掃描所有存活broker的連接配接,如果Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的連接配接。

Producer

Producer與Name Server叢集中的其中一個節點(随機選擇)建立長連接配接,定期從Name Server取Topic路由資訊,并向提供Topic服務的Master建立長連接配接,且定時向Master發送心跳。Producer完全無狀态,可叢集部署。

Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server擷取所有topic隊列的最新情況,這意味着如果Broker不可用,Producer最多30s能夠感覺,在此期間内發往Broker的所有消息都會失敗。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關聯的broker發送心跳,Broker每隔10s中掃描所有存活的連接配接,如果Broker在2分鐘内沒有收到心跳資料,則關閉與Producer的連接配接。

Consumer

Consumer與Name Server叢集中的其中一個節點(随機選擇)建立長連接配接,定期從Name Server取Topic路由資訊,并向提供Topic服務的Master、Slave建立長連接配接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。

Consumer每隔30s從Name server擷取topic的最新隊列情況,這意味着Broker不可用時,Consumer最多最需要30s才能感覺。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關聯的broker發送心跳,Broker每隔10s掃描所有存活的連接配接,若某個連接配接2分鐘内沒有發送心跳資料,則關閉連接配接;并向該Consumer Group的所有Consumer發出通知,Group内的Consumer重新配置設定隊列,然後繼續消費。

當Consumer得到master當機通知後,轉向slave消費,slave不能保證master的消息100%都同步過來了,是以會有少量的消息丢失。但是一旦master恢複,未同步過去的消息會被最終消費掉。

消費者對列是消費者連接配接之後(或者之前有連接配接過)才建立的。我們将原生的消費者辨別由 {IP}@{消費者group}擴充為 {IP}@{消費者group}{topic}{tag},(例如[email protected]_producer-group_2m2sTest_tag-zyk)。任何一個元素不同,都認為是不同的消費端,每個消費端會擁有一份自己消費對列(預設是broker對列數量*broker數量)。新挂載的消費者對列中擁有commitlog中的所有資料。

如果有需要,可以檢視Rocketmq更多源碼解析

Rocketmq如何支援分布式事務消息

場景

A(存在DB操作)、B(存在DB操作)兩方需要保證分布式事務一緻性,通過引入中間層MQ,A和MQ保持事務一緻性(異常情況下通過MQ反查A接口實作check),B和MQ保證事務一緻(通過重試),進而達到最終事務一緻性。

原理:大事務 = 小事務 + 異步

MQ與DB一緻性原理(兩方事務)

流程圖

Rocketmq原理&最佳實踐RocketMQ叢集概述

image.png

上圖是RocketMQ提供的保證MQ消息、DB事務一緻性的方案。

MQ消息、DB操作一緻性方案:

1)發送消息到MQ伺服器,此時消息狀态為SEND_OK。此消息為consumer不可見。

2)執行DB操作;DB執行成功Commit DB操作,DB執行失敗Rollback DB操作。

3)如果DB執行成功,回複MQ伺服器,将狀态為COMMIT_MESSAGE;如果DB執行失敗,回複MQ伺服器,将狀态改為ROLLBACK_MESSAGE。注意此過程有可能失敗。

4)MQ内部提供一個名為“事務狀态服務”的服務,此服務會檢查事務消息的狀态,如果發現消息未COMMIT,則通過Producer啟動時注冊的TransactionCheckListener來回調業務系統,業務系統在checkLocalTransactionState方法中檢查DB事務狀态,如果成功,則回複COMMIT_MESSAGE,否則回複ROLLBACK_MESSAGE。

說明:

上面以DB為例,其實此處可以是任何業務或者資料源。

以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的狀态,在MQ伺服器内部是一個數字。

TransactionCheckListener 是在消息的commit或者rollback消息丢失的情況下才會回調(上圖中灰色部分)。這種消息丢失隻存在于斷網或者rocketmq叢集挂了的情況下。當rocketmq叢集挂了,如果采用異步刷盤,存在1s内資料丢失風險,異步刷盤場景下保障事務沒有意義。是以如果要核心業務用Rocketmq解決分布式事務問題,建議選擇同步刷盤模式。

多系統之間資料一緻性(多方事務)

Rocketmq原理&最佳實踐RocketMQ叢集概述

image.png

當需要保證多方(超過2方)的分布式一緻性,上面的兩方事務一緻性(通過Rocketmq的事務性消息解決)已經無法支援。這個時候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。

以上圖交易系統為例:

1)交易系統建立訂單(往DB插入一條記錄),同時發送訂單建立消息。通過RocketMq事務性消息保證一緻性

2)接着執行完成訂單所需的同步核心RPC服務(非核心的系統通過監聽MQ消息自行處理,處理結果不會影響交易狀态)。執行成功更改訂單狀态,同時發送MQ消息。

3)交易系統接受自己發送的訂單建立消息,通過定時排程系統建立延時復原任務(或者使用RocketMq的重試功能,設定第二次發送時間為定時任務的延遲建立時間。在非消息堵塞的情況下,消息第一次到達延遲為1ms左右,這時可能RPC還未執行完,訂單狀态還未設定為完成,第二次消費時間可以指定)。延遲任務先通過查詢訂單狀态判斷訂單是否完成,完成則不建立復原任務,否則建立。 PS:多個RPC可以建立一個復原任務,通過一個消費組接受一次消息就可以;也可以通過建立多個消費組,一個消息消費多次,每次消費建立一個RPC的復原任務。 復原任務失敗,通過MQ的重發來重試。

以上是交易系統和其他系統之間保持最終一緻性的解決方案。

案例分析

單機環境下的事務示意圖

如下為A給B轉賬的例子。

步驟 動作
1 鎖定A的賬戶
2 鎖定B的賬戶
3 檢查A賬戶是否有1元
4 A的賬戶扣減1元
5 給B的賬戶加1元
6 解鎖B的賬戶
7 解鎖A的賬戶

以上過程在代碼層面甚至可以簡化到在一個事物中執行兩條sql語句。

分布式環境下事務

和單機事務不同,A、B賬戶可能不在同一個DB中,此時無法像在單機情況下使用事物來實作。此時可以通過一下方式實作,将轉賬操作分成兩個操作。

a) A賬戶

步驟 動作
1 鎖定A的賬戶
2 檢查A賬戶是否有1元
3 A的賬戶扣減1元
4 解鎖A的賬戶

b) MQ消息

A賬戶資料發生變化時,發送MQ消息,MQ伺服器将消息推送給轉賬系統,轉賬系統來給B賬号加錢。

c) B賬戶

步驟 動作
1 鎖定B的賬戶
2 給B的賬戶加1元
3 解鎖B的賬戶

 順序消息

順序消息缺陷

發送順序消息無法利用叢集Fail Over特性消費順序消息的并行度依賴于隊列數量隊列熱點問題,個别隊列由于哈希不均導緻消息過多,消費速度跟不上,産生消息堆積問題遇到消息失敗的消息,無法跳過,目前隊列消費暫停。

原理

produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者注冊消息監聽器為MessageListenerOrderly,這樣就可以保證消費端隻有一個線程去消費消息。

注意:把消息發到同一個隊列(queue),不是同一個topic,預設情況下一個topic包括4個queue

擴充

可以通過實作發送消息的對列選擇器方法,實作部分順序消息。

舉例:比如一個資料庫通過MQ來同步,隻需要保證每個表的資料是同步的就可以。解析binlog,将表名作為對列選擇器的參數,這樣就可以保證每個表的資料到同一個對列裡面,進而保證表資料的順序消費

最佳實踐

Producer

Topic

一個應用盡可能用一個Topic,消息子類型用tags來辨別,tags可以由應用自由設定。隻有發送消息設定了tags,消費方在訂閱消息時,才可以利用tags 在broker做消息過濾。

key

每個消息在業務層面的唯一辨別碼,要設定到 keys 字段,友善将來定位消息丢失問題。伺服器會為每個消息建立索引(哈希索引),應用可以通過 topic,key來查詢這條消息内容,以及消息被誰消費。由于是哈希索引,請務必保證key 盡可能唯一,這樣可以避免潛在的哈希沖突。

//訂單Id

String orderId= "20034568923546";

message.setKeys(orderId);

日志

消息發送成功或者失敗,要列印消息日志,務必要列印 send result 和key 字段。

send

send消息方法,隻要不抛異常,就代表發送成功。但是發送成功會有多個狀态,在sendResult裡定義。

SEND_OK:消息發送成功

FLUSH_DISK_TIMEOUT:消息發送成功,但是伺服器刷盤逾時,消息已經進入伺服器隊列,隻有此時伺服器當機,消息才會丢失

FLUSH_SLAVE_TIMEOUT:消息發送成功,但是伺服器同步到Slave時逾時,消息已經進入伺服器隊列,隻有此時伺服器當機,消息才會丢失

SLAVE_NOT_AVAILABLE:消息發送成功,但是此時slave不可用,消息已經進入伺服器隊列,隻有此時伺服器當機,消息才會丢失

Consumer

幂等

RocketMQ使用的消息原語是At Least Once,是以consumer可能多次收到同一個消息,此時務必做好幂等。

日志

消費時記錄日志,以便後續定位問題。

批量消費

盡量使用批量方式消費方式,可以很大程度上提高消費吞吐量。

參考資料

文檔

RocketMQ_design.pdf

RocketMQ_experience.pdf

部落格

分布式開放消息系統(RocketMQ)的原理與實踐

http://www.jianshu.com/p/453c6e7ff81c

RocketMQ事務消費和順序消費詳解

http://www.cnblogs.com/520playboy/p/6750023.html

ZeroCopy

http://www.linuxjournal.com/article/6345

IO方式的性能資料

http://stblog.baidu-tech.com/?p=851

原文

https://www.jianshu.com/p/2838890f3284

轉載于:https://www.cnblogs.com/diandianquanquan/p/11603876.html