RabbitMQ消息處理失敗,我們會讓失敗消息進入重試隊列等待執行,因為在重試隊列距離真正執行還需要定義的時間間隔,是以,我們可以将重試隊列設定成延時處理。今天參考網上其他人的實作,簡單梳理下消息延時重試執行的思路。
消費失敗後,自動延時将消息重新投遞,當達到一定的重試次數後,将消息投遞到失敗消息隊列,等待人工介入處理。在這裡我們一步一步實作一個帶有失敗重試功能的釋出訂閱元件,使用該元件後可以非常簡單的實作消息的釋出訂閱。
業務背景
結合RabbitMQ的Topic模式和Work Queue模式實作生産方産生消息,消費方按需訂閱,消息投遞到消費方的隊列之後,多個worker同時對消息進行消費
結合RabbitMQ的 Message TTL 和 Dead Letter Exchange 實作消息的延時重試功能
消息達到最大重試次數之後,将其投遞到失敗隊列,等待人工介入處理bug後,重新将其加入隊列消費
執行流程圖
生産者釋出消息到主Exchange
主Exchange根據Routing Key将消息分發到對應的消息隊列
多個消費者的worker程序同時對隊列中的消息進行消費,是以它們之間采用“競争”的方式來争取消息的消費
消息消費後,不管成功失敗,都要傳回ACK消費确認消息給隊列,避免消息消費确認機制導緻重複投遞,同時,如果消息處理成功,則結束流程,否則進入重試階段
如果重試次數小于設定的最大重試次數(預設為3次),則将消息重新投遞到Retry Exchange的重試隊列
重試隊列不需要消費者直接訂閱,它會等待消息的有效時間過期之後,重新将消息投遞給Dead Letter Exchange,我們在這裡将其設定為主Exchange,實作延時後重新投遞消息,這樣消費者就可以重新消費消息
如果三次以上都是消費失敗,則認為消息無法被處理,直接将消息投遞給Failed Exchange的Failed Queue,這時候應用可以觸發報警機制,以通知相關責任人處理
等待人工介入處理(解決bug)之後,重新将消息投遞到主Exchange,這樣就可以重新消費了
技術實作:
為了實作消息的延時重試和失敗存儲,我們需要建立三個Exchange來處理消息。
master 主Exchange,釋出消息時釋出到該Exchange
master.retry 重試Exchange,消息處理失敗時(3次以内),将消息重新投遞給該Exchange
master.failed 失敗Exchange,超過三次重試失敗後,消息投遞到該Exchange
所有的Exchange聲明(declare)必須使用以下參數
參數
值
說明
exchange
-
Exchange名稱
type
topic
Exchange 類型
passive
false
如果Exchange已經存在,則傳回成功,不存在則建立
durable
true
持久化存儲Exchange,這裡僅僅是Exchange本身持久化,消息和隊列需要單獨指定其持久化
no-wait
該方法需要應答确認
在RabbitMQ的管理界面中,我們可以看到建立的三個Exchange
消息釋出時,使用<code>basic_publish</code>方法,參數如下
message
釋出的消息對象
master
消息釋出到的Exchange
routing-key
路由KEY,用于辨別消息類型
mandatory
是否強制路由,指定了該選項後,如果沒有訂閱該消息,則會傳回路由不可達錯誤
immediate
指定了當消息無法直接路由給消費者時如何處理
釋出消息時,對于<code>message</code>對象,其内容使用json編碼後的字元串,同時消息進行持久化
消息訂閱的實作相對複雜一些,需要完成隊列的聲明以及隊列和Exchange的綁定
對于每一個訂閱消息的服務,都必須建立一個該服務對應的隊列,将該隊列綁定到關注的路由規則,這樣之後,消息生産者将消息投遞給Exchange之後,就會按照路由規則将消息分發到對應的隊列供消費者消費了。
消費服務需要declare三個隊列
<code>[queue_name]</code> 隊列名稱,格式符合 <code>[服務名稱]@訂閱服務辨別</code>
<code>[queue_name]@retry</code> 重試隊列
<code>[queue_name]@failed</code> 失敗隊列
Declare隊列時,參數規定規則如下
queue
隊列名稱
隊列不存在則建立,存在則直接成功
隊列持久化
exclusive
排他,指定該選項為true則隊列隻對目前連接配接有效,連接配接斷開後自動删除
auto-delete
當不再使用時,是否自動删除
對于<code>@retry</code>重試隊列,需要指定額外參數
檢視隊列的詳細資訊,我們可以看到 queueName@retry 隊列與其它兩個隊列的不同
Queue
Exchange
[queue_name]
[queue_name]@retry
master.retry
[queue_name]@failed
master.failed
綁定時,需要提供訂閱的路由KEY,該路由KEY與消息釋出時的路由KEY對應,差別是這裡可以使用通配符同時訂閱多種類型的消息。
綁定的隊列
綁定的Exchange
訂閱的消息路由規則
使用 <code>basic_consume</code> 對消息進行消費的時候,需要注意下面參數
消費的隊列名稱
consumer-tag
消費者辨別,留白即可
no_local
如果設定了該字段,伺服器将不會釋出消息到 釋出它的用戶端
no_ack
需要消費确認應答
排他通路,設定後隻允許目前消費者通路該隊列
nowait
消費端在消費消息時,需要從消息中擷取消息被消費的次數,以此判斷該消息處理失敗時重試還是發送到失敗隊列。
在消息發送到重試隊列和失敗隊列時,我們在消息的headers中添加了一個名為<code>x-orig-routing-key</code>的字段,該字段是實作消息重試的關鍵字段,由于我們的消息需要在不同的Exchange,Queue之間流轉,為了避免消息在重新投遞到主Exchange時,被所有的消費者隊列重新消費,在重試過程中,我們将消息的routing-key修改為隊列名稱,直接投遞給原始消費消息的隊列。<code>x-orig-routing-key</code>用于在之後能夠重新擷取到最開始的routing-key。
這裡的重複消費是指 某個消息被兩個消費方A和B消費了,其中A消費失敗,B成功,這時候,消息由A消費者重新投遞到主Exchange後,B消費隊列也會擷取到該消息,是以就會導緻B消費者重複消費已經消費國的消息
本文實作延時重試,使用了三個重試Exchange,Exchange如果訂閱特别多的話,Exchange的壓力會非常大,是以在非常極端的情況下,消息大批量失敗,且消息收發非常快,那麼Exchange的性能可能會有問題。
本文是使用釋出訂閱實作延時重試的消息執行,也會有其他思路。