天天看點

RocketMQ消費消息失敗的處理流程

RocketMQ消費失敗流程圖

RocketMQ消費消息失敗的處理流程

 RocketMQ消費失敗細節

        一般的,我們在RocketMQ處理消息的時候,可能會在消費者中使用類似下面的代碼

consumer.registerMessageListener(new MessageListenerConcurrently() {
  @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
           

        如果消息被成功消費的話,會傳回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀态,但是如果消息消費失敗的話,又會怎麼處理呢?

其實我們隻要找到ConsumeConcurrentlyStatus這個枚舉就能知道RocketMQ是如何處理了,代碼如下:

public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption,later try to consume
     */
    RECONSUME_LATER;
}
           

        很明顯,如果無法傳回CONSUME_SUCCESS狀态,那麼就傳回RECONSUME_LATER,過一會再嘗試消費即可。那麼第二個問題來了,既然這條消息消費失敗了,總不能一直卡着後面的消息也等着吧,那麼消費失敗的消息肯定需要放到另一個Topic中,讓它一個人等着被再次消費

        是以這時會有一個重試隊列,用于暫時儲存因為各種異常而導緻Consumer端無法消費的消息,重試隊列的名稱是在原隊列的名稱前加上%RETRY%(這個Topic的重試隊列是針對消費組,而不是針對每個Topic設定的)

RocketMQ對于重試消息的處理是先儲存至Topic名稱為“SCHEDULE_TOPIC_XXXX”的延遲隊列中,背景定時任務按照對應的時間進行Delay後重新儲存至“%RETRY%+consumerGroup”的重試隊列中

        在RocketMQ的console控制台上可以看到重試隊列的資訊

RocketMQ消費消息失敗的處理流程

        現在我們已經知道消費失敗的消息會進入重試隊列,那麼多久重試一次呢?能進行多少次的重試呢?

        考慮到異常恢複起來需要一些時間,會為重試隊列設定多個重試級别,每個重試級别都有與之對應的重新投遞延時間,重試次數越多投遞延時就越大。有一個參數messageDelayLevel,這個參數是在伺服器端的Broker上配置的,預設是

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
           

        預設是最多可以重試16次

        如果重試了16次之後,這條消息還是沒有被成功消費,那麼就認為這條消息是搶救不過來了,此時,消息隊列不會立刻将消息丢棄,于是它被放入了死信隊列中,上面重試隊列的圖中你也可以看到死信隊列,死信隊列的名稱是在原隊列名稱前加%DLQ%。如果你還是不死心的話,覺得這條消息還能搶救一下,可以開啟一個背景線程不斷掃描死信隊列然後繼續重試,也可以通過使用console控制台對死信隊列中的消息進行重發來使得消費者執行個體再次進行消費

RocketMQ消費消息失敗的處理辦法_LO_YUN的部落格-CSDN部落格_rocketmq消費失敗如何放回隊列

https://blog.csdn.net/LO_YUN/article/details/104301740