如未看第一章的建議先看第一章基本用法: SpringBoot使用RabbitMQ ,這裡講解SpringBoot使用RabbitMQ進行有回調的用法和消費者端手動簽收消息的用法。
一、yml檔案配置
server:
port: 8080
spring:
application:
name: rabbit-confirm
rabbitmq:
template:
# 使用return-callback時必須設定mandatory為true
mandatory: true
# 消息發送到交換機确認機制,是否确認回調
publisher-confirms: true
# 消息發送到交換機确認機制,是否傳回回調
publisher-returns: true
listener:
simple:
# 并發消費者初始化值
concurrency: 5
# 最大值
max-concurrency: 10
# 每個消費者每次監聽時可拉取處理的消息數量
prefetch: 20
# 确認模式設定為手動簽收
acknowledge-mode: manual
二、定義配置類
/**
* @author Gjing
**/
@Configuration
public class ConfirmConfiguration {
/**
* 聲明confirm.message隊列
*/
@Bean
public Queue confirmQueue() {
return new Queue("confirm.message");
}
/**
* 聲明一個名為exchange-2的交換機
*/
@Bean
public TopicExchange exchange2() {
return new TopicExchange("exchange-2");
}
/**
* 将confirm.message的隊列綁定到exchange-2交換機
*/
@Bean
public Binding bindMessage1() {
return BindingBuilder.bind(confirmQueue()).to(exchange2()).with("confirm.message");
}
}
三、定義生産者
/**
* @author Gjing
**/
@Component
@Slf4j
public class ConfirmProducer {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 如果消息沒有到exchange,則confirm回調,ack=false
* 如果消息到達exchange,則confirm回調,ack=true
* exchange到queue成功,則不回調return
* exchange到queue失敗,則回調return(需設定mandatory=true,否則不回回調,消息就丢了)
*/
private final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
if (!ack) {
log.error("消息發送失敗:correlationData: {},cause: {}", correlationData, cause);
}else {
log.info("消息發送成功:correlationData: {},ack: {}", correlationData, ack);
}
};
private final RabbitTemplate.ReturnCallback returnCallback = (message, replyCode, replyText, exchange, routeKey) ->
log.error("消息丢失: exchange: {},routeKey: {},replyCode: {},replyText: {}", exchange, routeKey, replyCode, replyText);
/**
* 發送消息
* @param message 消息内容
*/
public void send(String message) {
// 建構回調傳回的資料
CorrelationData correlationData = new CorrelationData();
correlationData.setId(TimeUtil.localDateTimeToStamp(LocalDateTime.now()) + "");
Message message1 = MessageBuilder.withBody(message.toString().getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
// 将CorrelationData的id 與 Message的correlationId綁定,然後關系儲存起來,然後人工處理
.setCorrelationId(correlationData.getId())
.build();
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
rabbitTemplate.convertAndSend("exchange-2", "confirm.message", message1, correlationData);
}
}
四、定義消費者
/**
* @author Gjing
**/
@Component
@Slf4j
public class ConfirmConsumer {
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "confirm.message",durable = "true")
,exchange = @Exchange(value = "exchange-2",type = "topic")
,key = "confirm.message"))
public void receive(String message, Message message1, Channel channel) throws IOException {
log.info("消費者收到消息:{}", message);
long deliverTag = message1.getMessageProperties().getDeliveryTag();
//第一個deliveryTag參數為每條資訊帶有的tag值,第二個multiple參數為布爾類型
//為true時會将小于等于此次tag的所有消息都确認掉,如果為false則隻确認目前tag的資訊,可根據實際情況進行選擇。
channel.basicAck(deliverTag, false);
}
}
五、建立controller調用
/**
* @author Gjing
**/
@RestController
public class ConfirmController {
@Resource
private ConfirmProducer confirmProducer;
@PostMapping("/confirm-message")
public void confirmMessage() {
confirmProducer.send("hello confirm message");
}
}
六、執行結果

以上為個人了解,如有誤歡迎各位指正