天天看點

RabbitMQ 釋出訂閱-實作延時重試隊列

RabbitMQ消息處理失敗,我們會讓失敗消息進入重試隊列等待執行,因為在重試隊列距離真正執行還需要定義的時間間隔,是以,我們可以将重試隊列設定成延時處理。今天參考網上其他人的實作,簡單梳理下消息延時重試執行的思路。

消費失敗後,自動延時将消息重新投遞,當達到一定的重試次數後,将消息投遞到失敗消息隊列,等待人工介入處理。在這裡我們一步一步實作一個帶有失敗重試功能的釋出訂閱元件,使用該元件後可以非常簡單的實作消息的釋出訂閱。

業務背景

結合RabbitMQ的Topic模式和Work Queue模式實作生産方産生消息,消費方按需訂閱,消息投遞到消費方的隊列之後,多個worker同時對消息進行消費

結合RabbitMQ的 Message TTL 和 Dead Letter Exchange 實作消息的延時重試功能

消息達到最大重試次數之後,将其投遞到失敗隊列,等待人工介入處理bug後,重新将其加入隊列消費

執行流程圖

RabbitMQ 釋出訂閱-實作延時重試隊列

生産者釋出消息到主Exchange

主Exchange根據Routing Key将消息分發到對應的消息隊列

多個消費者的worker程序同時對隊列中的消息進行消費,是以它們之間采用“競争”的方式來争取消息的消費

消息消費後,不管成功失敗,都要傳回ACK消費确認消息給隊列,避免消息消費确認機制導緻重複投遞,同時,如果消息處理成功,則結束流程,否則進入重試階段

如果重試次數小于設定的最大重試次數(預設為3次),則将消息重新投遞到Retry Exchange的重試隊列

重試隊列不需要消費者直接訂閱,它會等待消息的有效時間過期之後,重新将消息投遞給Dead Letter Exchange,我們在這裡将其設定為主Exchange,實作延時後重新投遞消息,這樣消費者就可以重新消費消息

如果三次以上都是消費失敗,則認為消息無法被處理,直接将消息投遞給Failed Exchange的Failed Queue,這時候應用可以觸發報警機制,以通知相關責任人處理

等待人工介入處理(解決bug)之後,重新将消息投遞到主Exchange,這樣就可以重新消費了

技術實作:

為了實作消息的延時重試和失敗存儲,我們需要建立三個Exchange來處理消息。

master 主Exchange,釋出消息時釋出到該Exchange

master.retry 重試Exchange,消息處理失敗時(3次以内),将消息重新投遞給該Exchange

master.failed 失敗Exchange,超過三次重試失敗後,消息投遞到該Exchange

所有的Exchange聲明(declare)必須使用以下參數

參數

說明

exchange

-

Exchange名稱

type

topic

Exchange 類型

passive

false

如果Exchange已經存在,則傳回成功,不存在則建立

durable

true

持久化存儲Exchange,這裡僅僅是Exchange本身持久化,消息和隊列需要單獨指定其持久化

no-wait

該方法需要應答确認

在RabbitMQ的管理界面中,我們可以看到建立的三個Exchange

RabbitMQ 釋出訂閱-實作延時重試隊列

消息釋出時,使用<code>basic_publish</code>方法,參數如下

message

釋出的消息對象

master

消息釋出到的Exchange

routing-key

路由KEY,用于辨別消息類型

mandatory

是否強制路由,指定了該選項後,如果沒有訂閱該消息,則會傳回路由不可達錯誤

immediate

指定了當消息無法直接路由給消費者時如何處理

釋出消息時,對于<code>message</code>對象,其内容使用json編碼後的字元串,同時消息進行持久化

消息訂閱的實作相對複雜一些,需要完成隊列的聲明以及隊列和Exchange的綁定

對于每一個訂閱消息的服務,都必須建立一個該服務對應的隊列,将該隊列綁定到關注的路由規則,這樣之後,消息生産者将消息投遞給Exchange之後,就會按照路由規則将消息分發到對應的隊列供消費者消費了。

消費服務需要declare三個隊列

<code>[queue_name]</code> 隊列名稱,格式符合 <code>[服務名稱]@訂閱服務辨別</code>

<code>[queue_name]@retry</code> 重試隊列

<code>[queue_name]@failed</code> 失敗隊列

Declare隊列時,參數規定規則如下

queue

隊列名稱

隊列不存在則建立,存在則直接成功

隊列持久化

exclusive

排他,指定該選項為true則隊列隻對目前連接配接有效,連接配接斷開後自動删除

auto-delete

當不再使用時,是否自動删除

對于<code>@retry</code>重試隊列,需要指定額外參數

檢視隊列的詳細資訊,我們可以看到 queueName@retry 隊列與其它兩個隊列的不同

RabbitMQ 釋出訂閱-實作延時重試隊列

Queue

Exchange

[queue_name]

[queue_name]@retry

master.retry

[queue_name]@failed

master.failed

綁定時,需要提供訂閱的路由KEY,該路由KEY與消息釋出時的路由KEY對應,差別是這裡可以使用通配符同時訂閱多種類型的消息。

綁定的隊列

綁定的Exchange

訂閱的消息路由規則

RabbitMQ 釋出訂閱-實作延時重試隊列

使用 <code>basic_consume</code> 對消息進行消費的時候,需要注意下面參數

消費的隊列名稱

consumer-tag

消費者辨別,留白即可

no_local

如果設定了該字段,伺服器将不會釋出消息到 釋出它的用戶端

no_ack

需要消費确認應答

排他通路,設定後隻允許目前消費者通路該隊列

nowait

消費端在消費消息時,需要從消息中擷取消息被消費的次數,以此判斷該消息處理失敗時重試還是發送到失敗隊列。

在消息發送到重試隊列和失敗隊列時,我們在消息的headers中添加了一個名為<code>x-orig-routing-key</code>的字段,該字段是實作消息重試的關鍵字段,由于我們的消息需要在不同的Exchange,Queue之間流轉,為了避免消息在重新投遞到主Exchange時,被所有的消費者隊列重新消費,在重試過程中,我們将消息的routing-key修改為隊列名稱,直接投遞給原始消費消息的隊列。<code>x-orig-routing-key</code>用于在之後能夠重新擷取到最開始的routing-key。

這裡的重複消費是指 某個消息被兩個消費方A和B消費了,其中A消費失敗,B成功,這時候,消息由A消費者重新投遞到主Exchange後,B消費隊列也會擷取到該消息,是以就會導緻B消費者重複消費已經消費國的消息

本文實作延時重試,使用了三個重試Exchange,Exchange如果訂閱特别多的話,Exchange的壓力會非常大,是以在非常極端的情況下,消息大批量失敗,且消息收發非常快,那麼Exchange的性能可能會有問題。

本文是使用釋出訂閱實作延時重試的消息執行,也會有其他思路。