【服務端確認】
1. 確認 message 成功發送到交換器(exchange)
2. 確認 message 成功從 exchange 路由到隊列(queue)
# 導入依賴
<!-- spring-rabbit -->
<!-- Spring AMQP RabbitMQ integration. The amqp-client it would pull in transitively is excluded
     so that the version declared explicitly below is the one used. -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.14.RELEASE</version>
<exclusions>
<exclusion>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- amqp-client -->
<!-- RabbitMQ Java client, declared directly to pin its version (replaces the excluded transitive one). -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.12.0</version>
</dependency>
# 消息提供者配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer-side Spring configuration: connection factory with publisher confirms/returns,
     a RabbitTemplate wired to the confirm/return callback, and the queue/exchange/binding declarations. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
       http://www.springframework.org/schema/beans
       https://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context-4.3.xsd
       http://www.springframework.org/schema/aop
       https://www.springframework.org/schema/aop/spring-aop-4.3.xsd
       http://www.springframework.org/schema/tx
       https://www.springframework.org/schema/tx/spring-tx-4.3.xsd">

    <context:component-scan base-package="com.learn" />

    <!-- Messages are serialized to JSON by the template. -->
    <bean id="msgConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    <!-- 1. Define the connection factory. publisher-confirms="true" enables publisher confirms;
         once confirms are enabled, transactions can no longer be used.
         NOTE(review): host and credentials are hard-coded here; externalize them outside of a demo. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="192.168.199.237"
                               port="5672"
                               username="app_client"
                               password="client123"
                               virtual-host="/app"
                               publisher-confirms="true"
                               publisher-returns="true" />

    <!-- Callback bean that receives publisher confirms and returned messages. -->
    <bean id="confirmCallback" class="com.learn.config.MsgSendConfirmCallback"></bean>

    <!-- 2. Define the RabbitMQ template. The same bean serves as confirm-callback and return-callback;
         mandatory="true" is set so that unroutable messages are handed to the return callback. -->
    <rabbit:template id="template"
                     message-converter="msgConverter"
                     connection-factory="connectionFactory"
                     exchange="my_direct_exchange" reply-timeout="2000"
                     routing-key="my.binding"
                     confirm-callback="confirmCallback"
                     return-callback="confirmCallback"
                     mandatory="true" />

    <rabbit:admin connection-factory="connectionFactory" auto-startup="true"/>

    <!-- 1. Define the queue. -->
    <rabbit:queue id="my_queue"
                  name="my_queue"
                  auto-declare="true"
                  durable="true"
                  auto-delete="false"
                  exclusive="false"/>

    <!-- 2. Define the exchange and bind the queue to it.
         auto-delete is "false" so this declaration matches the consumer-side declaration of
         my_direct_exchange; declaring the same exchange with different attributes is rejected by the broker. -->
    <rabbit:direct-exchange id="my_direct_exchange"
                            name="my_direct_exchange"
                            auto-declare="true"
                            durable="true"
                            auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="my_queue" key="my.binding" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>
# 消息確認回調
package com.learn.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
/**
 * Publisher-side callback that prints the outcome of each send.
 *
 * <p>It implements both {@link RabbitTemplate.ConfirmCallback} (broker confirm for a
 * published message) and {@link RabbitTemplate.ReturnCallback} (message handed back to
 * the publisher) and registers itself on the injected {@link RabbitTemplate}.
 */
