一、基礎知識
Spring 通過釋出事件的方式,可以通知觀察者(即事件監聽器)消費者的一些行為,消費者相關的事件如下所示:
- AsyncConsumerStartedEvent:An event that is published whenever a new consumer is started.
- AsyncConsumerStoppedEvent:An event that is published whenever a consumer is stopped (and not restarted).
- AsyncConsumerRestartedEvent:An event that is published whenever a consumer is restarted.
- ListenerContainerConsumerFailedEvent:Published when a listener consumer fails.
基于事件機制,可以通過監聽事件ListenerContainerConsumerFailedEvent,當有消費者發生緻命錯誤時,重新建立消費者消費消息,還可以将異常資訊通知負責人。
二、問題現象
正常情況下 RabbitMQ 中隊列是有消費者進行監聽消費的,但 MQ 重裝後有些隊列居然無消費者服務來監聽隊列?????????類似于下圖的情況:

在這說明一下啊,監聽 queue.test 隊列的消費者服務是啟動的,沒有挂!!!!為什麼會是 0 呢????
三、問題産生原因
當消費者服務隻是監聽
queue.test
時,代碼:
@RabbitListener(queues = "queue.test")
,生産者服務未啟動(queue.test 未建立),這時的消費者服務先啟動了就會發生上述問題。出現問題了,身為攻城獅就應該去解決啊,是以我在本地搭了一個 RabbitMQ,專門解決這個問題。怎麼搭的,大家可以百度去,一大把的教程,本人是建議去 RabbitMQ 官網去下載下傳 rpm 包安裝。傳送門。
四、解決辦法
産生這個問題的原因是因為撸代碼一時爽,一直撸一直爽。想想消費者監聽一個隊列是多麼容易啊,用注解
@RabbitListener
加上要監聽的 queue name 就完事了,結果是釀成了上面的慘案,導緻又得攻城獅祭天了。
其實啊,在這我們差了一件事,就是消費者其實也是需要寫代碼進行 exchange 和 queue 進行 binding 的,目的就是防止上述問題的産生,消費者遠先于生産者啟動,為什麼我要加個 “遠” 字呢,主要是因為消費者發現沒有要監聽的 queue 時,預設會進行三次重試監聽 queue,三次都失敗後就無法重試了(三次時間很短,預設10s一次),是以我在這用的 “遠”。綁定代碼如下:
/**
* @author ouyang
* @version 1.0
* @date 2019/9/18 17:34
**/
@ConfigurationProperties(value = "test")
@Component
@Configuration
public class TestRabbitMqConfig {
@Bean("queue.task")
public Queue createQueue() {
return new Queue(this.getQueueName(),true);
}
@Bean("exchange.task")
public TopicExchange createMessageExchange() {
return new TopicExchange(getTopicExchangeName());
}
@Bean("binding.task")
public Binding binding(@Qualifier("queue.task") Queue queue,
@Qualifier("exchange.task") TopicExchange messageExchange) {
return BindingBuilder.bind(queue).to(messageExchange).with("#");
}
private String queueName;
private String topicExchangeName;
public String getTopicExchangeName() {
return topicExchangeName;
}
public void setTopicExchangeName(String topicExchangeName) {
this.topicExchangeName = topicExchangeName;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
}
假設我們現在的消費者服務将監聽的 queue 綁定了 exchange,也監聽了 queue,不要想着這就太平了,你仔細想想上面的話,還是可能存在不能監聽的情況的,比如,手賤不小心将 queue 删除了,監聽過程中出現了異常,這時我們是可以使用
ListenerContainerConsumerFailedEvent
,通過實作該接口可以處理異常資訊,當遇到監聽異常時,我們可以停止監聽,然後重新監聽隊列,當然,還有很多很多操作,比如,監聽異常後郵件、短信等通知相關人員。
簡單的示例代碼:
/**
* @author ouyang
* @version 1.0
* @date 2020/2/19 20:46
**/
@Component
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
private final Logger logger = LoggerFactory.getLogger(ListenerContainerConsumerFailedEventListener.class);
@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
logger.error("listener queue failed:{}", event);
if(event.isFatal()) {
logger.error("reason:{}, 錯誤:{}", event.getReason(), event.getThrowable());
SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
String queueNames = Arrays.toString(container.getQueueNames());
//重新開機
try {
// 暫停10s
Thread.sleep(10000);
try {
container.stop();
} catch (Exception e) {
logger.error("stop listener queue {} failed!", queueNames);
}
Assert.state(!container.isRunning(), "listener queue: " + container + " is running!");
container.start();
logger.info("restat listener queue {} successed !", queueNames);
} catch (Exception e) {
logger.error("restat listener queue {} failed!", queueNames, e);
}
}
}
}
做到這裡,基本上不會出現問題了。