在生産環境中由于一些不明原因,導緻rabbitmq重新開機,在RabbitMQ重新開機期間生産者消息投遞失敗,導緻消息丢失,需要手動處理和恢複。于是,我們開始思考,如何才能進行RabbitMQ的消息可靠投遞呢?特别是在這樣比較極端的情況,RabbitMQ叢集不可用的時候,無法投遞的消息該如何處理呢?
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiNx8FesU2cfdGLwczX0xiRGZkRGZ0Xy9GbvNGLwIzXlpXazxSP9c3YmJUeilnVtVWQClGVF5UMR9Fd4VGdsATNfd3bkFGazxycykFaKdkYzZUbapXNXlleSdVY2pESa9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL2MjMxMDMykTM0ADMxEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
代碼架構:
需要一個回調接口,當消息發出去後交換機沒有被接收,就會觸發回調接口,然後将消息重發。
# 交換機确認消息
spring.rabbitmq.publisher-confirm-type=correlated
# 開啟回退消息
spring.rabbitmq.publisher-returns=true
需要開啟兩個配置,第一個是确認機制,第二個是回退即通知生産者消息發送的情況
代碼實作
1、配置類
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
// 注入
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交換機确認回調的方法
* 1、發消息 交換機接收到了 回調
* 1.1 correlationData 儲存回調消息的id和相關資訊
* 1.2 交換機收到消息 ack = true
* 1.3 原因 cause
* 2、發消息 交換機接收失敗了 回調
* ack = false
* 3、第一個參數是從消費者中來的
* @param correlationData
* @param ack
* @param s
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
String id = correlationData != null ? correlationData.getId() : "";
if(ack){
log.info("交換機已經收到id為:{}的消息",id);
}else {
log.info("交換機還未收到id為:{}的消息,由于原因:{}",id,s);
}
}
/**
* 回退接口的方法
* 可以将消息傳遞過程中不可達目的地時将消息傳回給生産者
* 隻有将消息不可達的時候才進行回退
* @param message
* @param i
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int i, String replyText, String exchange, String routingKey) {
log.info("消息{}被交換機{}回退,退回原因{},路由key{}",
new String(message.getBody()),exchange,replyText,routingKey);
}
}
2、生産者
/**
* 生産者
*/
@RestController
@Slf4j
@RequestMapping("/api/v1/confirm")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 發消息
@GetMapping("sendMessage/{message}")
public void sendMesaage(@PathVariable String message){
// 設定回調
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);
log.info("發送消息:{}",message);
}
}