最近,随着系統的上線,在RabbitMQ的使用上遇到了一些問題,現将其分析總結及對應的解決方法分享出來,以便大家借鑒參考。如有不對的地方歡迎指正交流。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iZ0UDZiJWMzUmYlJGOmZTOihzN5kjYhZzY3ETO1ETZ38CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
1.遇到的問題
(1)某個消費者線程因記憶體溢出而挂掉,造成對應的隊列沒有消費者,消息在MQ Server堆積,而系統缺少對該類異常的監控,無法及時有效的進行處理。
(2)在一些業務場景,消息的消費速度遠低于生産速度,造成大量消息堆積在MQ Server,系統沒有提供相應的機制來動态擴充消息的消費速度。
(3)聯調測試時,某些場景需要停止(或重新開機)用戶端對消息隊列的監聽,系統沒有處理這類操作的功能。
2.解決方案
2.1準備
springboot項目,隻需添加下面的依賴即可使用RabbitMQ。
org.springframework.boot spring-boot-starter-amqp
2.2對消費者進行異常監控
spring通過釋出事件的方式,可以通知觀察者(即事件監聽器)消費者的一些行為,消費者相關的事件如下所示。
AsyncConsumerStartedEvent:An event that is published whenever a new consumer is started.AsyncConsumerStoppedEvent:An event that is published whenever a consumer is stopped (and not restarted).AsyncConsumerRestartedEvent:An event that is published whenever a consumer is restarted.ListenerContainerConsumerFailedEvent:Published when a listener consumer fails.
基于事件機制,可以通過監聽事件ListenerContainerConsumerFailedEvent,當有消費者發生緻命錯誤時,重新建立消費者消費消息,并發送告警資訊給相關責任人。具體實作如下:
import java.util.Arrays;import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;import org.springframework.context.ApplicationListener;import org.springframework.stereotype.Component;import org.springframework.util.Assert;import lombok.extern.slf4j.Slf4j;/** * MQ消費者失敗事件監聽器 */@[email protected] class ListenerContainerConsumerFailedEventListener implements ApplicationListener { @Override public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) { log.error("消費者失敗事件發生:{}