public class MsgSendConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    /** Template on which this callback registers itself in {@link #init()}. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Registers this instance as the template's confirm and return callback and turns on
     * the mandatory flag.
     *
     * <p>NOTE(review): the accompanying XML configuration already sets confirm-callback,
     * return-callback and mandatory on the template, so this repeats that wiring with the
     * same instance; it is kept so the class also works when the template is not configured in XML.
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setMandatory(true);
    }

    /**
     * Prints whether the publish was acknowledged.
     *
     * @param correlationData correlation data supplied with the send; printed as-is (may be {@code null})
     * @param ack             {@code true} when the publish was acknowledged, {@code false} otherwise
     * @param cause           failure reason, printed only when {@code ack} is {@code false}
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息發送成功: " + correlationData);
        } else {
            System.out.println("消息發送失敗,id= " + correlationData + " 原因:" + cause);
        }
    }

    /**
     * Prints the details of a message that was returned to the publisher.
     *
     * @param message    the returned message
     * @param replyCode  reply code reported with the return
     * @param replyText  textual description of the return
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Decode explicitly as UTF-8 instead of the platform default charset, so non-ASCII
        // payloads are not garbled on JVMs whose default charset is not UTF-8.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("returned message: " + body);
        System.out.println("應答碼: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交換器 exchange : " + exchange);
        System.out.println("消息使用的路由鍵 routing : " + routingKey);
    }
}
【消費者端確認】
# 消息消費者配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer-side Spring configuration: connection factory, queue/exchange/binding declarations,
     and a listener container using manual acknowledgement. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
       https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
       http://www.springframework.org/schema/beans
       https://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://www.springframework.org/schema/context
       https://www.springframework.org/schema/context/spring-context-4.3.xsd
       http://www.springframework.org/schema/aop
       https://www.springframework.org/schema/aop/spring-aop-4.3.xsd
       http://www.springframework.org/schema/tx
       https://www.springframework.org/schema/tx/spring-tx-4.3.xsd">

    <context:component-scan base-package="com.learn" />

    <!-- 1. Configure the connection factory.
         NOTE(review): host and credentials are hard-coded here; externalize them outside of a demo. -->
    <rabbit:connection-factory id="connectionFactory"
                               host="192.168.199.237"
                               port="5672"
                               username="app_client"
                               password="client123"
                               virtual-host="/app"
                               connection-timeout="3000"
                               channel-cache-size="5"/>

    <!-- 2. RabbitMQ admin: takes care of declaring the queues and exchanges below. -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 3. Configure the queue. -->
    <rabbit:queue name="my_queue" auto-declare="true" auto-delete="false" exclusive="false" durable="true"/>

    <!-- 4. Define the exchange, bind the queue and give the binding key.
         The key is "my.binding" so it matches the routing key the producer's template publishes with;
         with a different key, a binding declared only from this side would leave those messages unroutable. -->
    <rabbit:direct-exchange name="my_direct_exchange" auto-declare="true" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="my_queue" key="my.binding" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 5. Handler class that receives the messages sent by the producer. -->
    <bean id="confirmListener" class="com.learn.listeners.ConsumeConfirmListener" />

    <!-- 6. Listener container: dispatches incoming messages according to the configuration inside.
         acknowledge="manual" leaves ack/nack to the listener itself. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
        <!-- The consumer bean, the queue it listens on, and the method that handles received messages. -->
        <rabbit:listener ref="confirmListener" method="onMessage" queue-names="my_queue"/>
    </rabbit:listener-container>
</beans>
# 監聽器 - 開啟消費端手動確認
package com.learn.listeners;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * Consumer side - manual acknowledgement.
 *
 * <p>Prints each received message body and acknowledges it on the delivering channel.
 * If the acknowledgement fails with an {@link IOException}, the delivery is negatively
 * acknowledged without requeue.
 */
public class ConsumeConfirmListener implements ChannelAwareMessageListener {

    /**
     * Handles one delivery and settles it exactly once.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on; used for ack/nack
     * @throws Exception if the negative acknowledgement itself fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // Decode explicitly as UTF-8 rather than relying on the platform default charset.
            String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("consumer received msg: " + body);
            // Manual acknowledgement: ack this single delivery (multiple = false).
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            e.printStackTrace();
            // Settle the delivery exactly once: nack without requeue. The original also called
            // basicReject for the same delivery tag afterwards; acknowledging a tag twice is a
            // protocol error that makes the broker close the channel, so only one call is kept.
            // (basicReject(deliveryTag, false) would be the equivalent single-message alternative.)
            channel.basicNack(deliveryTag, false, false);
        }
    }
}