天天看點

RocketMQ 用戶端最佳實踐

本文站在消費者和生産者的角度給出一些rocketmq用戶端使用的實踐意見。

一個應用盡可能用一個topic,消息子類型用tags來辨別,tags可以由應用自由設定。隻有發送消息設定了tags,消費方在訂閱消息時,才可以利用tags在broker做消息過濾。

每個消息在業務層面的唯一辨別碼,要設定到keys字段,友善将來定位消息丢失問題。伺服器會為每個消息建立索引(哈希索引),應用可以通過topic,key來查詢這條消息内容,以及消息被誰消費。由于是哈希索引,請務必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。

消息發送成功或者失敗,要列印消息日志,務必要列印sendresult和key字段。

send_ok,消息發送成功。

flush_disk_timeout,消息發送成功,但是伺服器刷盤逾時,消息已經進入伺服器隊列,隻有此時伺服器當機,消息才會丢失。

flush_slave_timeout,消息發送成功,但是伺服器同步到slave時逾時,消息已經進入伺服器隊列,隻有此時伺服器當機,消息才會丢失。

slave_not_available,消息發送成功,但是此時slave不可用,消息已經進入伺服器隊列,隻有此時伺服器當機,消息才會丢失。

對于消息不可丢失應用,務必要有消息重發機制,例如如果消息發送失敗,存儲到資料庫,能有定時程式嘗試重發,或者人工觸發重發。

producer的send方法本身支援内部重試,重試邏輯如下:

至多重試3次。

如果發送失敗,則輪轉到下一個broker。

這個方法的總耗時時間不超過sendmsgtimeout設定的值,預設10s。

是以,如果本身向broker發送消息産生逾時異常,就不會再做重試。

以上政策仍然不能保證消息一定發送成功,為保證消息一定成功,建議應用這樣做:

如果調用send同步方法發送失敗,則嘗試将消息存儲到db,由背景線程定時重試,保證消息一定到達broker。

上述db重試方式為什麼沒有內建到mq用戶端内部做,而是要求應用自己去完成,我們基于以下幾點考慮:

mq的用戶端設計為無狀态模式,友善任意的水準擴充,且對機器資源的消耗僅僅是cpu、記憶體、網絡。

如果mq用戶端内部內建一個kv存儲子產品,那麼資料隻有同步落盤才能較可靠,而同步落盤本身性能開銷較大,是以通常會采用異步落盤,又由于應用關閉過程不受mq運維人員控制,可能經常會發生kill -9這樣暴力方式關閉,造成資料沒有及時落盤而丢失。

producer所在機器的可靠性較低,一般為虛拟機,不适合存儲重要資料。

綜上,建議重試過程交由應用來控制。

一個rpc調用,通常是這樣一個過程

用戶端發送請求到伺服器

伺服器處理該請求

伺服器向用戶端傳回應答

是以一個rpc的耗時時間是上述三個步驟的總和,而某些場景要求耗時非常短,但是對可靠性要求并不高,例如日志收集類應用,此類應用可以采用oneway形式調用,oneway形式隻發送請求不等待應答,而發送請求在用戶端實作層面僅僅是一個os系統調用的開銷,即将資料寫入用戶端的socket緩沖區,此過程耗時通常在微秒級。

rocketmq目前無法避免消息重複,是以如果業務對消費重複非常敏感,務必要在業務層面去重,有以下幾種去重方式:

将消息的唯一鍵,可以是msgid,也可以是消息内容中的唯一辨別字段,例如訂單id等,消費之前判斷是否在db或tair(全局kv存儲)中存在,如果不存在則插入,并消費,否則跳過。(實際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵沖突,則插入失敗,直接跳過)。msgid一定是全局唯一辨別符,但是可能會存在同樣的消息有兩個不同msgid的情況(有多種原因),這種情況可能會使業務上重複消費,建議最好使用消息内容中的唯一辨別字段去重。

使用業務層面的狀态機去重。

消費并行度與消費吞吐量關系如下圖所示:

RocketMQ 用戶端最佳實踐

消費并行度與消費rt關系如下圖所示:

RocketMQ 用戶端最佳實踐

絕大部分消息消費行為屬于io密集型,即可能是操作資料庫,或者調用rpc,這類消費行為的消費速度在于後端資料庫或者外系統的吞吐量,通過增加消費并行度,可以提高總的消費吞吐量,但是并行度增加到一定程度,反而會下降,如圖所示,呈現抛物線形式。是以應用必須要設定合理的并行度。cpu密集型應用除外。

修改消費并行度方法如下所示:

同一個consumergroup下,通過增加consumer執行個體數量來提高并行度,超過訂閱隊列數的consumer執行個體無效。可以通過加機器,或者在已有機器啟動多個程序的方式。

提高單個consumer的消費并行線程,通過修改以下參數

某些業務流程如果支援批量方式消費,則可以很大程度上提高消費吞吐量,例如訂單扣款類應用,一次處理一個訂單耗時1秒鐘,一次處理10個訂單可能也隻耗時2秒鐘,這樣即可大幅度提高消費的吞吐量,通過設定consumer的consumemessagebatchmaxsize這個參數,預設是1,即一次隻消費一條消息,例如設定為n,那麼每次消費的消息數小于等于n。

發生消息堆積時,如果消費速度一直追不上發送速度,可以選擇丢棄不重要的消息,那麼如何判斷消息是否有堆積情況呢,可以加入如下代碼邏輯:

如以上代碼所示,當某個隊列的消息數堆積到100000條以上,則嘗試丢棄部分或全部消息,這樣就可以快速追上發送消息的速度。

舉例如下,某條消息的消費過程如下:

根據消息從db查詢資料1

根據消息從db查詢資料2

複雜的業務計算

向db插入資料3

向db插入資料4

這條消息的消費過程與db互動了4次,如果按照每次5ms計算,那麼總共耗時20ms,假設業務計算耗時5ms,那麼總過耗時25ms,如果能把4次db互動優化為2次,那麼總耗時就可以優化到15ms,也就是說總體性能提高了40%。

對于mysql等db,如果部署在磁盤,那麼與db進行互動,如果資料沒有命中cache,每次互動的rt會直線上升,如果采用ssd,則rt上升趨勢要明顯好于磁盤。個别應用可能會遇到這種情況:

線上下壓測消費過程中,db表現非常好,每次rt都很短,但是上線運作一段時間,rt就會變長,消費吞吐量直線下降。

主要原因是線下壓測時間過短,線上運作一段時間後,cache命中率下降,那麼rt就會增加。建議線上下壓測時,要測試足夠長時間,盡可能模拟線上環境,壓測過程中,資料的分布也很重要,資料不同,可能cache的命中率也會完全不同。