天天看点

rabbitmq 不同的消费者消费同一个队列_RabbitMQ手动确认+重试+死信队列保证消费可靠性...引言死信队列死信队列的消息来源死信队列的配置代码示例:生产者代码消费者手动确认+重试

引言

RabbitMQ 消息的消费有两种确认模式: 自动确认和手动确认

rabbitmq 不同的消费者消费同一个队列_RabbitMQ手动确认+重试+死信队列保证消费可靠性...引言死信队列死信队列的消息来源死信队列的配置代码示例:生产者代码消费者手动确认+重试

自动确认:Broker(RabbitMQ 服务器)在将消息发送给消费者后即将消息从队列中删除,无论消费者是否消费成功。如果消费者消费时业务代码出现异常或者还未消费完毕时系统宕机,就会导致消息丢失。

手动确认:消费者消费完毕后手动地向 Broker 发送确认通知,Broker 收到确认通知后再从队列中删除对应的消息。

由于自动确认方式存在的缺陷,对于一些重要的消息,实际中一般采用手动确认的方式来保证消息业务的可靠性。

那么在手动确认方式下,消费者业务中具体如何保证消息的可靠性呢? 下面我们介绍一种方式,即采用 重试 + 手动确认 + 死信队列 的方式来保证消费信息不丢失。

死信队列

死信(Dead Letter),指无法被消费者正确地进行业务处理的消息,消费者消费时业务程序抛出了异常,其主要原因有两个。一、消息本身是有问题的(主要原因),如付款消息中传递的银行卡号不存在。二、由于网络波动等原因,消费者依赖的第三方服务调用异常,如调用第三方接口失败,数据库由于无法获取连接访问失败等情况。对于这类消息,一般将其放入 RabbitMQ 的死信队列中,使用专门的消费者对死信进行处理,或者进行人工补偿。

死信队列的消息来源

1、消息被否定确认使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。

2、消息在队列中的时间超过了设置的TTL(time to live)时间。

3、消息数量超过了队列的容量限制。

当一个队列中的消息满足上述三种情况任一个时,该消息就会从原队列移至死信队列,若该队列没有绑定死信队列则消息被丢弃。

Tips:死信队列和普通的业务队列完全一样,只不过是业务上创建用来存储处理失败的消息的队列。所以其工作方式也和业务队列相同,死信仍然需要交换机的转发到达死信队列。

死信队列的配置

1、为业务队列绑定死信交换机(即用来转发该队列中中死信的交换机)。

2、将死信队列与死信交换机绑定。

代码示例:

@Configurationpublic class RabbitMQConfig {    public static final String BUSINESS_EXCHANGE_NAME = "business-exchange";    public static final String DEAD_LETTER_EXCHANGE_NAME = "dead-letter-exchange";    public static final String BUSINESS_QUEUE_NAME = "business-queue";    public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue";    public static final String ROUTING_KEY = "routing-key";    // 声明业务交换机    @Bean    public DirectExchange businessExchange(){        return new DirectExchange(BUSINESS_EXCHANGE_NAME);    }    // 声明死信交换机    @Bean    public DirectExchange deadLetterExchange(){        return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);    }    // 声明业务队列    @Bean    public Queue businessQueue(){        Map args = new HashMap<>(2);        // 设置业务队列的死信交换机        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);        return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build();    }    // 声明死信队列    @Bean    public Queue deadLetterQueue(){        return new Queue(DEAD_LETTER_QUEUE_NAME);    }    // 将业务队列绑定到业务交换机    @Bean    public Binding bindBusinessQueue(){        return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY);    }    // 将死信队列绑定到死信交换机    @Bean    public Binding bindDeadLetterQueue(){        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY);    }}复制代码
           

在上面的配置示例中,我们声明了一个业务队列、一个业务交换机、一个死信队列、一个死信交换机。其中声明业务队列时的 x-dead-letter-exchange 参数指定队列的死信交换机,当信息被判定为死信时就会被Broker自动转发给配置的死信交换机。

生产者代码

我们定义一个Controller接口来测试消息发送,消息体由接口参数传入。

@[email protected]("/rabbitmq")public class RabbitController {    @Autowired    private RabbitTemplate rabbitTemplate;    @PostMapping("/send")    public void send(@RequestParam String msg){        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, msg);    }}复制代码
           

