天天看點

快手基于RocketMQ的線上消息系統建設實踐為什麼建設線上消息系統部署模式和落地政策用戶端封裝政策叢集負載均衡 & 機房災備多樣的消息功能分布式對賬監控性能優化

作者:黃理

黃理,10多年軟體開發和架構經驗,熱衷于代碼和性能優化,開發和參與過多個開源項目。曾在淘寶任業務架構師多年,目前在快手負責線上消息系統建設工作。

為什麼建設線上消息系統

在引入RocketMQ之前,快手已經在大量的使用Kafka了,但并非所有情況下Kafka都是最合适的,比如以下場景:

  • 業務希望個别消費失敗以後可以重試,并且不堵塞後續其它消息的消費。
  • 業務希望消息可以延遲一段時間再投遞。
  • 業務需要發送的時候保證資料庫操作和消息發送是一緻的(也就是事務發送)。
  • 為了排查問題,有的時候業務需要一定的單個消息查詢能力。

為了應對以上這類場景,我們需要建設一個主要面向線上業務的消息系統,作為Kafka的補充。在考察的一些消息中間件中,RocketMQ和業務需求比對度比較高,同時部署結構簡單,使用的公司也比較多,于是最後我們就采用了RocketMQ。

部署模式和落地政策

在一個已有的體系内落地一個開軟軟體,通常大概有兩種方式:

快手基于RocketMQ的線上消息系統建設實踐為什麼建設線上消息系統部署模式和落地政策用戶端封裝政策叢集負載均衡 & 機房災備多樣的消息功能分布式對賬監控性能優化

方式一,在開源軟體的基礎上做深度修改,很容易實作公司内需要的定制功能。但和社群開源版本分道揚镳,以後如何更新?

方式二,盡量不修改社群版本(或減少不相容的修改),而是在它的外圍或者上層進一步包裝來實作公司内部需要的定制功能。

注:上圖方式一的圖畫的比較極端,實際上很多公司是方式一、方式二結合的。

我們選擇了方式二。最早的時候,我們使用的是4.5.2版本,後來社群4.7版本大幅減小了同步複制的延遲,正好我們的部署模式就是同步複制,于是就很輕松的更新了4.7系列,享受了新版本的紅利。

在部署叢集的時候,還會面臨很多部署政策的選擇:

•      大叢集 vs 小叢集

•      選擇副本數

•      同步刷盤 vs 異步刷盤

•      同步複制  vs 異步複制

•      SSD vs 機械硬碟

大叢集會有更好的性能彈性,而小叢集具有更好的隔離型,此外小叢集可以不需要跨可用區/IDC部署,是以會有更好的健壯性,我們非常看重穩定性,是以選擇了小叢集。叢集同步複制異步刷盤,首選SSD。

用戶端封裝政策

如上所述,我們沒有在Rocketmq裡面做深度修改,是以需要提供一個SDK來提供公司内的需要的定制功能,這個SDK大概是這樣的:

快手基于RocketMQ的線上消息系統建設實踐為什麼建設線上消息系統部署模式和落地政策用戶端封裝政策叢集負載均衡 & 機房災備多樣的消息功能分布式對賬監控性能優化

對外隻提供最基本的API,所有通路必須經過我們提供的接口。簡潔的API就像冰山的一個角,除了對外的簡單接口,下面所有的東西都可以更新更換,而不會破壞相容性。

業務開發起來也很簡單,隻要需要提供Topic(全局唯一)和Group就可以生産和消費,不用提供環境、Name Server位址等。SDK内部會根據Topic解析出叢集Name Server的位址,然後連接配接相應的叢集。生産環境和測試環境環境會解析出不同的位址,進而實作了隔離。

上圖分為3層,第二層是通用的,第三層才對應具體的MQ實作,是以,理論上可以更換為其它消息中間件,而用戶端程式不需要修改。

SDK内部內建了熱變更機制,可以在不重新開機client的情況下做動态配置,比如下發路由政策(更換叢集name server的位址,或者連接配接到别的叢集去),Client的線程數、逾時時間等。通過maven強制更新機制,可以保證業務使用的SDK基本上是最新的。

叢集負載均衡 & 機房災備

所有的Topic預設都配置設定到兩個可用區,生産者和消費者會同時連接配接至少兩個獨立叢集(分布在不同的可用區),如下圖:

快手基于RocketMQ的線上消息系統建設實踐為什麼建設線上消息系統部署模式和落地政策用戶端封裝政策叢集負載均衡 & 機房災備多樣的消息功能分布式對賬監控性能優化

生産者同時連接配接兩個叢集,如果可用區A出現故障,流量就會自動切換到可用區B的叢集2去。我們開發了一個小元件來實作自适應的叢集負載均衡,它包含以下能力:

•      千萬級OPS

•      靈活的權重調整政策

•      健康檢查支援/事件通知

•      并發度控制(自動降低響應慢的伺服器的請求數)

•      資源優先級(類似Envoy,實作本地機房優先,或是被調伺服器很多的時候選取一個子集來調用)

•      自動優先級管理

•      增量熱變更

實際上它并不僅僅用于消息生産者,而是一個通用的主調方負載均衡類庫,可以在github上找到:

https://github.com/PhantomThief/simple-failover-java

核心的SimpleFailover接口和PriorityFailover類沒有傳遞第三方依賴,非常容易整合。

多樣的消息功能

延遲消息

延遲消息是非常重要的業務功能,不過RocketMQ内置的延遲消息隻能支援幾個固定的延遲級别,是以我們又開發了單獨的Delay Server來排程延遲消息:

快手基于RocketMQ的線上消息系統建設實踐為什麼建設線上消息系統部署模式和落地政策用戶端封裝政策叢集負載均衡 & 機房災備多樣的消息功能分布式對賬監控性能優化

上圖這個結構沒有直接将延遲消息發到Delay Server,而是更換Topic以後存入RocketMQ。這樣的好處是可以複用現有的消息發送接口(以及上面的所有擴充能力)。對業務來說,隻需要在構造消息的時候額外指定一個延遲時間字段即可,其它用法都不變。

事務消息

RocketMQ 4.3版本以後支援了事務消息,可以保證本地事務和消費發送同時成功或者失敗,對于一些業務場景很有幫助。事務消息的用法和原理有很多資料,這裡就不細述了。但關于事務消息的實踐網上資料較少,我們可以給出一些建議。

首先,事務消息功能一直在不斷完善,應該使用最新的版本,至少是4.6.1以後的版本,可以避免很多問題。

其次,事務消息性能是不如普通消息的,它在内部實際上會生成3個消息(一階段1個,二階段2個),是以性能大約隻有普通消息的1/3,如果事務消息量大的話,要做好容量規劃。回查排程線程也隻有1個,不要用極限壓力去考驗它。

最後有一些參數注意事項。在broker的配置中:

  • transientStorePoolEnable這個參數必須保持預設值false,否則會有嚴重的問題。
  • endTransactionThreadPoolNums是事務消息二階段處理線程大小,sendMessageThreadPoolNums則指定一階段處理線程池大小。如果二階段的處理速度跟不上一階段,就會造成二階段消息丢失導緻大量回查,是以建議endTransactionThreadPoolNums應該大于sendMessageThreadPoolNums,建議至少4倍。
  • useReentrantLockWhenPutMessage設定為true(預設值是false),以免線程搶鎖出現嚴重的不公平,導緻二階段處理線程長時間搶不到鎖。
  • transactionTimeOut預設值6秒太短了,如果事務執行時間超過6秒,就可能導緻消息丢失。建議改到1分鐘左右。

生産者client也有一個注意事項,如果有多組broker,并且是2副本(有1個Slave),應該打開retryAnotherBrokerWhenNotStoreOK,以免某個Slave出現故障以後,大量消息發送失敗。

分布式對賬監控

除了比較一些正常的監控手段以外,我們開發了一個監控程式做分布式對賬。可以發現我們的叢集以及我們提供的SDK是否有異常。

快手基于RocketMQ的線上消息系統建設實踐為什麼建設線上消息系統部署模式和落地政策用戶端封裝政策叢集負載均衡 & 機房災備多樣的消息功能分布式對賬監控性能優化

具體做法是在每個Broker上都建立一個監控專用的Topic,監控程式使用我們自己提供的SDK架構來連接配接叢集(就像我們的業務使用者那樣),監控生産者會給每個叢集發送少量消息。然後檢查發送是否成功:

發送成功 成功
刷盤逾時
Slave逾時
Slave不可用
發送失敗 具體錯誤碼

生産者隻對這些結果進行打點,不判斷是否正常,具體到監控(或者演練)場景可以配置不同的報警規則。

