源碼dmeo在文章末尾擷取👇🏻
1. SpringAMQP則允許配置三種确認模式
1. manual:手動ack,需要在業務代碼結束後,調用api發送ack。
2. auto:自動ack,由spring監測listener代碼是否出現異常,沒有異常則傳回ack;抛出異常則傳回nack
3. none:關閉ack,MQ假定消費者擷取消息後會成功處理,是以消息投遞後立即被删除
首先聲明隊列交換機
@Configuration
public class CommonConfig {
@Bean
public DirectExchange simpleDirect() {
return new DirectExchange("simple.direct", true, false);
}
@Bean
public Queue simpleQueue() {
return QueueBuilder.durable("simple.queue").build();
}
}
建立一個會報錯的消息接收方 ,模拟消費者報錯
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg){
log.info("消費者接收到simple.queue的消息:【" + msg + "】");
//該代碼會報錯異常, 數值溢出
System.out.println(1 / 0);
System.out.println("消費者處理消息成功!");
}
配置好上述這些代碼, 我們就可以啟動消費者伺服器了, 然後去mq用戶端檢視建立的隊列和交換機
1.2 none
在我們配置成none的時候, 很明顯此消息投遞後立即被删除
1.3 aoto(推薦使用)
當換成aoto模式的時候, 代碼出現異常消息, mq會進行重新投遞, 但是重新投遞會一直無限重試
為了解決這種無限重試的問題, spring提供了retry(重試)機制, 使用這個機制我們就可以在消費者報錯抛異常的時候, 利用本地的重試來解決這個問題, 如下所示.
現在我們配置了失敗等待的時間為1秒, 等待時間的倍數為2 , 重試次數最大為3 , 這樣一來我們就會收到三次重試的資訊
第一次和第二次的時間相差一秒(失敗時間(1秒) x 失敗倍數1 = 1秒),
第二次和第三次的時間相差兩秒(失敗時間(1秒) x 失敗倍數2 = 2秒),
最後在到達最大重試次數就會停止重試
但是這種方法的弊端就是當重試次數達到最大耗盡後, 消息回直接被丢棄
這裡我們采用一種解決方案就是RepublishMessageRecoverer失敗之後将消息投遞到一個指定的隊列當中
1. 建立RepublishMessageRecoverer 交換機和隊列
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorMessageBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("e");
}
//失敗後将消息投遞到一個指定的,專門存放異常消息的隊列,後續由人工集中處理。
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "e");
}
}
建立成功後啟動伺服器, 在rabbitmq用戶端檢視建立的隊列和交換機
并向simple.queue隊列發送消息
這時重試的次數已經達到最大, MQ就轉發到失敗的隊列當中去了
Republishing failed message to exchange 'error.direct' with routing key e
我們再檢視失敗的消息error.queue的資訊, 這裡直接展示了我們控制台中棧錯誤資訊, 其中錯誤原因就是 代碼1/0的問題
連結:https://pan.baidu.com/s/1il41ywFnYM4_q3MU9GN_MQ
提取碼:heng