天天看點

生産RabbitMQ隊列阻塞該如何處理?

生産RabbitMQ隊列阻塞該如何處理?

前言

  那天我和同僚一起吃完晚飯回公司加班,然後就群裡就有人@我說xxx商戶說收不到推送,一開始覺得沒啥。我第一反應是不是極光沒注冊上,就讓客服通知商戶,重新登入下試試。這邊打開極光推送的背景進行檢查。後面反應收不到推送的越來越多,我就知道這事情不簡單。

事故經過

  由于大量商戶反應收不到推送,我第一反應是不是推送系統挂了,導緻沒有進行推送。于是讓運維老哥檢查推送系統各節點的情況,發現都正常。于是打開RabbitMQ的管控台看了一下,人都蒙了。已經有幾萬條消息處于

ready

狀态,還有幾百條

unacked

的消息。

  我以為推送服務和MQ連接配接斷開了,導緻無法推送消息,于是讓運維重新開機推送服務,将所有的推送服務重新開機完,發現

unacked

的消息全部變成

ready

,但是沒過多久又有幾百條

unacked

的消息了,這個就很明顯了能消費,沒有進行

ack

呀。

  當時我以為是網絡問題,導緻mq無法接收到

ack

,讓運維老哥檢查了一下,發現網絡沒問題。現在看是真的是傻,網絡有問題連接配接都連不上。由于确定的是無法

ack

造成的,立馬将

ack模式

由原來的

manual

改成

auto

緊急釋出。将所有的節點更新好以後,發現推送正常了。

  你以為這就結束了其實并沒有,沒過多久發現有一台MQ服務出現異常,由于生産采用了

鏡像隊列

,立即将這台有問題的MQ從叢集中移除。直接進行重置,然後加入回叢集。這事情算是告一段落了。此時已經接近24:00了。

生産RabbitMQ隊列阻塞該如何處理?

  時間來到第二天上午10:00,運維那邊又出現報警了,說推送系統有台機器,磁盤快被寫滿了,并且占用率很高。我的乖乖從昨晚到現在寫了快40G的日志,一看報錯資訊瞬間就明白問題出在哪裡了。麻溜的把

bug

修了緊急釋出。

吐槽一波公司的ELK,壓根就沒有收集到這個報錯資訊,導緻我沒有及時發現。
生産RabbitMQ隊列阻塞該如何處理?

事故重制-隊列阻塞

MQ配置

spring:
  # 消息隊列
  rabbitmq:
    host: 10.0.0.53
    username: guest
    password: guest
    virtual-host: local
    port: 5672
    # 消息發送确認
    publisher-confirm-type: correlated
    # 開啟發送失敗退回
    publisher-returns: true
    listener:
      simple:
        # 消費端最小并發數
        concurrency: 1
        # 消費端最大并發數
        max-concurrency: 5
        # 一次請求中預處理的消息數量
        prefetch: 2
        # 手動應答
        acknowledge-mode: manual           

問題代碼

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                                      @Headers Map<String,Object> headers,
                                      Channel channel) throws Exception {
    // 解密和解析
    String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
    OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);

    try {
        // 模拟推送
        pushMsg(orderDto);
    }catch (Exception e){
        log.error("推送失敗-錯誤資訊:{},消息内容:{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));
    }finally {
        // 消息簽收
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    }

}           

  看起來好像沒啥問題。由于和交易系統約定好,訂單資料需要先轉換

json

串,然後再使用

AES

進行加密,是以這邊需要,先進行解密然後在進行解析。才能得到訂單資料。

  為了防止消息丢失,交易系統做了

失敗重發

機制,防止消息丢失,不巧的是重發的時候沒有對訂單資料進行加密。這就導緻推送系統,在解密的時候出異常,進而無法進行

ack

默默的吐槽一句:人在家中坐,鍋從天上來。

模拟推送

推送代碼

發送3條正常的消息

curl http://localhost:8080/sendMsg/3           

發送1條錯誤的消息

curl http://localhost:8080/sendErrorMsg/1           

再發送3條正常的消息

curl http://localhost:8080/sendMsg/3           

  觀察日志發下,雖然有報錯,但是還能正常進行推送。但是RabbitMQ已經出現了一條

unacked

生産RabbitMQ隊列阻塞該如何處理?

繼續發送1條錯誤的消息

curl http://localhost:8080/sendErrorMsg/1           
curl http://localhost:8080/sendMsg/3           

  這個時候你會發現控制台報錯,當然錯誤資訊是解密失敗,但是正常的消息卻沒有被消費,這個時候其實隊列已經阻塞了。

生産RabbitMQ隊列阻塞該如何處理?
生産RabbitMQ隊列阻塞該如何處理?

  從

RabbitMQ

管控台也可以看到,剛剛發送的的3條消息處于

ready

狀态。這個時候就如果一直有消息進入,都會堆積在隊裡裡面無法被消費。

curl http://localhost:8080/sendMsg/3           
生産RabbitMQ隊列阻塞該如何處理?

分析原因

  上面說了是由于沒有進行

ack

導緻隊裡阻塞。那麼問題來了,這是為什麼呢?其實這是

RabbitMQ

的一種保護機制。防止當消息激增的時候,海量的消息進入

consumer

而引發

consumer

當機。

  RabbitMQ提供了一種QOS(服務品質保證)功能,即在非自動确認的消息的前提下,限制信道上的消費者所能保持的最大未确認的數量。可以通過設定

PrefetchCount

實作。

  舉例說明:可以了解為在

