2 RocketMQ和Kafka的消息模型
這兩個消息隊列産品的消息模型是一樣的。通過具體案例再次講解下。
假設有一主題MyTopic,為主題建立5個隊列,分布到倆Broker。

消息生産端
設有3個生産者執行個體:Produer0、Produer1、Producer2。
這3生産者如何對應到2Broker,又如何對應到5個隊列?
無需對應,随便發。
每個生産者可在5個隊列中輪詢發送,也可随機選個隊列發送,或隻往某隊列發,這皆可。
消費端
很多人沒搞清消費組、消費者和隊列差別。
消費組
每個消費組是一份訂閱,它要消費主題MyTopic下所有隊列的全部消息。
隊列裡的消息并非消費掉就沒了,這裡的“消費”,隻是去隊列裡面讀了消息,并不是删除,消費完這消息,還是在隊列裡。
多個消費組在消費同一主題時,消費組間互不影響。
比如有2個消費組:G0和G1。
- G0消費了哪些消息,G1是不知道的,也不用知道
- G0消費過的消息,G1還可以消費
- 即使G0積壓了很多消息,對G1來說也沒有任何影響
消費組内部
一個消費組中可包含多個消費者執行個體。
比如消費組G1,包含2個消費者C0和C1,那這2個消費者又是怎麼和主題MyTopic的5個隊列對應的呢?
由于消費确認機制,在同一消費組裡,每個隊列隻能被一個消費者執行個體占用。
至于如何配置設定,這裡面有很多政策,我就不展開說了。總之保證每個隊列配置設定一個消費者就行了。比如,我們可以讓消費者C0消費Q0,Q1和Q2,C1消費Q3和Q4,如果C0當機了,會觸發重新配置設定,這時候C1同時消費全部5個隊列。
隊列占用隻針對消費組内部,對其他消費組沒有影響。
比如隊列Q2被消費組G1的消費者C1占用,對消費組G2完全沒有影響,G2也可配置設定它的消費者占用和消費隊列Q2。
消費位置
每個消費組内部維護自己的一組消費位置,每個隊列對應一個消費位置。
消費位置在服務端儲存,并且消費位置和消費者沒有關系。
每個消費位置一般就是個整數,記錄這個消費組中,這個隊列消費到哪個位置了,這位置之前的消息都成功消費了,之後的消息都沒有消費或正在消費。
- 例子的消費位置表格
并沒有消費者這一列,即消費者和消費位置沒有關系。
實作單個隊列的并行消費
如果不要求嚴格順序,如何實作單個隊列的并行消費?
有很多的實作方式,講個實作思路。
比如隊列中目前有10條消息,編号0-9,目前的消費位置是5。
同時來了三個消費者拉消息,把編号為5、6、7的消息分别給三個消費者,每人一條。
過了一段時間,三個消費成功的響應都回來了,這時候就可以把消費位置更新為8了,就實作了并行消費。
這是理想的情況。還有可能編号為6、7的消息響應回來了,編号5的消息響應一直回不來,怎麼辦?
這個位置5就是一個消息空洞。為了避免位置5把這個隊列卡住,可以先把消費位置5這條消息,複制到一個特殊重試隊列,然後依舊把消費位置更新為8,繼續消費。
再有消費者來拉消息的時候,優先把重試隊列中的那條消息給消費者就可以了。
這是并行消費的一種實作方式。
并行消費開銷還是很大的,不應該作為一個正常的,提升消費并發的手段,如果消費慢需要增加消費者的并發數,還是需要擴容隊列數。
4 保證消息的嚴格順序
怎麼保證消息的嚴格順序?
主題層面是無法保證嚴格順序的,隻有在隊列上才能保證消息的嚴格順序。
如果說,你的業務必須要求全局嚴格順序,就隻能把消息隊列數配置成1,生産者和消費者也隻能是一個執行個體,才能保證全局嚴格順序。
大部分情況下,我們并不需要全局嚴格順序,隻要保證局部有序即可滿足。
比如,在傳遞賬戶流水記錄的時候,隻要保證每個賬戶的流水有序,不同賬戶間流水記錄無需保證順序。
保證局部嚴格順序,可以這樣實作。
在發送端,使用賬戶ID作為Key,采用一緻性雜湊演算法計算出隊列編号,指定隊列來發送消息。
一緻性雜湊演算法可以保證,相同Key的消息總是發送到同一隊列,保證相同Key的消息嚴格有序。
如果不考慮隊列擴容,也可以用隊列數量取模的簡單方法來計算隊列編号。
消息傳入kafka的函數中,參數key本身的實作是普通hash還是一緻性hash?
Kafka的分區選擇器是可以配置的,預設情況下,如果不傳入key,采用輪詢算法,傳入key的話,按照key做普通hash,然後哈希值與分區總數取模,計算出分區号。
總結
使用消息隊列,大部分的難點在宏觀架構層面,要解決這些難點,你需要掌握消息隊列宏觀層面上的實作原理和最佳實踐,這樣,無論你使用什麼消息隊列,都可以做到遊刃有餘。在標明了合适的消息隊列産品,準備寫代碼之前,再去文檔中檢視這些細節都來得及。
是以,我們先講的是消息隊列的使用,注重通用的原理。
關于事務消息的ACID那個問題沒有提到,能不能找機會說下你的看法?
沒有實作隔離性,一緻性隻能保證最終一緻,而原子操作和持久化可以通過各種手段實作。
嚴格的說,ACI都沒實作,隻有D實作了。
放寬點兒限制的話,或者考慮實際效果的話,A(原子性)絕大多數情況下還是可以保證的,即“要麼都成功,要麼都失敗”。C(一緻性)通過補償,大部分情況下也可以保證最終一緻。
如果一個topic中有多個消費者,但每個消費者可能隻需要其中的一部分資料,一種可行的方案是消費者消費全量消息,然後自行過濾;另一種方式是生産者将這些消息進行分類,不同類别的消息分别對應不同的topic,但這樣可能會出現N多的topic,topic太多是否又會出現随機io太多導緻性能問題,另外對生産端的編碼也不友好,每種消息都要感覺發到哪個topic中,這種情況下應該如何取舍?
可以使用RocketMQ的服務端過濾功能,正好可以滿足這個需求。
JSR330的注釋 Inject , 但實際上和spring自身的 Autowired 注釋功能相同, 是以我平時都是直接用 spring 自帶的注釋。 請問老師使用 JSR330 提供的注釋是有什麼講究莫
JSR是一個标準,Spring是JSR的一個實作,并做了很多的擴充。
消息隊列就是個分布式存儲系統。