一、RocketMQ快速失敗機制案例
1.RocketMQ快速失敗機制是什麼?
RocketMQ的快速失敗機制是當broker處理請求過慢時,為了防止用戶端請求在broker端堆積,造成broker不能響應,而采取的剔除部分逾時用戶端請求的機制,其本質是RocketMQ為了保護broker而采取的限流政策。
2. RocketMQ快速失敗機制引發的問題
某天突然收到多個topic消息生産失敗的郵件預警,涉及到整個叢集,預警郵件如下:
随即檢視了生産端的日志,發現了如下異常棧:
日志中的[TIMEOUT_CLEAN_QUEUE]broker busy說明broker壓力過大,觸發了快速失敗機制。
緊急處理措施:由于預設的快速失敗機制的門檻值為200ms,即發送消息請求若200ms内還未被處理,則直接剔除該請求,故先把門檻值調整為2000ms,暫時保障叢集正常處理請求。
3.問題調查
- broker流量調查檢視出現問題的broker的消息寫入量,發現了問題:從上圖可以看出,消息寫入量從之前的平均3萬/分鐘暴漲到了22.54萬/分鐘,流量瞬間暴漲了近8倍。這個寫入量雖然出現暴漲,但是對于broker來說抗這點量應該不是問題,随即調查了問題機器的負載情況,如下圖:從上圖可以看出來,機器5分鐘負載已經達到了10.32,而此機器配置較低,僅為4核,而叢集中其他broker的流量同樣較高,但是load卻低于1。
-
流量暴漲的topic現在需要調查到底是哪個topic發送消息量暴漲,引起了這個問題。經過調查broker端的stats.log,發現了如下異常日志:2022-02-08 15:40:00 INFO - [TOPIC_PUT_NUMS] [%RETRY%get-pugc-to-ai-consumer] Stats In One Minute, SUM: 47119 TPS: 785.32 AVGPT: 1.00
即get-pugc-to-ai-consumer的重試topic出現了大量消息寫入,%RETRY%是RocketMQ為每個消費者建立的重試topic,而RocketMQ會把消費失敗的消息重新發送到這個重試topic中,以便消費者能夠重新消費。
-
重試消息暴漲的原因經過調查業務端get-pugc-to-ai-consumer的消費代碼發現,消費端使用的語言是Python,由于業務端對RocketMQ-Python不熟悉,編碼不規範,導緻消費消息後沒有傳回ACK辨別,緻使RocketMQ認為消費失敗了,進而導緻将所有消費過的消息都重新發送到了重試topic中:def callback(msg):
print(msg.id, msg.body)
return ConsumeStatus.CONSUME_SUCCESS // 此ACK語句丢失
4.問題結論
該問題實際上是由于編碼不規範,導緻大量消息重發到broker,而突增的流量導緻配置較低的broker負載過高,觸發了快速失敗機制,而快速失敗機制是不區分topic的,進而造成了其他topic消息發送失敗。
二、RocketMQ快速失敗機制的思考
這裡以本次消息發送失敗事件為參考,看一下快速失敗機制原理,下面是broker通信層的線程模型:
broker通信層采用netty實作,而netty作為異步的、事件驅動的網絡架構,在broker中其線程模型如上圖所示:
- Boss線程:用于處理accept事件,建立連接配接,注冊channel到woker的selector中。
- Woker線程:用于處理讀寫IO事件,并轉交給後的handler處理。
- handler通用處理線程:broker用于實作tls握手,編解碼,channel事件監聽分發,處理請求響應等。
- broker業務線程:用于各種功能實作,比如消息寫入請求處理,消息拉取請求處理等等。
而broker的快速失敗機制正是在broker業務線程的SendMessageThread中實作的,該機制會檢測線程的ThreadPoolExecutor的阻塞隊列,即當阻塞隊列的請求在隊列中停留時間超過200ms還未被處理,将會被移除掉。
broker的快速失敗機制,本意為保護broker,即防止在broker存儲過慢或負載過高時,大量請求堆積,導緻broker崩潰。
此機制的目的雖然沒有錯,但是其保護粒度顯然有些過于粗糙。
針對本案例,由于某一個消費者大量發送重試消息,導緻其他topic的寫消息請求别拒絕,顯然有些不合理。
三、RocketMQ快速失敗機制的優化方案
針對本案例,可以采取如下兩種方案進行優化:
- 采用單獨的線程池來處理重試消息發送請求,即跟正常topic寫消息請求的線程池隔離開,防止重試消息影響正常消息。
- 針對ip或topic采取限流政策,防止某個ip或topic大量寫消息請求影響其他topic。
第一種方案的優劣:
- 優點:實作簡單,可以保障重試消息不影響正常的消息發送。
- 缺點:
- 該方案不通用,隻适用于重試消息與普通消息。
- 突然暴增的重試消息流量依然會影響其他消費者發送重試消息。
第二種方案的優劣:
- 優點:以ip或topic為次元,實行細粒度的控制,可以很好的防止某個ip或topic的流量影響整個叢集。
- 缺點:需要單獨開發,以及遠端實時控制ip或topic級别的限流。
很明顯,使用第二種方案更優一些。
那麼,限流資源該如何選取呢?
- ip,針對ip和端口進行限流
- 優點:可以防止某個ip的寫請求量暴增。
- 缺點:若業務存在發送消息到多個topic的情況,可能存在誤傷。
- topic(重試消息可以針對consumerGroup)
- 優點:多個用戶端往一個topic發送消息依然可以觸發限流。
- 缺點:限流的門檻值不好确定。
顯然,針對topic限流更符合業務端的情況,是以采用topic次元的限流更合适。
綜上:由于一個業務往往使用的topic不是很多,而RocketMQ叢集可以支撐上萬個topic,是以一個RocketMQ叢集通常是為多個業務提供支撐,是以采用topic次元的限流的政策更适用于這個場景。
四、RocketMQ Topic次元限流的實作
1.在哪裡進行限流?
結合上面提到的broker通信層的線程模型, 如果要采取限流,就需要在3.handler通用處理線程層來做,因為不能讓消息寫入事件傳播到broker業務線程的SendMessageThread中,否則依然可能會觸發快速失敗機制。
針對netty的處理器鍊模式,在broker的handler通用處理線程增加前置處理器,專門針對寫請求進行限流,當寫請求觸發限流時,直接傳回響應,不再讓事件往後傳播,采取限流的位置如下圖所示:
如上圖所示,在broker的handler通用處理管道中,處理完通用的事件後,比如tls握手,編解碼等,增加限流處理器即可。
2.限流處理器注意事項
第一點:限流處理器運作于netty的ChannelPipeline中,需要采用單獨的線程池,不能共用broker的handler通用處理線程來處理,防止限流處理器影響handler通用處理線程的業務。
第二點:如若因為限流邏輯或性能等問題導緻限流異常,直接跳過限流政策,執行後續的handler,防止影響正常請求。
第三點:限流器采用本地限流,限流門檻值支援實時修改生效。
針對上面三點注意事項,首先是限流處理器在通信層所處的位置設計。
3.限流處理器線程池詳細設計
首先,需要看一下broker(netty)的請求響應線程模型:
netty4的連結建立後,channel的事件都是單線程處理的,整體上是并行的。
broker端針對通用handler管道設定了單獨的線程池,即DefaultEventExecutorGroup,預設8個線程。
限流處理器不可以使用DefaultEventExecutorGroup,防止對通用的事件處理有影響,是以針對限流處理器采用單獨的線程池,如下:
其次,針對限流處理器的線程池,需要單獨定制拒絕政策。
由于netty重新實作了jdk的Executor,它的線程池的阻塞隊列為每個線程單獨所有,即任務來了先往阻塞隊列放,如果線程阻塞,可能會影響channel讀寫,故針對限流處理器的線程池需要單獨定制拒絕政策,當任務被拒絕時,直接執行task,如下:
如上圖,即如果限流處理器的線程池繁忙而拒絕執行任務,則直接跳過限流處理器,而不影響事件的傳播。拒絕政策代碼如下:
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
log.error("RateLimitHandler reject task, {} pendingTasks:{}", getThreadName(executor),
executor.pendingTasks());
task.run(); // 直接在拒絕政策中執行任務
}
然後,在task中判斷是否是目前線程,如果不是目前線程,則是目前任務隊列滿了,直接通知下一個handler:
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand cmd) throws Exception {
// 非目前線程池直接觸發下一個事件
if (!ctx.executor().inEventLoop()) {
ctx.fireChannelRead(cmd);
return;
}
}
這樣,即使限流處理器線程池出現問題而無法執行任務時,也會将事件傳播到下一個處理器,不影響broker後續的事件流程。
4.限流處理器之限速器的選擇
限流處理器的核心就是限速。而目前流行的限速政策有如下幾種:
- 計數器:java的Semaphore計數來限制并發。
- 漏桶或令牌桶算法:guava限速器。
- 滑動視窗限速:sentinel、hystrix等集限速,熔斷降級等功能于一體的開源元件。
既然限速器的優秀實作有很多,這裡就沒有必要再造輪子了,選擇合适的就行。
到底什麼樣的限速器更适用于這種場景呢?
首先,由于broker topic + 重試topic較多,單個broker支援上萬個topic,是以以topic為次元進行限速時盡量不用sentinel或hystrix的限速器,因為其針對每個topic采用滑窗機制,消耗記憶體較大。
其次,采用Semaphore來限制并發的話,其需要在請求執行完畢時release,在netty的事件機制中到處傳播Semaphore不太靠譜,并且channel可能關閉等,若不能保障正确釋放Semaphore,可能造成限速錯誤。
最後,采用guava的限速器,理由:設計的輕量,精巧,其内部雖然有鎖,經過測試單線程qps可達千萬,8線程qps達800萬,滿足性能要求。
但是,這裡還有一個問題,guava的限速器并不完全滿足topic次元限速的需求。
這裡需要先看一下guava限速器的原理:
如上圖所示,guava限速器實作主要分為兩個部分:
- 令牌的計算流程使用者請求令牌時,如果目前時間大于下次可請求時間,說明限速器比較空閑,需要将時間內插補點轉換為令牌放入令牌桶。由于限速器的qps是初始化時設定的,故根據時間差可以很容易計算出空閑令牌
- 令牌的擷取流程如果能夠擷取到令牌,那麼直接傳回。關鍵是流程④,需要根據擷取的令牌量計算出需要等待的時間,加到下次可請求時間上。
這裡不能滿足topic次元限速的需求是由于如下兩個問題:
- 在流程③擷取不到令牌時,進行sleep而broker流控時,希望擷取不到令牌時,馬上傳回,通知用戶端已經超過流控門檻值,而不是卡主流程。
- 擷取不到令牌時,下次可請求時間可能增加到很大,影響後續請求舉個例子,比如qps限制為1,當請求量為10時,隻能通過一個,下次可請求時間會在目前時間基礎上加9秒,後續9秒内的請求都被流控了。
5.限流處理器之限速器的改造
針對guava限速器存在的兩個問題,進行如下的改造優化:
- 流程③擷取不到令牌時,進行sleep針對這點,移除sleep代碼,直接執行後續邏輯
- 擷取不到令牌時,下次可請求時間可能增加到很大,影響後續請求針對這個問題,本質原因是guava的限速器不滿足broker限流需求。broker限流需求是當某個topic被限流後,應該處于熔斷狀态,在熔斷狀态期間,請求都會直接傳回,不被放行。但是熔斷期過後,需要自動放開,不能因為某一次大量請求,影響後續請求故增加熔斷層,即當guava限速器無法擷取令牌時,觸發熔斷,不再增加下次可請求時間,并且當檢測到超過熔斷時間門檻值時,再關閉熔斷,改造如下:如上圖所示,當限速器觸發限速時,會自動熔斷,所有的請求直接傳回。當熔斷超過門檻值(預設1秒),自動關閉熔斷,繼續執行正常的限速流程。
部分限速器改造代碼如下:
/**
* 令牌桶限速器-改造自guava
* 增加熔斷機制,超門檻值自動熔斷ns後恢複
*/
public class TokenBucketRateLimiter {
/**
* 擷取令牌
*/
public boolean acquire(int permits) {
synchronized (this) {
// 擷取啟動以來的時間
long nowMicros = readMicros();
// 熔斷器打開
if (circuitBreakerOpen) {
// 目前時間超過可以預支的時間門檻值後,流量放開
if (nowMicros - nextFreeTicketMicros >= circuitBreakerOpenTimeInMicros) {
circuitBreakerOpen = false;
}
}
// 熔斷器未打開
if (!circuitBreakerOpen) {
// 擷取需要等待的時間
lastNeedWaitMicrosecs = reserveAndGetWaitLength(permits, nowMicros);
// 需要等待證明熔斷器打開
if (lastNeedWaitMicrosecs > 0) {
circuitBreakerOpen = true;
// 記錄限流時間戳
lastRateLimitTimestamp = System.currentTimeMillis();
}
}
// 熔斷器未打開,則可以擷取令牌
return !circuitBreakerOpen;
}
}
}
限流處理器部分代碼如下:
/**
* 限流處理器
*/
@ChannelHandler.Sharable
public class RateLimitHandler extends SimpleChannelInboundHandler<RemotingCommand> {
// 限流器map
private ConcurrentMap<String, TokenBucketRateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
// 是否禁用
private volatile boolean disabled;
// 限流器處理線程池
private EventExecutorGroup rateLimitEventExecutorGroup;
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand cmd) throws Exception {
// 非目前線程池直接觸發下一個事件
if (!ctx.executor().inEventLoop()) {
ctx.fireChannelRead(cmd);
return;
}
if (disabled || cmd == null || cmd.getType() != RemotingCommandType.REQUEST_COMMAND) {
ctx.fireChannelRead(cmd);
return;
}
// 擷取限流資源
String resource = getResource(cmd);
if (resource == null) {
ctx.fireChannelRead(cmd);
return;
}
// 限流流量
double limitQps = defaultLimitQps;
if (cmd.getCode() == RequestCode.CONSUMER_SEND_MSG_BACK) {
limitQps = sendMsgBackLimitQps;
}
final double finalLimitQps = limitQps;
// 擷取或建立限流器
TokenBucketRateLimiter rateLimiter = rateLimiterMap.computeIfAbsent(resource, k -> new TokenBucketRateLimiter(finalLimitQps));
boolean acquired = rateLimiter.acquire();
// 不需要限流
if (acquired) {
ctx.fireChannelRead(cmd);
return;
}
// 需要限流
RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "RateLimit");
response.setOpaque(cmd.getOpaque());
ctx.writeAndFlush(response);
}
}
6.限流處理器之遠端通路
限流處理器需要具備遠端實時修改和通路的功能,以便能夠根據某個topic的流量進行調整,避免重新開機broker。
通過擴充RocketMQ的通信協定,支援限流處理器的屬性擷取和修改,實作了在web端進行實時檢視和修改,如下圖所示:
五、總結
Broker的快速失敗機制本質是為了保護broker,而進行的整體性限流,然而該限流機制的粒度顯然過于粗糙。
因為通常情況下,broker是多個業務共用的,不能因為某個業務流量的突增而影響整體的可用性,是以,應該以更為精細的粒度來進行限流保護。
通過在broker的通信層增加額外的限流處理器,進行topic次元的限流保護,并針對限流處理器的線程池制定合适的拒絕政策,保障該限流處理器即使無法執行時,也會将事件傳播到下一個處理器,不影響整體事件流程。
另外,改造guava的限速器,支援熔斷及實時響應,以滿足broker限流的需求,進而避免了某個topic流量過大對其他topic的影響,提升了整體叢集的穩定性。
六、參考文獻
- RocketMQ開發指南 https://github.com/apache/rocketmq/tree/develop/docs/cn
- guava限流原理 https://zhuanlan.zhihu.com/p/60979444
作者:高永飛
來源:微信公衆号:搜狐技術産品
出處:https://mp.weixin.qq.com/s/OWfh2Iy8b8pT9EvUaouPnw