天天看點

RocketMQ快速失敗機制引發叢集不穩定的思考和優化

作者:閃念基因

一、RocketMQ快速失敗機制案例

1.RocketMQ快速失敗機制是什麼?

RocketMQ的快速失敗機制是當broker處理請求過慢時,為了防止用戶端請求在broker端堆積,造成broker不能響應,而采取的剔除部分逾時用戶端請求的機制,其本質是RocketMQ為了保護broker而采取的限流政策。

2. RocketMQ快速失敗機制引發的問題

某天突然收到多個topic消息生産失敗的郵件預警,涉及到整個叢集,預警郵件如下:

RocketMQ快速失敗機制引發叢集不穩定的思考和優化

随即檢視了生産端的日志,發現了如下異常棧:

RocketMQ快速失敗機制引發叢集不穩定的思考和優化

日志中的[TIMEOUT_CLEAN_QUEUE]broker busy說明broker壓力過大,觸發了快速失敗機制。

緊急處理措施:由于預設的快速失敗機制的門檻值為200ms,即發送消息請求若200ms内還未被處理,則直接剔除該請求,故先把門檻值調整為2000ms,暫時保障叢集正常處理請求。

3.問題調查

  1. broker流量調查檢視出現問題的broker的消息寫入量,發現了問題:從上圖可以看出,消息寫入量從之前的平均3萬/分鐘暴漲到了22.54萬/分鐘,流量瞬間暴漲了近8倍。這個寫入量雖然出現暴漲,但是對于broker來說抗這點量應該不是問題,随即調查了問題機器的負載情況,如下圖:從上圖可以看出來,機器5分鐘負載已經達到了10.32,而此機器配置較低,僅為4核,而叢集中其他broker的流量同樣較高,但是load卻低于1。
  2. 流量暴漲的topic現在需要調查到底是哪個topic發送消息量暴漲,引起了這個問題。經過調查broker端的stats.log,發現了如下異常日志:2022-02-08 15:40:00 INFO - [TOPIC_PUT_NUMS] [%RETRY%get-pugc-to-ai-consumer] Stats In One Minute, SUM: 47119 TPS: 785.32 AVGPT: 1.00

    即get-pugc-to-ai-consumer的重試topic出現了大量消息寫入,%RETRY%是RocketMQ為每個消費者建立的重試topic,而RocketMQ會把消費失敗的消息重新發送到這個重試topic中,以便消費者能夠重新消費。

  3. 重試消息暴漲的原因經過調查業務端get-pugc-to-ai-consumer的消費代碼發現,消費端使用的語言是Python,由于業務端對RocketMQ-Python不熟悉,編碼不規範,導緻消費消息後沒有傳回ACK辨別,緻使RocketMQ認為消費失敗了,進而導緻将所有消費過的消息都重新發送到了重試topic中:def callback(msg):

    print(msg.id, msg.body)

    return ConsumeStatus.CONSUME_SUCCESS // 此ACK語句丢失

4.問題結論

該問題實際上是由于編碼不規範,導緻大量消息重發到broker,而突增的流量導緻配置較低的broker負載過高,觸發了快速失敗機制,而快速失敗機制是不區分topic的,進而造成了其他topic消息發送失敗。

二、RocketMQ快速失敗機制的思考

這裡以本次消息發送失敗事件為參考,看一下快速失敗機制原理,下面是broker通信層的線程模型:

RocketMQ快速失敗機制引發叢集不穩定的思考和優化

broker通信層采用netty實作,而netty作為異步的、事件驅動的網絡架構,在broker中其線程模型如上圖所示:

  1. Boss線程:用于處理accept事件,建立連接配接,注冊channel到woker的selector中。
  2. Woker線程:用于處理讀寫IO事件,并轉交給後的handler處理。
  3. handler通用處理線程:broker用于實作tls握手,編解碼,channel事件監聽分發,處理請求響應等。
  4. broker業務線程:用于各種功能實作,比如消息寫入請求處理,消息拉取請求處理等等。

而broker的快速失敗機制正是在broker業務線程的SendMessageThread中實作的,該機制會檢測線程的ThreadPoolExecutor的阻塞隊列,即當阻塞隊列的請求在隊列中停留時間超過200ms還未被處理,将會被移除掉。

broker的快速失敗機制,本意為保護broker,即防止在broker存儲過慢或負載過高時,大量請求堆積,導緻broker崩潰。

