天天看點

MQ回退消息 springboot

Mandatory參數

如果發現該消息不可路由,那麼消息會被直接丢棄,此時生産者是不知道消息被丢棄這個事件的。那麼如何讓無法被路由的消息幫我想辦法處理一下?最起碼通知我一聲,我好自己處理啊。通過設定mandatory參數可以在當消息傳遞過程中不可達目的地時将消息傳回給生産者。  

 消息生産者:

@Slf4j
@RestController
public class MessageProduce implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    private  void init()
    {
        rabbitTemplate.setConfirmCallback(this);
        /**
         * true:
         *       交換機無法将消息進行路由的時候,會将該消息傳回給生産者
         * false:
         *       如果發現消息無法進行路由,則直接将消息扔掉
         */
        rabbitTemplate.setMandatory(true);
        //将回退消息交給誰處理
        rabbitTemplate.setReturnCallback(this);
    }
    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message)
    {
        //讓消息綁定一個id值
        CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("confirm.exchange","key1",message+"key1",correlationData1);
        rabbitTemplate.convertAndSend("confirm.exchange","key2",message+"key2",correlationData1);
        log.info("發送消息id位{}内容為{}",correlationData1.getId(),message+"key1");
        log.info("發送消息id位{}内容為{}",correlationData1.getId(),message+"key2");

    }
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id= correlationData!=null?correlationData.getId():"";
        if(b)
        {
            log.info("交換機收到消息确認成功;id{}",id);
        }
        else {
            log.error("消息id{}未成功投遞到交換機,原因是:{}",id,s);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey)  {
            log.info("消息:{}被伺服器退回,退回的原因是{},交換機是{}",
                    new String(message.getBody()),replyText,exchange,routingKey);
    }
}      

回調接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack ,String cause) {
        String id=correlationData!=null?correlationData.getId():"";
        if(ack)
        {
            log.info("交換機已經收到id為{}的消息",id);
        }
        else
        {
            log.info("交換機還未收到id未:{}的消息,原因是{}",cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.error("消息{},被交換機{}退回。退回的原因是:{},路由key為{}",new String(message.getBody()),
                exchange,replyText,routingKey);
    }
}      

消費者

@Component
@Slf4j
public class ConfirmConsumer {
    public static  final  String CONFIRM_QUEUE_NAME="confirm.queue";
    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message)
    {
        String s = new String(message.getBody());
        log.info("接收到隊列confirm.queue消息:{}",s);
    }
}      

 ​​http://localhost:8989/sendMessage/8888​​

結果: