天天看点

RabbitMQ消息确认、延迟队列

消息确认

默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除

生产者配置:

spring:
  rabbitmq:
    publisher-confirms: true # 开启发送确认
    publisher-returns: true # 开启发送失败退回
           

回调方法:

@Override
public void afterPropertiesSet() throws Exception {
        // 用来确认消息是否有送达消息队列
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("消息发送成功:correlationData({}), ack({}), cause({})", correlationData, ack, cause);
        });

        // 若消息找不到对应的Exchange会被触发
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息丢失:exchange({}), route({}), replyCode({}), replyText({}), message:{}", exchange, routingKey, replyCode, replyText, message);
        });
}
           

消费者配置:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 开启ACK
           

在消费成功后,

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

表示消息已被成功消费

如果发生异常,可以重回队列、拒绝接受或者发送到指定的异常队列中

if (message.getMessageProperties().getRedelivered()) {
      log.info("消息已重复处理失败,拒绝再次接收...");
      // 拒绝消息
      // channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

      // 发到另一个队列(可以是专门接收失败消息的队列)
      channel.basicPublish(MY_EXCHANGE, QUEUE_MSGB, null, ("msg: " + msg + " error").getBytes());
} else {
      log.info("消息即将再次返回队列处理...");
      // requeue为是否重新回到队列
      channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
           

延迟队列

RabbitMQ本身没有延迟队列属性,我们通过ttl(Time-To-Live)可以设置消息的过期时间来达到效果。

应用场景:

1、用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单

2、延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试

生产者定义死信队列:

@Bean
public Queue delayQueue() {
        Map<String, Object> params = new HashMap<>();
        // 声明队列里的死信转发到的DLX名称
        params.put("x-dead-letter-exchange", MY_EXCHANGE);
        // 声明这些死信在转发时携带的routing-key名称
        params.put("x-dead-letter-routing-key", QUEUE_MSGB);

        return new Queue(QUEUE_DELAY, true, false, false, params);
}

@Bean
public Queue queueMsgB() {
        return new Queue(QUEUE_MSGB);
}

/**
* Exchange(Direct) 延迟
*
* @return
*/
@Bean
DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
}

/**
* Binding 消息绑定B
*
* @param queueMsgB
* @param exchange
* @return
*/
@Bean
Binding bindMsgB(Queue queueMsgB, TopicExchange exchange) {
        return BindingBuilder.bind(queueMsgB).to(exchange).with(QUEUE_MSGB);
}

/**
* 延迟队列消息绑定
*
* @param delayQueue
* @param delayExchange
* @return
*/
@Bean
Binding delayBind(Queue delayQueue, DirectExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY);
}
           

发送方法:

public void senDealy(String route, Object object, Integer timeout) {
rabbitTemplate.convertAndSend(DELAY_EXCHANGE, route, object, message -> {
            // 延迟 5 秒
            message.getMessageProperties().setExpiration(timeout * 1000 + "");
            return message;
        });
}
           

消费者:

@RabbitListener(queues = QUEUE_MSGB, containerFactory = "rabbitListenerContainerFactory")
public void process(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("B Receiver: " + msg + " ,channelno: " + channel.getChannelNumber());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}