使用postman调用生产者接口:

http://localhost:5006/rabbitmq/send?msg=normal meaage复制代码
           

通过 RabbitMQ 控制台查看可以看到我们声明的交换机个队列成功创建,消息被发送到了业务队列中。

rabbitmq 不同的消费者消费同一个队列_RabbitMQ手动确认+重试+死信队列保证消费可靠性...引言死信队列死信队列的消息来源死信队列的配置代码示例:生产者代码消费者手动确认+重试
rabbitmq 不同的消费者消费同一个队列_RabbitMQ手动确认+重试+死信队列保证消费可靠性...引言死信队列死信队列的消息来源死信队列的配置代码示例:生产者代码消费者手动确认+重试

消费者手动确认+重试

消费者配置

# rabbitmq服务器连接端口 (默认为5672)spring.rabbitmq.host=192.168.44.104spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456# 开启消费者手动确认spring.rabbitmq.listener.simple.acknowledge-mode=manual复制代码
           
@[email protected]@RabbitListener(queues = "business-queue")public class RabbitConsumer {    /**     * 指定消费的队列     */    @RabbitHandler    public void consume(String msg, Message message, Channel channel){        boolean success = false;        int retryCount = 3;        while (!success && retryCount-- > 0){            try {                // 处理消息                log.info("收到消息: {}, deliveryTag = {}", msg, message.getMessageProperties().getDeliveryTag());                if(message.equals("dead-letter")){                    throw new RuntimeException("收到死信");                }                // 正常处理完毕,手动确认                success = true;                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);            }catch (Exception e){                log.error("程序异常:{}", e.getMessage());            }        }        // 达到最大重试次数后仍然消费失败        if(!success){            // 手动删除,移至死信队列            try {                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);            } catch (IOException e) {                e.printStackTrace();            }        }    }}复制代码
           

启动消费者,消息被正常消费后手动确认,消息从队列中删除

rabbitmq 不同的消费者消费同一个队列_RabbitMQ手动确认+重试+死信队列保证消费可靠性...引言死信队列死信队列的消息来源死信队列的配置代码示例:生产者代码消费者手动确认+重试
rabbitmq 不同的消费者消费同一个队列_RabbitMQ手动确认+重试+死信队列保证消费可靠性...引言死信队列死信队列的消息来源死信队列的配置代码示例:生产者代码消费者手动确认+重试

我们下面发送一个私信

http://localhost:5006/rabbitmq/send?msg=dead-letter复制代码
           
rabbitmq 不同的消费者消费同一个队列_RabbitMQ手动确认+重试+死信队列保证消费可靠性...引言死信队列死信队列的消息来源死信队列的配置代码示例:生产者代码消费者手动确认+重试

从上图我们可以看到消费的代码重试了3次后将消息否定确认,Broker将消息判断为死信,发送至死信交换机,最终转发到死信队列。

rabbitmq 不同的消费者消费同一个队列_RabbitMQ手动确认+重试+死信队列保证消费可靠性...引言死信队列死信队列的消息来源死信队列的配置代码示例:生产者代码消费者手动确认+重试

图中看到死信确实被转发到了死信队列。根据实际的业务情况,我们可以创建专门的死信消费者对死信进行处理,或者进行人工补偿。

Tips: 代码示例中没有涉及数据库事务,若消费程序使用了声明式的事务@Transactional,在捕获异常后要手动回滚事务。如下图:

rabbitmq 不同的消费者消费同一个队列_RabbitMQ手动确认+重试+死信队列保证消费可靠性...引言死信队列死信队列的消息来源死信队列的配置代码示例:生产者代码消费者手动确认+重试

作者:stars

链接:https://juejin.cn/post/6906819135997640712

来源:掘金

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。