2.3 死信堵塞MQ之坑
始終無法處理的死信消息,可能會引發堵塞MQ。
若線程池的任務隊列無上限,最終可能導緻OOM,類似的MQ也要注意任務堆積問題。對于突發流量引起的MQ堆積,問題并不大,适當調整消費者的消費能力應該就可以解決。但在很多時候,消息隊列的堆積堵塞,是因為有大量始終無法處理的消息。
2.3.1 案例
使用者服務在使用者注冊後發出一條消息,會員服務監聽到消息後給使用者派發優惠券,但因使用者并沒有儲存成功,會員服務處理消息始終失敗,消息重新進入隊列,然後還是處理失敗。這種在MQ中回蕩的同一條消息,就是死信。
随着MQ被越來越多的死信填滿,消費者需花費大量時間反複處理死信,導緻正常消息的消費受阻,最終MQ可能因資料量過大而崩潰。
定義一個隊列、一個直接交換器,然後把隊列綁定到交換器
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iNklDM2EjNhJzN5M2NjRTY0gjY0EDO5IWZzgDMycDZy8CX5d2bs92Yl1iclB3bsVmdlR2LcNWaw9CXt92Yu4GZjlGbh5yYjV3Lc9CX6MHc0RHaiojIsJye.png)
sendMessage發送消息到MQ,通路一次送出一條消息,使用自增辨別作為消息内容
收到消息後,直接NPE,模拟處理出錯
調用sendMessage接口發送兩條消息,然後來到RabbitMQ管理台,可以看到這兩條消息始終在隊列,不斷被重新投遞,導緻重新投遞QPS達到1063。
在日志中也可看到大量異常資訊。
修複方案
-
解決死信無限重複進入隊列最簡單方案
在程式處理出錯時,直接抛
,避免消息重新進入隊列AmqpRejectAndDontRequeueException
throw new AmqpRejectAndDontRequeueException("error");
但更希望對同一消息,能夠先進行幾次重試,解決因為網絡問題導緻的偶發消息處理失敗,若依舊失敗,再把消息投遞到專門設定的DLX。對于來自DLX的資料,可能隻是記錄日志發送報警,即使出現異常也不會再重複投遞。
- 邏輯如下
-
用了這麼久的RabbitMQ異步程式設計竟然都是錯的!(下) - 針對該問題,我們來看
Spring AMQP的簡便解決方案
- 定義死信交換器、死信隊列。其實都是普通交換器和隊列,隻不過專門用于處理死信消息
- 通過RetryInterceptorBuilder建構一個RetryOperationsInterceptor以處理失敗時候的重試。政策是最多嘗試5次(重試4次);并且采取指數退避重試,首次重試延遲1秒,第二次2秒,以此類推,最大延遲是10秒;如果第4次重試還是失敗,則使用RepublishMessageRecoverer把消息重新投入一個DLX
- 定義死信隊列的處理程式。本案例隻記錄日志
代碼
執行程式,發送兩條消息,檢視日志:
msg2的4次重試間隔分别是1秒、2秒、4秒、8秒,再加上首次的失敗,是以最大嘗試次數是5
4次重試後,RepublishMessageRecoverer把消息發往DLX
死信處理程式輸出了got dead message msg2。
雖然幾乎同時發倆消息,但msg2在msg1四次重試全部結束後才開始處理,因為預設SimpleMessageListenerContainer隻有一個消費線程。可通過增加消費線程避免性能問題:
直接設定concurrentConsumers參數為10,來增加到10個工作線程
- 也可設定
參數,讓maxConcurrentConsumers
動态調整消費者線程數。SimpleMessageListenerContainer
小結
一般在遇到消息處理失敗的時候,可設定重試。若重試還是不行,可把該消息扔到專門的死信隊列處理,不要讓死信影響到正常消息處理。