問題現象
由于項目需要連接配接到多個RabbitMQ執行個體或者一個執行個體的多個vhost上,需要自定義配置多個ConnectionFactory來區分連接配接,以達到可以消費或者向多個RabbitMQ執行個體/多個vhost發送消息。手動配置ConnectionFactory後,發現原來配置的發送确認回調無效了,ConnectionFactory的配置如下,
/**
* 自定義RabbitMQ不同執行個體/不同vhost的ConnectionFactory
* @param rabbitProperties 配置檔案中rabbitMQ的配置屬性
* @return
*/
@Bean("defaultRabbitConnectionFactory")
@Primary
public ConnectionFactory defaultRabbitConnectionFactory(RabbitProperties rabbitProperties){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(rabbitProperties.getHost());
cachingConnectionFactory.setPort(rabbitProperties.getPort());
cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
cachingConnectionFactory.setVirtualHost("/");
return cachingConnectionFactory;
}
/**
* 配置自定義的RabbitTemplate模闆
* @param connectionFactory 連接配接工廠
* @param confirmCallback 消息發送後回調的執行個體對象
* @return
*/
@Bean("rabbitTemplate")
@Primary
public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory, MQConfirmCallback confirmCallback) {
CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) connectionFactory;
cachingConnectionFactory.setVirtualHost("/");
RabbitTemplate rabbitTemplate = new CorrelateRabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setConfirmCallback(confirmCallback);
return rabbitTemplate;
}
原因分析
從上面的自定義的ConnectionFactory配置來看,配置了連接配接的RabbitMQ執行個體位址和vhost,以及RabbitTemplate中配置定義的ConfirmCallback對象執行個體。此時即使在Properties檔案中配置了
spring.rabbitmq.publisher-confirm-type=correlated
,在發送消息後是不會對實作了
RabbitTemplate.ConfirmCallback
接口的Bean對象發起回調的。跟蹤RabbitTemplate.java發送消息的接口,在RabbitTemplate的源碼中,發現在
doSend
方法中,調用了
setupConfirm
方法,用于設定回調,源碼如下:
/**
* Send the given message to the specified exchange.
*
* @param channel The RabbitMQ Channel to operate within.
* @param exchangeArg The name of the RabbitMQ exchange to send to.
* @param routingKeyArg The routing key.
* @param message The Message to send.
* @param mandatory The mandatory flag.
* @param correlationData The correlation data.
* @throws IOException If thrown by RabbitMQ API methods.
*/
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
String exch = nullSafeExchange(exchangeArg);
String rKey = nullSafeRoutingKey(routingKeyArg);
if (logger.isTraceEnabled()) {
logger.trace("Original message to publish: " + message);
}
Message messageToUse = message;
MessageProperties messageProperties = messageToUse.getMessageProperties();
if (mandatory) {
messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
}
if (this.beforePublishPostProcessors != null) {
for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
messageToUse = processor.postProcessMessage(messageToUse, correlationData, exch, rKey);
}
}
setupConfirm(channel, messageToUse, correlationData);
if (this.userIdExpression != null && messageProperties.getUserId() == null) {
String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
if (userId != null) {
messageProperties.setUserId(userId);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Publishing message [" + messageToUse
+ "] on exchange [" + exch + "], routingKey = [" + rKey + "]");
}
sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
// Check if commit needed
if (isChannelLocallyTransacted(channel)) {
// Transacted channel created by this template -> commit.
RabbitUtils.commitIfNecessary(channel);
}
}
private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationDataArg) {
if ((this.publisherConfirms || this.confirmCallback != null) && channel instanceof PublisherCallbackChannel) {
PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
CorrelationData correlationData = this.correlationDataPostProcessor != null
? this.correlationDataPostProcessor.postProcess(message, correlationDataArg)
: correlationDataArg;
long nextPublishSeqNo = channel.getNextPublishSeqNo();
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
publisherCallbackChannel.addPendingConfirm(this, nextPublishSeqNo,
new PendingConfirm(correlationData, System.currentTimeMillis()));
if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
message.getMessageProperties().setHeader(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY,
correlationData.getId());
}
}
else if (channel instanceof ChannelProxy && ((ChannelProxy) channel).isConfirmSelected()) {
long nextPublishSeqNo = channel.getNextPublishSeqNo();
message.getMessageProperties().setPublishSequenceNumber(nextPublishSeqNo);
}
}
通過源碼發現在發送消息前,在setupConfirm方法中設定具體的回調對象,要設定回調對象,則首先要判斷是否開啟了發送确認或者回調對象不為空,此時,這個條件肯定滿足,還有個關鍵點就是
channel instanceof PublisherCallbackChannel
,也就是說目前發送消息的
Channel
必須是
PublisherCallbackChannel
,既然是從Channel中判斷的,那麼這個設定的地方應該就Connect有關系了,因為在沒有手動配置ConnectFactory之前,使用預設的配置,回調時沒有問題,手動配置ConnectFacotry後,發送确認回調就無效了,此時,可以基本判斷問題出現在自定義的ConnectFactory上。此時問題就要回到确認這個Channel是什麼時候建立的以及建立時使用了哪些配置條件。
回到ConnectionFactory定義的地方,檢查new的動作,注意在
new CachingConnectionFactory
時,内部的屬性
private PublisherCallbackChannelFactory publisherChannelFactory = PublisherCallbackChannelImpl.factory();
,器publisherCallbackChannelImpl.factory(),此時建立的正是PublisherCallbackChannelImpl
public static PublisherCallbackChannelFactory factory() {
return (channel, exec) -> new PublisherCallbackChannelImpl(channel, exec);
}
在服務啟動時,最終會執行到
CachingConnectionFactory#doCreateBareChannel
方法,在705行的if條件中,需要判斷
this.ConfirmType
是否是
ConfirmType.CORRELATED
類型的确認,我們再來看
this.ConfirmType
是從什麼時候設定值的。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-42lButdl-1622359786369)(/Users/yuxiao/Documents/rabbitmq-publisher-channel.png)]
從源碼得知,
CachingConnectionFactory
中的
confirmType
的預設值為
ConfirmType.NONE
,那麼至此可以明白此值應該可以由外界配置傳入,既然properties檔案中已經配置了``spring.rabbitmq.publisher-confirm-type=correlated
,那麼應該就是在自定義
CachingConnectionFactory`的時候沒有設定該值,至此問題得以了然。
解決方案
需要再聲明自定的ConnectionFactory時,配置confirm-type,完成的ConnectionFactory的定義如下:
/**
* 自定義RabbitMQ不同執行個體/不同vhost的ConnectionFactory
* @param rabbitProperties 配置檔案中rabbitMQ的配置屬性
* @return
*/
@Bean("defaultRabbitConnectionFactory")
@Primary
public ConnectionFactory defaultRabbitConnectionFactory(RabbitProperties rabbitProperties){
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(rabbitProperties.getHost());
cachingConnectionFactory.setPort(rabbitProperties.getPort());
cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
cachingConnectionFactory.setVirtualHost("/");
cachingConnectionFactory.setCacheMode(rabbitProperties.getCache().getConnection().getMode());
// 配置發送确認回調時,次配置必須配置,否則即使在RabbitTemplate配置了ConfirmCallback也不會生效
cachingConnectionFactory.setPublisherConfirmType(rabbitProperties.getPublisherConfirmType());
return cachingConnectionFactory;
}
參考資料
rabbitmq:publisher confirms - 知乎 (zhihu.com)