此機制的目的雖然沒有錯,但是其保護粒度顯然有些過于粗糙。

針對本案例,由于某一個消費者大量發送重試消息,導緻其他topic的寫消息請求别拒絕,顯然有些不合理。

三、RocketMQ快速失敗機制的優化方案

針對本案例,可以采取如下兩種方案進行優化:

  1. 采用單獨的線程池來處理重試消息發送請求,即跟正常topic寫消息請求的線程池隔離開,防止重試消息影響正常消息。
  2. 針對ip或topic采取限流政策,防止某個ip或topic大量寫消息請求影響其他topic。

第一種方案的優劣:

  • 優點:實作簡單,可以保障重試消息不影響正常的消息發送。
  • 缺點:
    • 該方案不通用,隻适用于重試消息與普通消息。
    • 突然暴增的重試消息流量依然會影響其他消費者發送重試消息。

第二種方案的優劣:

  • 優點:以ip或topic為次元,實行細粒度的控制,可以很好的防止某個ip或topic的流量影響整個叢集。
  • 缺點:需要單獨開發,以及遠端實時控制ip或topic級别的限流。

很明顯,使用第二種方案更優一些。

那麼,限流資源該如何選取呢?

  • ip,針對ip和端口進行限流
    • 優點:可以防止某個ip的寫請求量暴增。
    • 缺點:若業務存在發送消息到多個topic的情況,可能存在誤傷。
  • topic(重試消息可以針對consumerGroup)
    • 優點:多個用戶端往一個topic發送消息依然可以觸發限流。
    • 缺點:限流的門檻值不好确定。

顯然,針對topic限流更符合業務端的情況,是以采用topic次元的限流更合适。

綜上:由于一個業務往往使用的topic不是很多,而RocketMQ叢集可以支撐上萬個topic,是以一個RocketMQ叢集通常是為多個業務提供支撐,是以采用topic次元的限流的政策更适用于這個場景。

四、RocketMQ Topic次元限流的實作

1.在哪裡進行限流?

結合上面提到的broker通信層的線程模型, 如果要采取限流,就需要在3.handler通用處理線程層來做,因為不能讓消息寫入事件傳播到broker業務線程的SendMessageThread中,否則依然可能會觸發快速失敗機制。

針對netty的處理器鍊模式,在broker的handler通用處理線程增加前置處理器,專門針對寫請求進行限流,當寫請求觸發限流時,直接傳回響應,不再讓事件往後傳播,采取限流的位置如下圖所示:

RocketMQ快速失敗機制引發叢集不穩定的思考和優化

如上圖所示,在broker的handler通用處理管道中,處理完通用的事件後,比如tls握手,編解碼等,增加限流處理器即可。

2.限流處理器注意事項

第一點:限流處理器運作于netty的ChannelPipeline中,需要采用單獨的線程池,不能共用broker的handler通用處理線程來處理,防止限流處理器影響handler通用處理線程的業務。

第二點:如若因為限流邏輯或性能等問題導緻限流異常,直接跳過限流政策,執行後續的handler,防止影響正常請求。

第三點:限流器采用本地限流,限流門檻值支援實時修改生效。

針對上面三點注意事項,首先是限流處理器在通信層所處的位置設計。

3.限流處理器線程池詳細設計

首先,需要看一下broker(netty)的請求響應線程模型:

RocketMQ快速失敗機制引發叢集不穩定的思考和優化

netty4的連結建立後,channel的事件都是單線程處理的,整體上是并行的。

broker端針對通用handler管道設定了單獨的線程池,即DefaultEventExecutorGroup,預設8個線程。

限流處理器不可以使用DefaultEventExecutorGroup,防止對通用的事件處理有影響,是以針對限流處理器采用單獨的線程池,如下:

RocketMQ快速失敗機制引發叢集不穩定的思考和優化

其次,針對限流處理器的線程池,需要單獨定制拒絕政策。

由于netty重新實作了jdk的Executor,它的線程池的阻塞隊列為每個線程單獨所有,即任務來了先往阻塞隊列放,如果線程阻塞,可能會影響channel讀寫,故針對限流處理器的線程池需要單獨定制拒絕政策,當任務被拒絕時,直接執行task,如下:

RocketMQ快速失敗機制引發叢集不穩定的思考和優化

如上圖,即如果限流處理器的線程池繁忙而拒絕執行任務,則直接跳過限流處理器,而不影響事件的傳播。拒絕政策代碼如下:

public void rejected(Runnable task, SingleThreadEventExecutor executor) {
    log.error("RateLimitHandler reject task, {} pendingTasks:{}", getThreadName(executor),
            executor.pendingTasks());
    task.run(); // 直接在拒絕政策中執行任務
}
           

然後,在task中判斷是否是目前線程,如果不是目前線程,則是目前任務隊列滿了,直接通知下一個handler:

protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand cmd) throws Exception {
    // 非目前線程池直接觸發下一個事件
    if (!ctx.executor().inEventLoop()) {
        ctx.fireChannelRead(cmd);
        return;
    }
}
           

這樣,即使限流處理器線程池出現問題而無法執行任務時,也會将事件傳播到下一個處理器,不影響broker後續的事件流程。

4.限流處理器之限速器的選擇

限流處理器的核心就是限速。而目前流行的限速政策有如下幾種:

  1. 計數器:java的Semaphore計數來限制并發。
  2. 漏桶或令牌桶算法:guava限速器。
  3. 滑動視窗限速:sentinel、hystrix等集限速,熔斷降級等功能于一體的開源元件。

既然限速器的優秀實作有很多,這裡就沒有必要再造輪子了,選擇合适的就行。

到底什麼樣的限速器更适用于這種場景呢?

首先,由于broker topic + 重試topic較多,單個broker支援上萬個topic,是以以topic為次元進行限速時盡量不用sentinel或hystrix的限速器,因為其針對每個topic采用滑窗機制,消耗記憶體較大。

其次,采用Semaphore來限制并發的話,其需要在請求執行完畢時release,在netty的事件機制中到處傳播Semaphore不太靠譜,并且channel可能關閉等,若不能保障正确釋放Semaphore,可能造成限速錯誤。

最後,采用guava的限速器,理由:設計的輕量,精巧,其内部雖然有鎖,經過測試單線程qps可達千萬,8線程qps達800萬,滿足性能要求。

但是,這裡還有一個問題,guava的限速器并不完全滿足topic次元限速的需求。

這裡需要先看一下guava限速器的原理:

RocketMQ快速失敗機制引發叢集不穩定的思考和優化

如上圖所示,guava限速器實作主要分為兩個部分:

  1. 令牌的計算流程使用者請求令牌時,如果目前時間大于下次可請求時間,說明限速器比較空閑,需要将時間內插補點轉換為令牌放入令牌桶。由于限速器的qps是初始化時設定的,故根據時間差可以很容易計算出空閑令牌
  2. 令牌的擷取流程如果能夠擷取到令牌,那麼直接傳回。關鍵是流程④,需要根據擷取的令牌量計算出需要等待的時間,加到下次可請求時間上。

這裡不能滿足topic次元限速的需求是由于如下兩個問題:

  1. 在流程③擷取不到令牌時,進行sleep而broker流控時,希望擷取不到令牌時,馬上傳回,通知用戶端已經超過流控門檻值,而不是卡主流程。
  2. 擷取不到令牌時,下次可請求時間可能增加到很大,影響後續請求舉個例子,比如qps限制為1,當請求量為10時,隻能通過一個,下次可請求時間會在目前時間基礎上加9秒,後續9秒内的請求都被流控了。

5.限流處理器之限速器的改造

針對guava限速器存在的兩個問題,進行如下的改造優化:

  1. 流程③擷取不到令牌時,進行sleep針對這點,移除sleep代碼,直接執行後續邏輯
  2. 擷取不到令牌時,下次可請求時間可能增加到很大,影響後續請求針對這個問題,本質原因是guava的限速器不滿足broker限流需求。broker限流需求是當某個topic被限流後,應該處于熔斷狀态,在熔斷狀态期間,請求都會直接傳回,不被放行。但是熔斷期過後,需要自動放開,不能因為某一次大量請求,影響後續請求故增加熔斷層,即當guava限速器無法擷取令牌時,觸發熔斷,不再增加下次可請求時間,并且當檢測到超過熔斷時間門檻值時,再關閉熔斷,改造如下:如上圖所示,當限速器觸發限速時,會自動熔斷,所有的請求直接傳回。當熔斷超過門檻值(預設1秒),自動關閉熔斷,繼續執行正常的限速流程。

部分限速器改造代碼如下:

