MQ背景&選型
消息隊列作為高并發系統的核心元件之一,能夠幫助業務系統解構提升開發效率和系統穩定性。主要具有以下優勢:
- 削峰填谷(主要解決瞬時寫壓力大于應用服務能力導緻消息丢失、系統奔潰等問題)
- 系統解耦(解決不同重要程度、不同能力級别系統之間依賴導緻一死全死)
- 提升性能(當存在一對多調用時,可以發一條消息給消息系統,讓消息系統通知相關系統)
- 蓄流壓測(線上有些鍊路不好壓測,可以通過堆積一定量消息再放開來壓測)
目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要優勢特性有:
• 支援事務型消息(消息發送和DB操作保持兩方的最終一緻性,rabbitmq和kafka不支援)
• 支援結合rocketmq的多個系統之間資料最終一緻性(多方事務,二方事務是前提)
• 支援18個級别的延遲消息(rabbitmq和kafka不支援)
• 支援指定次數和時間間隔的失敗消息重發(kafka不支援,rabbitmq需要手動确認)
• 支援consumer端tag過濾,減少不必要的網絡傳輸(rabbitmq和kafka不支援)
• 支援重複消費(rabbitmq不支援,kafka支援)
Rocketmq、kafka、Rabbitmq的詳細對比,請參照下表格:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLyYzM0MTO2YTOx0CMwkzNxkDM3EDOykDM5EDMy0SN4QjM0YTMvwVOwkTMwIzLcVDO0IDN2EzLcd2bsJ2Lc12bj5ycn9Gbi52YugTMwIzZtl2Lc9CX6MHc0RHaiojIsJye.png)
RocketMQ叢集概述
RocketMQ叢集部署結構
image.png
Name Server
Name Server是一個幾乎無狀态節點,可叢集部署,節點之間無任何資訊同步。
Broker
Broker部署相對複雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave隻能對應一個Master,Master與Slave的對應關系通過指定相同的Broker Name,不同的Broker Id來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。
每個Broker與Name Server叢集中的所有節點建立長連接配接,定時(每隔30s)注冊Topic資訊到所有Name Server。Name Server定時(每隔10s)掃描所有存活broker的連接配接,如果Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的連接配接。
Producer
Producer與Name Server叢集中的其中一個節點(随機選擇)建立長連接配接,定期從Name Server取Topic路由資訊,并向提供Topic服務的Master建立長連接配接,且定時向Master發送心跳。Producer完全無狀态,可叢集部署。
Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server擷取所有topic隊列的最新情況,這意味着如果Broker不可用,Producer最多30s能夠感覺,在此期間内發往Broker的所有消息都會失敗。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關聯的broker發送心跳,Broker每隔10s中掃描所有存活的連接配接,如果Broker在2分鐘内沒有收到心跳資料,則關閉與Producer的連接配接。
Consumer
Consumer與Name Server叢集中的其中一個節點(随機選擇)建立長連接配接,定期從Name Server取Topic路由資訊,并向提供Topic服務的Master、Slave建立長連接配接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
Consumer每隔30s從Name server擷取topic的最新隊列情況,這意味着Broker不可用時,Consumer最多最需要30s才能感覺。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關聯的broker發送心跳,Broker每隔10s掃描所有存活的連接配接,若某個連接配接2分鐘内沒有發送心跳資料,則關閉連接配接;并向該Consumer Group的所有Consumer發出通知,Group内的Consumer重新配置設定隊列,然後繼續消費。
當Consumer得到master當機通知後,轉向slave消費,slave不能保證master的消息100%都同步過來了,是以會有少量的消息丢失。但是一旦master恢複,未同步過去的消息會被最終消費掉。
消費者對列是消費者連接配接之後(或者之前有連接配接過)才建立的。我們将原生的消費者辨別由 {IP}@{消費者group}擴充為 {IP}@{消費者group}{topic}{tag},(例如[email protected]_producer-group_2m2sTest_tag-zyk)。任何一個元素不同,都認為是不同的消費端,每個消費端會擁有一份自己消費對列(預設是broker對列數量*broker數量)。新挂載的消費者對列中擁有commitlog中的所有資料。
如果有需要,可以檢視Rocketmq更多源碼解析
Rocketmq如何支援分布式事務消息
場景
A(存在DB操作)、B(存在DB操作)兩方需要保證分布式事務一緻性,通過引入中間層MQ,A和MQ保持事務一緻性(異常情況下通過MQ反查A接口實作check),B和MQ保證事務一緻(通過重試),進而達到最終事務一緻性。
原理:大事務 = 小事務 + 異步
MQ與DB一緻性原理(兩方事務)
流程圖
image.png
上圖是RocketMQ提供的保證MQ消息、DB事務一緻性的方案。
MQ消息、DB操作一緻性方案:
1)發送消息到MQ伺服器,此時消息狀态為SEND_OK。此消息為consumer不可見。
2)執行DB操作;DB執行成功Commit DB操作,DB執行失敗Rollback DB操作。
3)如果DB執行成功,回複MQ伺服器,将狀态為COMMIT_MESSAGE;如果DB執行失敗,回複MQ伺服器,将狀态改為ROLLBACK_MESSAGE。注意此過程有可能失敗。
4)MQ内部提供一個名為“事務狀态服務”的服務,此服務會檢查事務消息的狀态,如果發現消息未COMMIT,則通過Producer啟動時注冊的TransactionCheckListener來回調業務系統,業務系統在checkLocalTransactionState方法中檢查DB事務狀态,如果成功,則回複COMMIT_MESSAGE,否則回複ROLLBACK_MESSAGE。
說明:
上面以DB為例,其實此處可以是任何業務或者資料源。
以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的狀态,在MQ伺服器内部是一個數字。
TransactionCheckListener 是在消息的commit或者rollback消息丢失的情況下才會回調(上圖中灰色部分)。這種消息丢失隻存在于斷網或者rocketmq叢集挂了的情況下。當rocketmq叢集挂了,如果采用異步刷盤,存在1s内資料丢失風險,異步刷盤場景下保障事務沒有意義。是以如果要核心業務用Rocketmq解決分布式事務問題,建議選擇同步刷盤模式。
多系統之間資料一緻性(多方事務)
image.png
當需要保證多方(超過2方)的分布式一緻性,上面的兩方事務一緻性(通過Rocketmq的事務性消息解決)已經無法支援。這個時候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。
以上圖交易系統為例:
1)交易系統建立訂單(往DB插入一條記錄),同時發送訂單建立消息。通過RocketMq事務性消息保證一緻性
2)接着執行完成訂單所需的同步核心RPC服務(非核心的系統通過監聽MQ消息自行處理,處理結果不會影響交易狀态)。執行成功更改訂單狀态,同時發送MQ消息。
3)交易系統接受自己發送的訂單建立消息,通過定時排程系統建立延時復原任務(或者使用RocketMq的重試功能,設定第二次發送時間為定時任務的延遲建立時間。在非消息堵塞的情況下,消息第一次到達延遲為1ms左右,這時可能RPC還未執行完,訂單狀态還未設定為完成,第二次消費時間可以指定)。延遲任務先通過查詢訂單狀态判斷訂單是否完成,完成則不建立復原任務,否則建立。 PS:多個RPC可以建立一個復原任務,通過一個消費組接受一次消息就可以;也可以通過建立多個消費組,一個消息消費多次,每次消費建立一個RPC的復原任務。 復原任務失敗,通過MQ的重發來重試。
以上是交易系統和其他系統之間保持最終一緻性的解決方案。
案例分析
單機環境下的事務示意圖
如下為A給B轉賬的例子。
步驟 | 動作 |
---|---|
1 | 鎖定A的賬戶 |
2 | 鎖定B的賬戶 |
3 | 檢查A賬戶是否有1元 |
4 | A的賬戶扣減1元 |
5 | 給B的賬戶加1元 |
6 | 解鎖B的賬戶 |
7 | 解鎖A的賬戶 |
以上過程在代碼層面甚至可以簡化到在一個事物中執行兩條sql語句。
分布式環境下事務
和單機事務不同,A、B賬戶可能不在同一個DB中,此時無法像在單機情況下使用事物來實作。此時可以通過一下方式實作,将轉賬操作分成兩個操作。
a) A賬戶
步驟 | 動作 |
---|---|
1 | 鎖定A的賬戶 |
2 | 檢查A賬戶是否有1元 |
3 | A的賬戶扣減1元 |
4 | 解鎖A的賬戶 |
b) MQ消息
A賬戶資料發生變化時,發送MQ消息,MQ伺服器将消息推送給轉賬系統,轉賬系統來給B賬号加錢。
c) B賬戶
步驟 | 動作 |
---|---|
1 | 鎖定B的賬戶 |
2 | 給B的賬戶加1元 |
3 | 解鎖B的賬戶 |
順序消息
順序消息缺陷
發送順序消息無法利用叢集Fail Over特性消費順序消息的并行度依賴于隊列數量隊列熱點問題,個别隊列由于哈希不均導緻消息過多,消費速度跟不上,産生消息堆積問題遇到消息失敗的消息,無法跳過,目前隊列消費暫停。
原理
produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者注冊消息監聽器為MessageListenerOrderly,這樣就可以保證消費端隻有一個線程去消費消息。
注意:把消息發到同一個隊列(queue),不是同一個topic,預設情況下一個topic包括4個queue
擴充
可以通過實作發送消息的對列選擇器方法,實作部分順序消息。
舉例:比如一個資料庫通過MQ來同步,隻需要保證每個表的資料是同步的就可以。解析binlog,将表名作為對列選擇器的參數,這樣就可以保證每個表的資料到同一個對列裡面,進而保證表資料的順序消費
最佳實踐
Producer
Topic
一個應用盡可能用一個Topic,消息子類型用tags來辨別,tags可以由應用自由設定。隻有發送消息設定了tags,消費方在訂閱消息時,才可以利用tags 在broker做消息過濾。
key
每個消息在業務層面的唯一辨別碼,要設定到 keys 字段,友善将來定位消息丢失問題。伺服器會為每個消息建立索引(哈希索引),應用可以通過 topic,key來查詢這條消息内容,以及消息被誰消費。由于是哈希索引,請務必保證key 盡可能唯一,這樣可以避免潛在的哈希沖突。
//訂單Id
String orderId= "20034568923546";
message.setKeys(orderId);
日志
消息發送成功或者失敗,要列印消息日志,務必要列印 send result 和key 字段。
send
send消息方法,隻要不抛異常,就代表發送成功。但是發送成功會有多個狀态,在sendResult裡定義。
SEND_OK:消息發送成功
FLUSH_DISK_TIMEOUT:消息發送成功,但是伺服器刷盤逾時,消息已經進入伺服器隊列,隻有此時伺服器當機,消息才會丢失
FLUSH_SLAVE_TIMEOUT:消息發送成功,但是伺服器同步到Slave時逾時,消息已經進入伺服器隊列,隻有此時伺服器當機,消息才會丢失
SLAVE_NOT_AVAILABLE:消息發送成功,但是此時slave不可用,消息已經進入伺服器隊列,隻有此時伺服器當機,消息才會丢失
Consumer
幂等
RocketMQ使用的消息原語是At Least Once,是以consumer可能多次收到同一個消息,此時務必做好幂等。
日志
消費時記錄日志,以便後續定位問題。
批量消費
盡量使用批量方式消費方式,可以很大程度上提高消費吞吐量。
參考資料
文檔
RocketMQ_design.pdf
RocketMQ_experience.pdf
部落格
分布式開放消息系統(RocketMQ)的原理與實踐
http://www.jianshu.com/p/453c6e7ff81c
RocketMQ事務消費和順序消費詳解
http://www.cnblogs.com/520playboy/p/6750023.html
ZeroCopy
http://www.linuxjournal.com/article/6345
IO方式的性能資料
http://stblog.baidu-tech.com/?p=851
原文
https://www.jianshu.com/p/2838890f3284
轉載于:https://www.cnblogs.com/diandianquanquan/p/11603876.html