consumer

前面加了一個緩沖容器,容器能容納最大的消息數量就是

PrefetchCount

。如果容器沒有滿

RabbitMQ

就會将消息投遞到容器内,如果滿了就不投遞了。當

consumer

對消息進行

ack

以後就會将此消息移除,進而放入新的消息。

listener:
  simple:
    # 消費端最小并發數
    concurrency: 1
    # 消費端最大并發數
    max-concurrency: 5
    # 一次處理的消息數量
    prefetch: 2
    # 手動應答
    acknowledge-mode: manual           
prefetch參數就是PrefetchCount

  通過上面的配置發現

prefetch

我隻配置了2,并且

concurrency

配置的隻有1,是以當我發送了2條錯誤消息以後,由于解密失敗這2條消息一直沒有被

ack

。将緩沖區沾滿了,這個時候

RabbitMQ

認為這個

consumer

已經沒有消費能力了就不繼續給它推送消息了,是以就造成了隊列阻塞。

判斷隊列是否有阻塞的風險。

  當

ack

模式為

manual

,并且線上出現了

unacked

消息,這個時候不用慌。由于QOS是限制信道

channel

上的消費者所能保持的最大未确認的數量。是以允許出現

unacked

的數量可以通過

channelCount * prefetchCount * 節點數量

得出。

channlCount

就是由

concurrency

,

max-concurrency

決定的。
  • min

    =

    concurrency * prefetch * 節點數量

  • max

    max-concurrency * prefetch * 節點數量

由此可以的出結論

  • unacked_msg_count

    <

    min

    隊列不會阻塞。但需要及時處理

    unacked

  • unacked_msg_count

    >=

    min

    可能會出現堵塞。
  • unacked_msg_count

    max

    隊列一定阻塞。

這裡需要好好了解一下。

處理方法

其實處理的方法很簡單,将解密和解析的方法放入

try catch

中就解決了這樣不管解密正常與否,消息都會被簽收。如果出錯将會輸出錯誤日志,讓開發人員進行處理了。

對于這個就需要有日志監控系統,來及時告警了。
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                                      @Headers Map<String,Object> headers,
                                      Channel channel) throws Exception {
    try {

        // 解密和解析
        String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
        OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
        
        // 模拟推送
        pushMsg(orderDto);
    }catch (Exception e){
        log.error("推送失敗-錯誤資訊:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
    }finally {
        // 消息簽收
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    }

}           

注意的點

  

unacked

的消息在

consumer

切斷連接配接後(重新開機),會自動回到隊頭。

事故重制-磁盤占用飙升

  一開始我不知道代碼有問題,就是以為單純的沒有進行

ack

是以将

ack

模式改成

auto

自動,緊急更新了,這樣不管正常與否,消息都會被簽收,是以在當時确實是解決了問題。

  其實作在回想起來是非常危險的操作的,将

ack

auto

自動,這樣會使QOS不生效。會出現大量消息湧入

consumer

進而造成

consumer

當機,可以是因為當時在晚上,交易比較少,并且推送系統有多個節點,才沒出現問題。

@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                                      @Headers Map<String,Object> headers,
                                      Channel channel) throws Exception {
    // 解密和解析
    String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
    OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
    
    try {

        // 模拟推送
        pushMsg(orderDto);
    }catch (Exception e){
        log.error("推送失敗-錯誤資訊:{},消息内容:{}", e.getLocalizedMessage(), encryptOrderDto);
    }finally {
        // 消息簽收
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false);
    }

}           

配置檔案

listener:
  simple:
    # 消費端最小并發數
    concurrency: 1
    # 消費端最大并發數
    max-concurrency: 5
    # 一次處理的消息數量
    prefetch: 2
    # 手動應答
    acknowledge-mode: auto           

  由于當時不知道交易系統的重發機制,重發時沒有對訂單資料加密的bug,是以還是會發出少量有誤的消息。

curl http://localhost:8080/sendErrorMsg/1           
生産RabbitMQ隊列阻塞該如何處理?
生産RabbitMQ隊列阻塞該如何處理?

原因

RabbitMQ

消息監聽程式異常時,

consumer

會向

rabbitmq server

發送

Basic.Reject

,表示消息拒絕接受,由于

Spring

預設

requeue-rejected

配置為

true

,消息會重新入隊,然後

rabbitmq server

重新投遞。就相當于死循環了,是以控制台在瘋狂刷錯誤日志造成磁盤使用率飙升的原因。

解決方法

  将

default-requeue-rejected: false

即可。

總結

  • 個人建議,生産環境不建議使用自動ack,這樣會QOS無法生效。
  • 在使用手動ack的時候,需要非常注意消息簽收。
  • 其實在将有問題的MQ重置時,是将錯誤的消息給清除才沒有問題了,相當于是消息丢失了。
try {
    // 業務邏輯。
}catch (Exception e){
    // 輸出錯誤日志。
}finally {
    // 消息簽收。
}           

參考資料

代碼位址

https://gitee.com/huangxunhui/rabbitmq_accdient.git

結尾

  如果有人告訴你遇到線上事故不要慌,除非是超級大佬久經沙場。否則就是瞎扯淡,你讓他來試試,看看他會不會大腦一片空白,直冒汗。

  如果覺得對你有幫助,可以多多評論,多多點贊哦,也可以到我的首頁看看,說不定有你喜歡的文章,也可以随手點個關注哦,謝謝。

  我是不一樣的科技宅,每天進步一點點,體驗不一樣的生活。我們下期見!