天天看點

RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

 源碼dmeo在文章末尾擷取👇🏻

1. SpringAMQP則允許配置三種确認模式 

RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

1. manual:手動ack,需要在業務代碼結束後,調用api發送ack。

2. auto:自動ack,由spring監測listener代碼是否出現異常,沒有異常則傳回ack;抛出異常則傳回nack

3.  none:關閉ack,MQ假定消費者擷取消息後會成功處理,是以消息投遞後立即被删除

首先聲明隊列交換機 

@Configuration
public class CommonConfig {

    @Bean
    public DirectExchange simpleDirect() {
        return new DirectExchange("simple.direct", true, false);
    }

    @Bean
    public Queue simpleQueue() {
        return QueueBuilder.durable("simple.queue").build();
    }
}
           

建立一個會報錯的消息接收方 ,模拟消費者報錯

@RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){
        log.info("消費者接收到simple.queue的消息:【" + msg + "】");
        //該代碼會報錯異常, 數值溢出
        System.out.println(1 / 0);
        System.out.println("消費者處理消息成功!");
    }
           

 配置好上述這些代碼, 我們就可以啟動消費者伺服器了, 然後去mq用戶端檢視建立的隊列和交換機

RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

1.2 none

在我們配置成none的時候, 很明顯此消息投遞後立即被删除

RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

 1.3 aoto(推薦使用)

當換成aoto模式的時候, 代碼出現異常消息, mq會進行重新投遞, 但是重新投遞會一直無限重試

RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 
RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

為了解決這種無限重試的問題, spring提供了retry(重試)機制, 使用這個機制我們就可以在消費者報錯抛異常的時候, 利用本地的重試來解決這個問題, 如下所示. 

RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

 現在我們配置了失敗等待的時間為1秒, 等待時間的倍數為2 , 重試次數最大為3 , 這樣一來我們就會收到三次重試的資訊

第一次和第二次的時間相差一秒(失敗時間(1秒) x 失敗倍數1 = 1秒), 

第二次和第三次的時間相差兩秒(失敗時間(1秒) x 失敗倍數2 = 2秒), 

最後在到達最大重試次數就會停止重試

RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

但是這種方法的弊端就是當重試次數達到最大耗盡後, 消息回直接被丢棄

 這裡我們采用一種解決方案就是RepublishMessageRecoverer失敗之後将消息投遞到一個指定的隊列當中

1. 建立RepublishMessageRecoverer 交換機和隊列

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("e");
    }

    //失敗後将消息投遞到一個指定的,專門存放異常消息的隊列,後續由人工集中處理。
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "e");
    }
}
           

 建立成功後啟動伺服器, 在rabbitmq用戶端檢視建立的隊列和交換機

RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

 并向simple.queue隊列發送消息

RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

 這時重試的次數已經達到最大, MQ就轉發到失敗的隊列當中去了

Republishing failed message to exchange 'error.direct' with routing key e
RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

 我們再檢視失敗的消息error.queue的資訊, 這裡直接展示了我們控制台中棧錯誤資訊, 其中錯誤原因就是 代碼1/0的問題

RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 
RabbitMQ消費者确認消息入門示範 源碼dmeo在文章末尾擷取👇🏻1. SpringAMQP則允許配置三種确認模式 

 連結:https://pan.baidu.com/s/1il41ywFnYM4_q3MU9GN_MQ 

提取碼:heng