天天看點

spring-boot-rabbitmq動态管理

使用spring boot + rabbitmq的時候,在開發過程中,可能會想要臨時停用/啟用監聽,或修改監聽消費者數量。如果每次修改都重新開機比較浪費時間,是以研究了一下不停機就啟用停用監聽或修改一些配置

一. 關于rabbitmq監聽的配置

  • 配置屬性類:RabbitProperties,包含rabbitmq的認證、監聽、發送者以及其他的一些配置
  • 自動配置類:RabbitAutoConfiguration,主要配置rabbitmq的連接配接工廠和發送者等,不包含監聽的配置
  • rabbitmq監聽的配置是RabbitAnnotationDrivenConfiguration,是通過RabbitAutoConfiguration引入的
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
    ...
}
           
  • RabbitAnnotationDrivenConfiguration中主要就是監聽工廠的配置、監聽工廠,但是這裡也隻是建立bean,并沒有真正的初始化
  • 通過配置裡的bean類名,分析一下,rabbitmq的監聽肯定是由監聽工廠建立的,是以找到監聽工廠SimpleRabbitListenerContainerFactory
@Bean
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
		SimpleRabbitListenerContainerFactoryConfigurer configurer,
		ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    return factory;
}
           
  • 既然自動配置裡面沒有初始化監聽,那就應該是在其他地方調用的,進入監聽工廠類中,發現有initializeContainer(SimpleMessageListenerContainer instance)方法,猜測初始化肯定與這個方法有關,是以檢視有哪些地方調用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有建立監聽容器和初始化的代碼
/**
 * Create and start a new {@link MessageListenerContainer} using the specified factory.
 * @param endpoint the endpoint to create a {@link MessageListenerContainer}.
 * @param factory the {@link RabbitListenerContainerFactory} to use.
 * @return the {@link MessageListenerContainer}.
 */
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
		RabbitListenerContainerFactory<?> factory) {

    MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
    
    if (listenerContainer instanceof InitializingBean) {
    	try {
            ((InitializingBean) listenerContainer).afterPropertiesSet();
    	}
    	catch (Exception ex) {
            throw new BeanInitializationException("Failed to initialize message listener container", ex);
    	}
    }
    
    int containerPhase = listenerContainer.getPhase();
    if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value
    	if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
            throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
            		this.phase + " vs " + containerPhase);
    	}
    	this.phase = listenerContainer.getPhase();
    }
    
    return listenerContainer;
}
           
  • 繼續找調用這個方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之後,發現調用的地方很多了
    spring-boot-rabbitmq動态管理
  • 看看afterPropertiesSet方法,是InitializingBean接口中的,猜測應該是spring容器建立bean之後都會調用的bean初始化的方法,是以查找找到RabbitListenerEndpointRegistrar是在哪裡建立的執行個體。原來是在RabbitListenerAnnotationBeanPostProcessor中的私有屬性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration這個自動配置裡面初始化的,是以這就找到rabbitmq初始化監聽的源頭了

二. 動态管理rabbitmq監聽

  • 回到最初的問題,想要動态的啟用停用mq的監聽,是以先看看初始化配置的類,既然有初始化,那可能會有相關的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,裡面有對監聽容器進行操作,主要源碼如下
/**
 * @return the managed {@link MessageListenerContainer} instance(s).
 */
public Collection<MessageListenerContainer> getListenerContainers() {
    return Collections.unmodifiableCollection(this.listenerContainers.values());
}
	
@Override
public void start() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
    	startIfNecessary(listenerContainer);
    }
}

/**
 * Start the specified {@link MessageListenerContainer} if it should be started
 * on startup or when start is called explicitly after startup.
 * @see MessageListenerContainer#isAutoStartup()
 */
private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
    	listenerContainer.start();
    }
}

@Override
public void stop() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
    	listenerContainer.stop();
    }
}
           
  • 寫個controller,注入RabbitListenerEndpointRegistry,使用start()和stop()對監聽進行啟用停用的操作,并且RabbitListenerEndpointRegistry執行個體還可以擷取監聽容器,對監聽的一些參數也能進行修改,比如消費者數量。代碼如下:
import java.util.Set;

import javax.annotation.Resource;

import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.itopener.framework.ResultMap;

/**
 * Created by fuwei.deng on 2017年7月24日.
 */
@RestController
@RequestMapping("rabbitmq/listener")
public class RabbitMQController {

    @Resource
    private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
    
    @RequestMapping("stop")
    public ResultMap stop(){
    	rabbitListenerEndpointRegistry.stop();
    	return ResultMap.buildSuccess();
    }
    
    @RequestMapping("start")
    public ResultMap start(){
    	rabbitListenerEndpointRegistry.start();
    	return ResultMap.buildSuccess();
    }
    
    @RequestMapping("setup")
    public ResultMap setup(int consumer, int maxConsumer){
    	Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds();
    	SimpleMessageListenerContainer container = null;
    	for(String id : containerIds){
    		container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id);
    		if(container != null){
    			container.setConcurrentConsumers(consumer);
    			container.setMaxConcurrentConsumers(maxConsumer);
    		}
    	}
    	return ResultMap.buildSuccess();
    }
    
}
           

轉載于:https://my.oschina.net/dengfuwei/blog/1595044