天天看點

SpringBoot + RabbitMQ自定義ConnectionFacotry後,配置發送确認回調無效

問題現象

由于項目需要連接配接到多個RabbitMQ執行個體或者一個執行個體的多個vhost上,需要自定義配置多個ConnectionFactory來區分連接配接,以達到可以消費或者向多個RabbitMQ執行個體/多個vhost發送消息。手動配置ConnectionFactory後,發現原來配置的發送确認回調無效了,ConnectionFactory的配置如下,

/**
 * 自定義RabbitMQ不同執行個體/不同vhost的ConnectionFactory
 * @param rabbitProperties 配置檔案中rabbitMQ的配置屬性
 * @return
 */
@Bean("defaultRabbitConnectionFactory")
@Primary
public ConnectionFactory defaultRabbitConnectionFactory(RabbitProperties rabbitProperties){
  CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
  cachingConnectionFactory.setHost(rabbitProperties.getHost());
  cachingConnectionFactory.setPort(rabbitProperties.getPort());
  cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
  cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
  cachingConnectionFactory.setVirtualHost("/");
  return cachingConnectionFactory;
}

/**
 * 配置自定義的RabbitTemplate模闆
 * @param connectionFactory 連接配接工廠
 * @param confirmCallback 消息發送後回調的執行個體對象
 * @return
 */
@Bean("rabbitTemplate")
@Primary
public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory, MQConfirmCallback confirmCallback) {
  CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) connectionFactory;
  cachingConnectionFactory.setVirtualHost("/");
  RabbitTemplate rabbitTemplate = new CorrelateRabbitTemplate(connectionFactory);
  rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  rabbitTemplate.setConfirmCallback(confirmCallback);
  return rabbitTemplate;
}
           

原因分析

從上面的自定義的ConnectionFactory配置來看,配置了連接配接的RabbitMQ執行個體位址和vhost,以及RabbitTemplate中配置定義的ConfirmCallback對象執行個體。此時即使在Properties檔案中配置了

spring.rabbitmq.publisher-confirm-type=correlated

,在發送消息後是不會對實作了

RabbitTemplate.ConfirmCallback

接口的Bean對象發起回調的。跟蹤RabbitTemplate.java發送消息的接口,在RabbitTemplate的源碼中,發現在

doSend

方法中,調用了

setupConfirm

方法,用于設定回調,源碼如下:

/**
	 * Send the given message to the specified exchange.
	 *
	 * @param channel The RabbitMQ Channel to operate within.
	 * @param exchangeArg The name of the RabbitMQ exchange to send to.
	 * @param routingKeyArg The routing key.
	 * @param message The Message to send.
	 * @param mandatory The mandatory flag.
	 * @param correlationData The correlation data.
	 * @throws IOException If thrown by RabbitMQ API methods.
	 */
	public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
			boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {

		String exch = nullSafeExchange(exchangeArg);
		String rKey = nullSafeRoutingKey(routingKeyArg);

		if (logger.isTraceEnabled()) {
			logger.trace("Original message to publish: " + message);
		}

		Message messageToUse = message;
		MessageProperties messageProperties = messageToUse.getMessageProperties();
		if (mandatory) {
			messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
		}
		if (this.beforePublishPostProcessors != null) {
			for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
				messageToUse = processor.postProcessMessage(messageToUse, correlationData, exch, rKey);
			}
		}
		setupConfirm(channel, messageToUse, correlationData);
		if (this.userIdExpression != null && messageProperties.getUserId() == null) {
			String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
			if (userId != null) {
				messageProperties.setUserId(userId);
			}
		}
		if (logger.isDebugEnabled()) {
			logger.debug("Publishing message [" + messageToUse
					+ "] on exchange [" + exch + "], routingKey = [" + rKey + "]");
		}
		sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
		// Check if commit needed
		if (isChannelLocallyTransacted(channel)) {
			// Transacted channel created by this template -> commit.
			RabbitUtils.commitIfNecessary(channel);
		}
	}

private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationDataArg) {
		if ((this.publisherConfirms || this.confirmCallback != null) && channel instanceof PublisherCallbackChannel) {

			PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
			CorrelationData correlationData = this.correlationDataPostProcessor != null
					? this.correlationDataPostProcessor.postProcess(message, correlationDataArg)
					: correlationDataArg;
			long nextPublishSeqNo = channel.getNextPublishSeqNo();
			message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
			publisherCallbackChannel.addPendingConfirm(this, nextPublishSeqNo,
					new PendingConfirm(correlationData, System.currentTimeMillis()));
			if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
				message.getMessageProperties().setHeader(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY,
						correlationData.getId());
			}
		}
		else if (channel instanceof ChannelProxy && ((ChannelProxy) channel).isConfirmSelected()) {
			long nextPublishSeqNo = channel.getNextPublishSeqNo();
			message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
		}
	}
           

通過源碼發現在發送消息前,在setupConfirm方法中設定具體的回調對象,要設定回調對象,則首先要判斷是否開啟了發送确認或者回調對象不為空,此時,這個條件肯定滿足,還有個關鍵點就是

channel instanceof PublisherCallbackChannel

,也就是說目前發送消息的

Channel

必須是

PublisherCallbackChannel

,既然是從Channel中判斷的,那麼這個設定的地方應該就Connect有關系了,因為在沒有手動配置ConnectFactory之前,使用預設的配置,回調時沒有問題,手動配置ConnectFacotry後,發送确認回調就無效了,此時,可以基本判斷問題出現在自定義的ConnectFactory上。此時問題就要回到确認這個Channel是什麼時候建立的以及建立時使用了哪些配置條件。

回到ConnectionFactory定義的地方,檢查new的動作,注意在

new CachingConnectionFactory

時,内部的屬性

private PublisherCallbackChannelFactory publisherChannelFactory = PublisherCallbackChannelImpl.factory();

,器publisherCallbackChannelImpl.factory(),此時建立的正是PublisherCallbackChannelImpl

public static PublisherCallbackChannelFactory factory() {
		return (channel, exec) -> new PublisherCallbackChannelImpl(channel, exec);
}
           

在服務啟動時,最終會執行到

CachingConnectionFactory#doCreateBareChannel

方法,在705行的if條件中,需要判斷

this.ConfirmType

是否是

ConfirmType.CORRELATED

類型的确認,我們再來看

this.ConfirmType

是從什麼時候設定值的。

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-42lButdl-1622359786369)(/Users/yuxiao/Documents/rabbitmq-publisher-channel.png)]

從源碼得知,

CachingConnectionFactory

中的

confirmType

的預設值為

ConfirmType.NONE

,那麼至此可以明白此值應該可以由外界配置傳入,既然properties檔案中已經配置了``spring.rabbitmq.publisher-confirm-type=correlated

,那麼應該就是在自定義

CachingConnectionFactory`的時候沒有設定該值,至此問題得以了然。

解決方案

需要再聲明自定的ConnectionFactory時,配置confirm-type,完成的ConnectionFactory的定義如下:

/**
     * 自定義RabbitMQ不同執行個體/不同vhost的ConnectionFactory
     * @param rabbitProperties 配置檔案中rabbitMQ的配置屬性
     * @return
     */
    @Bean("defaultRabbitConnectionFactory")
    @Primary
    public ConnectionFactory defaultRabbitConnectionFactory(RabbitProperties rabbitProperties){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(rabbitProperties.getHost());
        cachingConnectionFactory.setPort(rabbitProperties.getPort());
        cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
        cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setCacheMode(rabbitProperties.getCache().getConnection().getMode());
        // 配置發送确認回調時,次配置必須配置,否則即使在RabbitTemplate配置了ConfirmCallback也不會生效
        cachingConnectionFactory.setPublisherConfirmType(rabbitProperties.getPublisherConfirmType());
        return cachingConnectionFactory;
    }
           

參考資料

rabbitmq:publisher confirms - 知乎 (zhihu.com)

繼續閱讀