天天看點

高并發:RocketMQ 削峰實戰

優質文章,第一時間送達

作者:WilsonHe

MQ的主要特點為解耦、異步、削峰,該文章主要記錄與分享個人在實際項目中的RocketMQ削峰用法,用于減少資料庫壓力的業務場景,其中RocketMQ的核心元件概念如下:

Producer:生産發送消息

Broker:存儲Producer發送過來的消息

Consumer:從Broker拉取消息并進行消費

NameServer:為Producer或Consumer路由到Broker

高并發:RocketMQ 削峰實戰

其中消費流程有以下幾點是必須注意的:

RocketMQ的Consumer擷取消息是通過向Broker發送拉取請求擷取的,而不是由Broker發送Consumer接收的方式。

Consumer每次拉取消息時消息都會被均勻分發到消息隊列再進行傳輸,是以RocketMQ中的很多參數都是針對隊列而不是Topic的(這個是重點,順便吐槽下源碼的文檔講的真不清晰,很多都需要自己試錯,但Dashboard做得很好),其中每個Broker消息隊列(ConsumeQueue)的數量都可以通過RocketMQ DashBoard實時更改調整。

rocketmq-spring-boot-starter用法簡介

當開發中需要快速內建RocketMQ時可以考慮使用 rocketmq-spring-boot-starter 搭建RocketMQ的內建環境,但該架構并不完全具備RocketMQ所有的配置簡化,如需批量消費消息便需要自定義一個DefaultMQPushConsumer bean去消費了。個人在開發中常用的<code>rocketmq-spring-boot-starter</code>相關類:

<code>RocketMQListener</code>接口:消費者都需實作該接口的消費方法<code>onMessage(msg)</code>。

<code>RocketMQPushConsumerLifecycleListener</code>接口:當<code>@RocketMQMessageListener</code>中的配置不足以滿足我們的需求時,可以實作該接口直接更改消費者類<code>DefaultMQPushConsumer</code>配置

<code>@RocketMQMessageListener</code>:被該注解标注并實作了接口<code>RocketMQListener</code>的bean為一個消費者并監聽指定topic隊列中的消息,該注解中包含消費者的一些常用配置(大部分按預設即可),一般隻需更改consumerGroup(消費組)與topic。<code>RocketMQMessageListener</code>中的屬性配置是可以使用Placeholder(占位符)從配置檔案或配置中心擷取的,如下圖:

高并發:RocketMQ 削峰實戰

業務案例

有一個點贊業務,不限制使用者的點贊數隻需進行記錄(産品需求,開發提議無效),當每個使用者都進行x連擊享受數量猛增的快感時如果資料庫都需要進行x個點贊資料的插入,資料庫毫無疑問會塞死導緻崩潰。于是想到可以嘗試下MQ削峰,比如每秒來了5000消息但資料庫隻能承受2000,那我消費時每次隻拉取消費1600就好了,剩下的放在Broker堆積慢慢消費就好。由于之前的消息中心也在用RocketMQ,于是确認使用RocketMQ來進行削峰。

高并發:RocketMQ 削峰實戰

環境配置

文章例子環境:1NameServer + 2Broker + 1Consumer

PraiseRecord(點贊記錄):

MessageController(簡單的測試接口):

由于使用者可以連續點贊,是以考慮可以在點贊消息的處理上寬松一點(容許消息丢失)以追求更高的性能,是以選擇使用<code>sendOneyWay()</code>進行消息發送。

RocketMQ的消息發送方式主要含syncSend()同步發送、asyncSend()異步發送、sendOneWay()三種方式,sendOneWay()也是異步發送,差別在于不需等待Broker傳回确認,是以可能會存在資訊丢失的狀況,但吞吐量更高,具體需根據業務情況選用。 性能:sendOneWay &gt; asyncSend &gt; syncSend RocketMQTemplate的send()方法預設是同步(syncSend)的,更多可看源碼實作。

單次pull消息的最大數目受broker存儲的<code>MessageStoreConfig.maxTransferCountOnMessageInMemory</code>(預設為32)值限制,即若想要消費者從隊列拉取的消息數大于32有效(pullBatchSize&gt;32)則需更改Broker的啟動參數<code>maxTransferCountOnMessageInMemory</code>值。在MQ削峰的配置參數裡,以下幾個<code>DefaultMQPushConsumer</code>的參數是需要注意一下的:

pullInterval:每次從Broker拉取消息的間隔,機關為毫秒

pullBatchSize:每次從Broker隊列拉取到的消息數,該參數很容易讓人誤解,一開始我以為是每次拉取的消息總數,但測試過幾次後确認了實質上是從每個隊列的拉取數(源碼上的注釋文檔真的很差,跟沒有一樣),即Consume每次拉取的消息總數如下:<code>EachPullTotal=所有Broker上的寫隊列數和(writeQueueNums=readQueueNums) * pullBatchSize</code>

consumeMessageBatchMaxSize:每次消費(即将多條消息合并為List消費)的最大消息數目,預設值為1,rocketmq-spring-boot-starter 目前不支援批量消費(2.1.0版本)

