天天看點

釋出确認模式

   在生産環境中由于一些不明原因,導緻rabbitmq重新開機,在RabbitMQ重新開機期間生産者消息投遞失敗,導緻消息丢失,需要手動處理和恢複。于是,我們開始思考,如何才能進行RabbitMQ的消息可靠投遞呢?特别是在這樣比較極端的情況,RabbitMQ叢集不可用的時候,無法投遞的消息該如何處理呢?

釋出确認模式

 代碼架構:

釋出确認模式

需要一個回調接口,當消息發出去後交換機沒有被接收,就會觸發回調接口,然後将消息重發。

# 交換機确認消息
spring.rabbitmq.publisher-confirm-type=correlated
# 開啟回退消息
spring.rabbitmq.publisher-returns=true      

需要開啟兩個配置,第一個是确認機制,第二個是回退即通知生産者消息發送的情況

代碼實作

1、配置類

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{

    @Autowired
     private RabbitTemplate rabbitTemplate;

    // 注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }
    /**
     * 交換機确認回調的方法
     * 1、發消息 交換機接收到了  回調
     *     1.1 correlationData 儲存回調消息的id和相關資訊
     *     1.2 交換機收到消息  ack = true
     *     1.3 原因 cause
     * 2、發消息 交換機接收失敗了  回調
     *     ack = false
     * 3、第一個參數是從消費者中來的
     * @param correlationData
     * @param ack
     * @param s
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {

        String id = correlationData != null ? correlationData.getId() : "";
        if(ack){
            log.info("交換機已經收到id為:{}的消息",id);
        }else {
            log.info("交換機還未收到id為:{}的消息,由于原因:{}",id,s);
        }
    }

    /**
     * 回退接口的方法
     * 可以将消息傳遞過程中不可達目的地時将消息傳回給生産者
     * 隻有将消息不可達的時候才進行回退
     * @param message
     * @param i
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int i, String replyText, String exchange, String routingKey) {
        log.info("消息{}被交換機{}回退,退回原因{},路由key{}",
                new String(message.getBody()),exchange,replyText,routingKey);

    }
}
           

2、生産者

/**
 * 生産者
 */
@RestController
@Slf4j
@RequestMapping("/api/v1/confirm")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 發消息
    @GetMapping("sendMessage/{message}")
    public void sendMesaage(@PathVariable String message){
        // 設定回調
        CorrelationData correlationData = new CorrelationData("1");

        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);

        log.info("發送消息:{}",message);
    }
}