天天看點

rabbitmq 多個消費者消費一個隊列_RabbitMQ異常監控及動态控制隊列消費的解決方案...

最近,随着系統的上線,在RabbitMQ的使用上遇到了一些問題,現将其分析總結及對應的解決方法分享出來,以便大家借鑒參考。如有不對的地方歡迎指正交流。

rabbitmq 多個消費者消費一個隊列_RabbitMQ異常監控及動态控制隊列消費的解決方案...

1.遇到的問題

(1)某個消費者線程因記憶體溢出而挂掉,造成對應的隊列沒有消費者,消息在MQ Server堆積,而系統缺少對該類異常的監控,無法及時有效的進行處理。

(2)在一些業務場景,消息的消費速度遠低于生産速度,造成大量消息堆積在MQ Server,系統沒有提供相應的機制來動态擴充消息的消費速度。

(3)聯調測試時,某些場景需要停止(或重新開機)用戶端對消息隊列的監聽,系統沒有處理這類操作的功能。

2.解決方案

2.1準備

springboot項目,隻需添加下面的依賴即可使用RabbitMQ。

org.springframework.boot spring-boot-starter-amqp
           

2.2對消費者進行異常監控

spring通過釋出事件的方式,可以通知觀察者(即事件監聽器)消費者的一些行為,消費者相關的事件如下所示。

AsyncConsumerStartedEvent:An event that is published whenever a new consumer is started.AsyncConsumerStoppedEvent:An event that is published whenever a consumer is stopped (and not restarted).AsyncConsumerRestartedEvent:An event that is published whenever a consumer is restarted.ListenerContainerConsumerFailedEvent:Published when a listener consumer fails.
           

基于事件機制,可以通過監聽事件ListenerContainerConsumerFailedEvent,當有消費者發生緻命錯誤時,重新建立消費者消費消息,并發送告警資訊給相關責任人。具體實作如下:

import java.util.Arrays;import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;import org.springframework.context.ApplicationListener;import org.springframework.stereotype.Component;import org.springframework.util.Assert;import lombok.extern.slf4j.Slf4j;/** * MQ消費者失敗事件監聽器 */@[email protected] class ListenerContainerConsumerFailedEventListener implements ApplicationListener { @Override public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) { log.error("消費者失敗事件發生:{}