天天看點

用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(下)

2.3 死信堵塞MQ之坑

始終無法處理的死信消息,可能會引發堵塞MQ。

若線程池的任務隊列無上限,最終可能導緻OOM,類似的MQ也要注意任務堆積問題。對于突發流量引起的MQ堆積,問題并不大,适當調整消費者的消費能力應該就可以解決。但在很多時候,消息隊列的堆積堵塞,是因為有大量始終無法處理的消息。

2.3.1 案例

使用者服務在使用者注冊後發出一條消息,會員服務監聽到消息後給使用者派發優惠券,但因使用者并沒有儲存成功,會員服務處理消息始終失敗,消息重新進入隊列,然後還是處理失敗。這種在MQ中回蕩的同一條消息,就是死信。

随着MQ被越來越多的死信填滿,消費者需花費大量時間反複處理死信,導緻正常消息的消費受阻,最終MQ可能因資料量過大而崩潰。

定義一個隊列、一個直接交換器,然後把隊列綁定到交換器

用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(下)

sendMessage發送消息到MQ,通路一次送出一條消息,使用自增辨別作為消息内容

用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(下)

收到消息後,直接NPE,模拟處理出錯

用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(下)

調用sendMessage接口發送兩條消息,然後來到RabbitMQ管理台,可以看到這兩條消息始終在隊列,不斷被重新投遞,導緻重新投遞QPS達到1063。

用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(下)

在日志中也可看到大量異常資訊。

修複方案

  • 解決死信無限重複進入隊列最簡單方案

    在程式處理出錯時,直接抛

    AmqpRejectAndDontRequeueException

    ,避免消息重新進入隊列
throw new AmqpRejectAndDontRequeueException("error");      

但更希望對同一消息,能夠先進行幾次重試,解決因為網絡問題導緻的偶發消息處理失敗,若依舊失敗,再把消息投遞到專門設定的DLX。對于來自DLX的資料,可能隻是記錄日志發送報警,即使出現異常也不會再重複投遞。

  • 邏輯如下
  • 用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(下)
  • 針對該問題,我們來看

Spring AMQP的簡便解決方案

  1. 定義死信交換器、死信隊列。其實都是普通交換器和隊列,隻不過專門用于處理死信消息
  2. 通過RetryInterceptorBuilder建構一個RetryOperationsInterceptor以處理失敗時候的重試。政策是最多嘗試5次(重試4次);并且采取指數退避重試,首次重試延遲1秒,第二次2秒,以此類推,最大延遲是10秒;如果第4次重試還是失敗,則使用RepublishMessageRecoverer把消息重新投入一個DLX
  3. 定義死信隊列的處理程式。本案例隻記錄日志

代碼

用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(下)

執行程式,發送兩條消息,檢視日志:

用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(下)

msg2的4次重試間隔分别是1秒、2秒、4秒、8秒,再加上首次的失敗,是以最大嘗試次數是5

4次重試後,RepublishMessageRecoverer把消息發往DLX

死信處理程式輸出了got dead message msg2。

雖然幾乎同時發倆消息,但msg2在msg1四次重試全部結束後才開始處理,因為預設SimpleMessageListenerContainer隻有一個消費線程。可通過增加消費線程避免性能問題:

直接設定concurrentConsumers參數為10,來增加到10個工作線程

用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(下)
  • 也可設定

    maxConcurrentConsumers

    參數,讓

    SimpleMessageListenerContainer

    動态調整消費者線程數。

小結

一般在遇到消息處理失敗的時候,可設定重試。若重試還是不行,可把該消息扔到專門的死信隊列處理,不要讓死信影響到正常消息處理。