消費者收到了消息會通過TCP旁路Ack生産者,生産者這邊會做分布式對賬,将對賬結果打點:

  • 收到消息
  • 消息丢失(或逾時未收到消息)
  • 重複收到消息
  • 消息生成到最終消費的時間差
  • Ack生産者失敗(由消費者打點)

同樣監控程式隻負責打點,報警規則可另外配置。

這套機制也可以用于分布式性能壓測和故障演練。在做壓測的時候,每個消息都Ack的話,對生産者的記憶體壓力很大,因為它發出去的消息,需要在記憶體中保留一段時間(直到到達這個消息的對賬時間),這段時間消費者Ack或者重複Ack都需要記錄。是以我們實作了按比例抽樣對賬的功能,開啟以後隻有需要對賬的消息才會在記憶體中保留一段時間。

順便說一下,我們做壓測時,合格的标準是異步生産不失敗、消費不延遲、每一個消息都不丢失。這樣做是為了保證壓測時能給出更加準确的,可供線上系統參考的性能數字,而不是制造理想條件,追求一個大的數字。比如異步生産比同步生産更脆弱(壓測client如果同步生産,broker抖動的時候,同步client會被堵塞導緻發送速度降低,于是降低了broker壓力,消息發送不容易失敗,但是會看到發送速率在波動),更貼近生産環境的實際情況,我們就選擇異步生産來評估。

性能優化

Broker預設的參數在我們的場景下(SSD、同步複制、異步刷盤)不是最優的,有的參數也許在大多數場景下都不是最優的。我們列出一些重要的參數,供大家參考:

參數 預設值 說明
flushCommitLogTimed False 預設值不合理,異步刷盤這個參數應該設定成true,導緻頻繁刷盤,對性能影響極大
deleteWhen 04 幾點删除過期檔案的時間,删除檔案時有很多磁盤讀,這個預設值是合理的,有條件的話還是建議低峰删除
sendMessageThreadPoolNums 1 處理生産消息的線程數,這個線程幹的事情很多,建議設定為2~4,但太多也沒有什麼用。因為最終寫commit log的時候隻有一個線程能拿到鎖。
useReentrantLockWhenPutMessage 如果前一個參數設定比較大,這個最好設定為True,避免高負載下自旋鎖空轉消耗CPU。
sendThreadPoolQueueCapacity 10000 處理生産消息的隊列大小,預設值可能有點小,比如5萬TPS(異步發送)的情況下,卡200ms就會爆。設定比較小的數字可能是擔心有大量大消息撐爆記憶體(比如100K的話,1萬個的消息大概占用1G記憶體,也還好),具體可以自己算,如果都是小消息,可以把這個數字改大。可以修改broker參數限制client發送大消息。
brokerFastFailureEnable True Broker端快速失敗(限流),和下面兩個參數配合。這個機制可能有争議,client設定了逾時時間,如果client還願意等,并且sendThreadPoolQueue還沒有滿,不應該失敗,sendThreadPoolQueue滿了自然會拒絕新的請求。但如果client設定的逾時時間很短,沒有這個機制可能導緻消息重複。可以自行決定是否開啟。理想情況下,能根據client設定的逾時時間來清理隊列是最好的。
waitTimeMillsInSendQueue 200 200ms很容易導緻發送失敗,建議改大,比如1000
osPageCacheBusyTimeOutMills 1000 Page cache逾時時間,如果記憶體比較多,比如32G以上,建議改大點

得益于簡單、幾乎0依賴的部署模式,使得我們部署小叢集的成本非常低;不對社群版本進行魔改,保證我們可以及時更新;統一SDK入口友善叢集維護和功能更新;通過複合小叢集+自動負載均衡實作多機房多活;充分利用RocketMQ的功能比如事務消息、延遲消息(增強)來滿足業務的多樣性需求;通過自動的分布式對賬,對每一個Broker以及我們的SDK進行正确性監控;本問也進行了一些性能參數的分享,但寫的比較簡單,基本隻說了怎麼調,但沒能細說為什麼,以後我們會另寫文章詳述。目前RocketMQ已經應用在公司在大多數業務線,期待将來會有更好的發展!

掃碼了解更多技術幹貨與客戶案例:

快手基于RocketMQ的線上消息系統建設實踐為什麼建設線上消息系統部署模式和落地政策用戶端封裝政策叢集負載均衡 & 機房災備多樣的消息功能分布式對賬監控性能優化