在消費者開始消息消費時會先從各隊列中拉取一條消息進行消費,消費成功後再以每次pullBatchSize的數目進行拉取。

PraiseListener中設定了每次拉取的間隔為2s,每次從隊列拉取的消息數為16,在搭建了2master broker且broker上writeQueueNums=readQueueNums=4的環境下每次拉取的消息理論數值為16 * 2 * 4 = 128,在第一次從各隊列拉取1條消息(即共8條)後消費成功後會每次就會拉取最多128條消息進行消費,想驗證下的可以把onMessage()的insert()改為log.info("1")然後統計機關秒内列印的日志數是否為128。

高并發:RocketMQ 削峰實戰

根據以上配置單Conumer情況下每2s理論消費為128,即每2秒資料庫新增的點贊資料大概為128條左右,有20%偏差都在個人可接受範圍内,然後對點贊接口進行簡單壓測1s 2000請求校驗MQ效果,根據消費配置理論上需要16次拉取即需32s才能消費完,壓測後檢視資料庫校驗效果:

高并發:RocketMQ 削峰實戰

由上圖可以看出除第一次2s和最後一次2s外資料庫每2s的插入資料數和一般都在128附近波動,也用了34s(因第一次拉取數較少是以比理論多花費一次拉取)消費的偏差大小可能會受每次拉取數pullBatchSize、Broker上的消息隊列數、網絡波動等情況影響,但需要的目的已經達到了,我隻想把機關時間内過多的資料庫操作交給MQ做分隔成多個機關時間内的小批量操作,消息過多就堆積,當請求峰值過了後直到MQ堆積的消息消費完前資料庫的插入數依舊會與峰值期的插入數相差不大,達到了MQ削峰填谷的效果。

當把拉取數pullBatchSize設定Broker的預設最大傳輸值32了,線上又不想重新開機Broker更改maxTransferCountOnMessageInMemory參數,如有2個Broker且queue都為4,那麼拉取消費效率才為32 * 2 * 4 = 256,如果想要動态調整,可以從Broker數或Broker隊列數下手,可以将Broker的writeQueueNums、readQueueNums增大,如都改為8,那麼效率就成了32 * 2 * 8 = 512。

需要注意的是更改完queues後必須去Dashboard的Topic下的CONSUMER MANAGER檢視新增的隊列上是否都有Consumer成功注冊上去了,因為遇到了在測試與生産上使用rocketmq-spring-boot-starter @RocketMQListener标注消費者不會自動注冊到新隊列上的情況,但沒排除是不是RocketMQ版本的原因(個人本地的版本比環境上的高了一個小版本0.0.1,本地沒出現沒消費者注冊到新隊列上的問題),而是使用了自定義DefaultMQPushConsumer bean(原生的方式都是沒有問題的)的備用方案。當再啟動新的消費者應用時CONSUMER MANAGER(下圖)中就會出現 新Consumer數 * 各Broker隊列數和的隊列行。

高并發:RocketMQ 削峰實戰

雖然點贊業務使用MQ單條插入後TPS已經達到目前業務名額要求了,但考慮到如果後續要求在不添加機器數的情況下增加TPS,且資料量還沒到分庫分表的程度,個人就打算從批量消費下手,由一次插入一條點贊記錄改為一次性插入多條(insertBatch)。當然能滿足現有需求能不做肯定不做的,過度優化過分礙事,但想多點方案不會壞事。

rocketmq-spring-boot-starter并沒有提供批量消費的功能,是以要批量消費消息需要自定義<code>DefaultMQPushConsumer</code>并配置其<code>consumeMessageBatchMaxSize</code>屬性。<code>consumeMessageBatchMaxSize</code>屬性預設值為1,即每次隻消費一條消息,需要注意的是該屬性也會受<code>pullBatchSize</code>影響,如果<code>consumeMessageBatchMaxSize</code>為32但<code>pullBatchSize</code>隻為12,那麼每次批量消費的最大消息數也就隻有12。如下為個人測試批量消費Consumer的測試bean:

如果預設配置情況下log列印出的userInfo size恒為1,但由于設定了<code>consumeMessageBatchMaxSize</code>與<code>pullBatchSize</code>,且<code>pullBatchSize</code>較小,是以每次消費的消息數最大值為12,如下圖:

高并發:RocketMQ 削峰實戰

附本文相關資訊

確定mqnamesrv與mqbroker已啟動成功,如該文章環境的啟動:

RocketMQ DashBoard啟動流程可參考官方github文檔或到我的資源裡下載下傳jar包運作

源碼位址(https://github.com/Wilson-He/spring-boot-series/tree/master/spring-rocketmq),2m-noslave目錄是該文章中例子中的2master broker配置與啟動腳本,spring-boot-consumer-peak目錄為包含該文章相關代碼的實際例子

高并發:RocketMQ 削峰實戰
高并發:RocketMQ 削峰實戰
高并發:RocketMQ 削峰實戰
下一篇: ELK安裝