預設情況下,Rabbitmq消費者為單線程串行消費,這也是隊列的特性,當然在有些業務中需要并發消費,本文主要示例spring中和boot中指定消費者數量來并發消費。spring amqp文檔也有相關描述:文檔位址
通過設定Messagelistener 的containerconcurrency屬性來設定消費者數量
XML示例:
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" concurrency= "5">
<rabbit:listener ref="myMessageListener" queue-names="hello"/>
</rabbit:listener-container>
Boot中消費者配置可以參考前面文章,但是在@RabbitListener中,concurrency并不是配置在注解中,而且通過配置containerFactory來設定:
定義containerFactory bean:
@Bean("pointTaskContainerFactory")
public SimpleRabbitListenerContainerFactory pointTaskContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setPrefetchCount(PointTaskConstant.DEFAULT_PREFETCH_COUNT);
factory.setConcurrentConsumers(PointTaskConstant.DEFAULT_CONCURRENT);
configurer.configure(factory, connectionFactory);
return factory;
}
/**
* 消費者數量,預設10
*/
public static final int DEFAULT_CONCURRENT = 10;
/**
* 每個消費者擷取最大投遞數量 預設50
*/
public static final int DEFAULT_PREFETCH_COUNT = 50;
Listenner:
@RabbitListener(queues = MQConstant.POINT_TASK_QUEUE_NAME,containerFactory = "pointTaskContainerFactory")
MQConstant.POINT_TASK_QUEUE_NAME 為隊列名稱,常量,自定義
在多消費同時消費情況下,必須要考慮到多線程安全問題。