/**
 * 令牌桶限速器-改造自guava
 * 增加熔斷機制,超門檻值自動熔斷ns後恢複
 */
public class TokenBucketRateLimiter {
    /**
     * 擷取令牌
     */
    public boolean acquire(int permits) {
        synchronized (this) {
            // 擷取啟動以來的時間
            long nowMicros = readMicros();
            // 熔斷器打開
            if (circuitBreakerOpen) {
                // 目前時間超過可以預支的時間門檻值後,流量放開
                if (nowMicros - nextFreeTicketMicros >= circuitBreakerOpenTimeInMicros) {
                    circuitBreakerOpen = false;
                }
            }
            // 熔斷器未打開
            if (!circuitBreakerOpen) {
                // 擷取需要等待的時間
                lastNeedWaitMicrosecs = reserveAndGetWaitLength(permits, nowMicros);
                // 需要等待證明熔斷器打開
                if (lastNeedWaitMicrosecs > 0) {
                    circuitBreakerOpen = true;
                    // 記錄限流時間戳
                    lastRateLimitTimestamp = System.currentTimeMillis();
                }
            }
            // 熔斷器未打開,則可以擷取令牌
            return !circuitBreakerOpen;
        }
    }
}
           

限流處理器部分代碼如下:

/**
 * 限流處理器
 */
@ChannelHandler.Sharable
public class RateLimitHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    // 限流器map
    private ConcurrentMap<String, TokenBucketRateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
    
    // 是否禁用
    private volatile boolean disabled;

    // 限流器處理線程池
    private EventExecutorGroup rateLimitEventExecutorGroup;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand cmd) throws Exception {
        // 非目前線程池直接觸發下一個事件
        if (!ctx.executor().inEventLoop()) {
            ctx.fireChannelRead(cmd);
            return;
        }
        if (disabled || cmd == null || cmd.getType() != RemotingCommandType.REQUEST_COMMAND) {
            ctx.fireChannelRead(cmd);
            return;
        }
        // 擷取限流資源
        String resource = getResource(cmd);
        if (resource == null) {
            ctx.fireChannelRead(cmd);
            return;
        }
        // 限流流量
        double limitQps = defaultLimitQps;
        if (cmd.getCode() == RequestCode.CONSUMER_SEND_MSG_BACK) {
            limitQps = sendMsgBackLimitQps;
        }
        final double finalLimitQps = limitQps;
        // 擷取或建立限流器
        TokenBucketRateLimiter rateLimiter = rateLimiterMap.computeIfAbsent(resource, k -> new TokenBucketRateLimiter(finalLimitQps));
        boolean acquired = rateLimiter.acquire();
        // 不需要限流
        if (acquired) {
            ctx.fireChannelRead(cmd);
            return;
        }
        // 需要限流
  RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "RateLimit");
  response.setOpaque(cmd.getOpaque());
  ctx.writeAndFlush(response);
    }
}
           

6.限流處理器之遠端通路

限流處理器需要具備遠端實時修改和通路的功能,以便能夠根據某個topic的流量進行調整,避免重新開機broker。

通過擴充RocketMQ的通信協定,支援限流處理器的屬性擷取和修改,實作了在web端進行實時檢視和修改,如下圖所示:

RocketMQ快速失敗機制引發叢集不穩定的思考和優化

五、總結

Broker的快速失敗機制本質是為了保護broker,而進行的整體性限流,然而該限流機制的粒度顯然過于粗糙。

因為通常情況下,broker是多個業務共用的,不能因為某個業務流量的突增而影響整體的可用性,是以,應該以更為精細的粒度來進行限流保護。

通過在broker的通信層增加額外的限流處理器,進行topic次元的限流保護,并針對限流處理器的線程池制定合适的拒絕政策,保障該限流處理器即使無法執行時,也會将事件傳播到下一個處理器,不影響整體事件流程。

另外,改造guava的限速器,支援熔斷及實時響應,以滿足broker限流的需求,進而避免了某個topic流量過大對其他topic的影響,提升了整體叢集的穩定性。

六、參考文獻

  1. RocketMQ開發指南 https://github.com/apache/rocketmq/tree/develop/docs/cn
  2. guava限流原理 https://zhuanlan.zhihu.com/p/60979444

作者:高永飛

來源:微信公衆号:搜狐技術産品

出處:https://mp.weixin.qq.com/s/OWfh2Iy8b8pT9EvUaouPnw

繼續閱讀