前言
消費者在處理消息的過程中可能會發生異常,那麼此時此刻該如何處理這個異常的消息呢?
RabbitMQ有兩個方法channel.basicNack 或 channel.basicReject能夠讓消息重新回到原隊列中,這樣子可以實作重試。但是如果第二次消費又發生了異常,一直消費一直異常。由于沒有明确重試次數,會造就無限重試,這是一個緻命的問題。
本文就來使用spring-rabbit中自帶的retry功能來解決這個問題。
編碼
依賴
starter-amqp中包含了spring-rabbit。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<scope>compile</scope>
</dependency>
複制代碼
配置
需要進行簡單的配置即可開啟
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 消息确認方式,其有三種配置方式,分别是none、manual(手動ack) 和auto(自動ack) 預設auto
retry:
enabled: true #監聽重試是否可用
max-attempts: 5 #最大重試次數 預設為3
initial-interval: 2000 # 傳遞消息的時間間隔 預設1s
host: 47.105.*
port: 5672
virtual-host: /*-1
username: *
password: *
mq:
queueBinding:
queue: prod_queue_pay
exchange:
name: exchang_prod_pay
type: topic
key: prod_pay
複制代碼
建立業務隊列、交換機
@Configuration
public class RabbitConfig {
@Value("${mq.queueBinding.queue}")
private String queueName;
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
/**
* 業務隊列
* @return
*/
@Bean
public Queue payQueue(){
Map<String,Object> params = new HashMap<>();
return QueueBuilder.durable(queueName).withArguments(params).build();
}
@Bean
public TopicExchange payTopicExchange(){
return new TopicExchange(exchangeName,true,false);
}
//隊列與交換機進行綁定
@Bean
public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange){
return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
}
}
複制代碼
生産者
@Component
@Slf4j
public class RabbitSender {
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg){
log.info("RabbitSender.send() msg = {}",msg);
// 将消息發送給業務交換機
rabbitTemplate.convertAndSend(exchangeName,key,msg);
}
}
複制代碼
消費者
@Component
@Slf4j
public class RabbitReceiver {
int count = 0;
//測試重試
@RabbitListener(queues = "${mq.queueBinding.queue}")
public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.info("重試次數 = {}",count++);
int i = 10 /0;
channel.basicAck(tag,false);
}
}
複制代碼
提供對外方法
@Controller
public class TestController {
@Autowired
private RabbitSender rabbitSender;
@GetMapping
public void test(@RequestParam String msg){
rabbitSender.send(msg);
}
}
複制代碼
然後調用接口:http://localhost:8080/?msg=紅紅火火 ,消息會被發送到 prod_queue_pay這個隊列。然後重試 5 次。
每次重試時間間隔為2秒,與配置相符。
注意: 重試并不是 RabbitMQ 重新發送了消息到了隊列,僅僅是消費者内部進行了重試,換句話說就是重試跟mq沒有任何關系。上述消費者代碼不能添加try{}catch(){},一旦捕獲了異常,在自動 ack 模式下,就相當于消息正确處理了,消息直接被确認掉了,不會觸發重試的。 當然并不是說不能添加 try{}catch(){},而是不能将異常給處理了。可以如下寫:
@RabbitListener(queues = "${mq.queueBinding.queue}")
public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.info("重試次數 = {}",count++);
try {
// 處理主要業務
int i = 10 /0;
} catch (Exception e) {
// 處理業務失敗,還要進行其他操作,比如記錄失敗原因
log.info("記錄失敗原因 ====>");
throw new RuntimeException("手動抛出");
}
channel.basicAck(tag,false);
}
複制代碼
MessageReCoverer
上面的例子在測試中我們還發現了一個問題,就是經過 5 次重試以後,控制台輸出了一個異常的堆棧日志,然後隊列中的資料也被 ack 掉了(因為我配置了 auto, 自動ack模式)。如果你配置的是 manual(手動ack),結果就會如下:
五次重試後,消費處于一個未被确認的狀态。因為需要你手動 ack!下次服務重新開機的時候,會繼續消費這條消息。
首先我們先來看一下這個異常日志是什麼:
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Retry Policy Exhausted
複制代碼
出現上述異常的原因是因為在建構SimpleRabbitListenerContainerFactoryConfigurer類時使用了 MessageRecoverer接口,這個接口有一個recover方法,用來實作重試完成之後對消息的處理,源碼如下:
public final class SimpleRabbitListenerContainerFactoryConfigurer
extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {
@Override
public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
RabbitProperties.SimpleContainer config = getRabbitProperties().getListener().getSimple();
configure(factory, connectionFactory, config); >> 1
map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
}
}
複制代碼
注意标記為 >> 1 的configure方法
ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder<?, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
: RetryInterceptorBuilder.stateful();
RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
.createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
builder.retryOperations(retryTemplate);
MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
: new RejectAndDontRequeueRecoverer(); //<1>
builder.recoverer(recoverer);
factory.setAdviceChain(builder.build());
複制代碼
注意看<1>處的代碼,預設使用的是RejectAndDontRequeueRecoverer類,這個類已經出現過了,注意筆者前面的那幾張圖。根據類的名字我們就可以看出來該實作類的作用就是拒絕并且不會将消息重新發回隊列,也就是說,重試之後如果還沒有成功,就認為該消息沒救了,放棄它了。我們可以看一下這個實作類的具體内容:
public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected
@Override
public void recover(Message message, Throwable cause) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Retries exhausted for message " + message, cause);
}
throw new ListenerExecutionFailedException("Retry Policy Exhausted",
new AmqpRejectAndDontRequeueException(cause), message);
}
}
複制代碼
上述源碼給出了異常的來源。
MessageRecoverer 接口中就一個recover方法,回調已消費但所有重試嘗試失敗的消息。
重寫了recover方法的有四個類,MessageBatchRecoverer這個不在本文範圍内。
而RejectAndDontRequeueRecoverer的功能已經看到過了,畢竟是預設的。那還有另外兩個實作類,分别是RepublishMessageRecoverer和ImmediateRequeueMessageRecoverer,意思大意分别是重新釋出消息和立即重新傳回原隊列,下面我們分别測試一下這兩個實作類的效果。
RepublishMessageRecoverer
将消息重新發送到指定隊列。先建立一個隊列,然後與交換機綁定進行綁定,綁定之後設定 MessageRecoverer。在 RabbitConfig類中增加代碼。跟死信隊列看起來差不多。
@Autowired
private RabbitTemplate rabbitTemplate;
private static String errorTopicExchange = "error-topic-exchange";
private static String errorQueue = "error-queue";
private static String errorRoutingKey = "error-routing-key";
//建立異常交換機
@Bean
public TopicExchange errorTopicExchange(){
return new TopicExchange(errorTopicExchange,true,false);
}
//建立異常隊列
@Bean
public Queue errorQueue(){
return new Queue(errorQueue,true);
}
//隊列與交換機進行綁定
@Bean
public Binding BindingErrorQueueAndExchange(Queue errorQueue,TopicExchange errorTopicExchange){
return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(errorRoutingKey);
}
//設定MessageRecoverer
@Bean
public MessageRecoverer messageRecoverer(){
//AmqpTemplate和RabbitTemplate都可以
return new RepublishMessageRecoverer(rabbitTemplate,errorTopicExchange,errorRoutingKey);
}
複制代碼
啟動服務,重新調用接口,檢視結果:
通過控制台可以看到,使用了我們所配置的RepublishMessageRecoverer,并且消息重試 5 次以後直接以新的 routingKey發送到了配置的交換機中,此時再檢視監控頁面,可以看原始隊列中已經沒有消息了,但是配置的異常隊列中存在了一條消息。
ImmediateRequeueMessageRecoverer
使用ImmediateRequeueMessageRecoverer,重試失敗的消息會立馬回到原隊列中。 修改messageRecoverer方法
@Bean
public MessageRecoverer messageRecoverer(){
return new ImmediateRequeueMessageRecoverer();
}
複制代碼
啟動服務,重新調用接口,檢視結果:
重試5次之後,傳回隊列,然後再被消費,繼續重試5次,周而複始直到消息被正常消費為止。
總結
通過上面的測試,對于重試之後仍然異常的消息,可以采用 RepublishMessageRecoverer,将消息發送到其他的隊列中,再專門針對新的隊列進行處理。
死信隊列
除了可以采用上述RepublishMessageRecoverer,還可以采用死信隊列的方式處理重試失敗的消息。這也是我們常用的方式。
建立死信交換機、死信隊列以及兩者的綁定
繼續在 RabbitConfig中增加配置
private static String dlTopicExchange = "dl-topic-exchange";
private static String dlQueue = "dl-queue";
private static String dlRoutingKey = "dl-routing-key";
//建立交換機
@Bean
public TopicExchange dlTopicExchange(){
return new TopicExchange(dlTopicExchange,true,false);
}
//建立隊列
@Bean
public Queue dlQueue(){
return new Queue(dlQueue,true);
}
//隊列與交換機進行綁定
@Bean
public Binding BindingDlQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange){
return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
}
複制代碼
死信交換機的定義和普通交換機的定義完全相同,隊列綁定死信交換機與綁定普通交換機的方式完全相同,死信交換機就是一個普通的交換機,隻是換了一個叫法而已,沒有什麼特殊之處。
修改配置
修改業務隊列的配置,還有将之前提供的 MessageReCoverer進行注釋,不然死信交換機不會生效,會以我們所配置的MessageReCoverer為主。
/**
* 綁定死信交換機需要給隊列設定如下兩個參數
* 業務隊列
* @return
*/
@Bean
public Queue payQueue(){
Map<String,Object> params = new HashMap<>();
//聲明目前隊列綁定的死信交換機
params.put("x-dead-letter-exchange",dlTopicExchange);
//聲明目前隊列的死信路由鍵
params.put("x-dead-letter-routing-key",dlRoutingKey);
return QueueBuilder.durable(queueName).withArguments(params).build();
}
//設定MessageRecoverer
//@Bean
//public MessageRecoverer messageRecoverer() {
//AmqpTemplate和RabbitTemplate都可以
//return new ImmediateRequeueMessageRecoverer();
// }
複制代碼
啟動服務之前,需要将之前建立的隊列進行删除,因為本次隊列的配置有所改動。啟動成功,可以看到同時建立了業務隊列以及死信隊列、業務交換機、死信交換機。
在業務隊列上出現了 DLX 以及 DLK 的辨別,代表已經綁定了死信交換機以及死信路由鍵,此時調用生産者發送消息,消費者在重試5次後,由于MessageCover預設的實作類是RejectAndDontRequeueRecoverer,又因為業務隊列綁定了死信隊列,是以消息會從業務隊列中删除,同時發送到死信隊列中。