天天看點

一文教你如何解決RabbitMQ隊列無消費者

一、基礎知識

Spring 通過釋出事件的方式,可以通知觀察者(即事件監聽器)消費者的一些行為,消費者相關的事件如下所示:

  • AsyncConsumerStartedEvent:An event that is published whenever a new consumer is started.
  • AsyncConsumerStoppedEvent:An event that is published whenever a consumer is stopped (and not restarted).
  • AsyncConsumerRestartedEvent:An event that is published whenever a consumer is restarted.
  • ListenerContainerConsumerFailedEvent:Published when a listener consumer fails.

基于事件機制,可以通過監聽事件ListenerContainerConsumerFailedEvent,當有消費者發生緻命錯誤時,重新建立消費者消費消息,還可以将異常資訊通知負責人。

二、問題現象

正常情況下 RabbitMQ 中隊列是有消費者進行監聽消費的,但 MQ 重裝後有些隊列居然無消費者服務來監聽隊列?????????類似于下圖的情況:

一文教你如何解決RabbitMQ隊列無消費者

在這說明一下啊,監聽 queue.test 隊列的消費者服務是啟動的,沒有挂!!!!為什麼會是 0 呢????

三、問題産生原因

當消費者服務隻是監聽

queue.test

時,代碼:

@RabbitListener(queues = "queue.test")

,生産者服務未啟動(queue.test 未建立),這時的消費者服務先啟動了就會發生上述問題。出現問題了,身為攻城獅就應該去解決啊,是以我在本地搭了一個 RabbitMQ,專門解決這個問題。怎麼搭的,大家可以百度去,一大把的教程,本人是建議去 RabbitMQ 官網去下載下傳 rpm 包安裝。傳送門。

四、解決辦法

産生這個問題的原因是因為撸代碼一時爽,一直撸一直爽。想想消費者監聽一個隊列是多麼容易啊,用注解

@RabbitListener

加上要監聽的 queue name 就完事了,結果是釀成了上面的慘案,導緻又得攻城獅祭天了。

其實啊,在這我們差了一件事,就是消費者其實也是需要寫代碼進行 exchange 和 queue 進行 binding 的,目的就是防止上述問題的産生,消費者遠先于生産者啟動,為什麼我要加個 “遠” 字呢,主要是因為消費者發現沒有要監聽的 queue 時,預設會進行三次重試監聽 queue,三次都失敗後就無法重試了(三次時間很短,預設10s一次),是以我在這用的 “遠”。綁定代碼如下:

/**
 * @author ouyang
 * @version 1.0
 * @date 2019/9/18 17:34
 **/
@ConfigurationProperties(value = "test")
@Component
@Configuration
public class TestRabbitMqConfig {

	@Bean("queue.task")
	public Queue createQueue() {
		return new Queue(this.getQueueName(),true);
	}

	@Bean("exchange.task")
	public TopicExchange createMessageExchange() {
		return new TopicExchange(getTopicExchangeName());
	}

	@Bean("binding.task")
	public Binding binding(@Qualifier("queue.task") Queue queue,
						   @Qualifier("exchange.task") TopicExchange messageExchange) {
		return BindingBuilder.bind(queue).to(messageExchange).with("#");
	}

	private String queueName;
	private String topicExchangeName;

	public String getTopicExchangeName() {
		return topicExchangeName;
	}

	public void setTopicExchangeName(String topicExchangeName) {
		this.topicExchangeName = topicExchangeName;
	}

	public String getQueueName() {
		return queueName;
	}

	public void setQueueName(String queueName) {
		this.queueName = queueName;
	}
}

           

假設我們現在的消費者服務将監聽的 queue 綁定了 exchange,也監聽了 queue,不要想着這就太平了,你仔細想想上面的話,還是可能存在不能監聽的情況的,比如,手賤不小心将 queue 删除了,監聽過程中出現了異常,這時我們是可以使用

ListenerContainerConsumerFailedEvent

,通過實作該接口可以處理異常資訊,當遇到監聽異常時,我們可以停止監聽,然後重新監聽隊列,當然,還有很多很多操作,比如,監聽異常後郵件、短信等通知相關人員。

簡單的示例代碼:

/**
 * @author ouyang
 * @version 1.0
 * @date 2020/2/19 20:46
 **/
@Component
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {

    private final Logger logger = LoggerFactory.getLogger(ListenerContainerConsumerFailedEventListener.class);

    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        logger.error("listener queue failed:{}", event);

        if(event.isFatal()) {
            logger.error("reason:{}, 錯誤:{}", event.getReason(), event.getThrowable());

            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
            String queueNames = Arrays.toString(container.getQueueNames());

            //重新開機
            try {
                // 暫停10s
                Thread.sleep(10000);

                try {
                    container.stop();
                } catch (Exception e) {
                    logger.error("stop listener queue {} failed!", queueNames);
                }

                Assert.state(!container.isRunning(), "listener queue: " + container  + " is running!");

                container.start();

                logger.info("restat listener queue {} successed !", queueNames);
            } catch (Exception e) {
                logger.error("restat listener queue {} failed!", queueNames, e);
            }
        }
    }
}

           

做到這裡,基本上不會出現問題了。