引言
RabbitMQ 消息的消费有两种确认模式: 自动确认和手动确认
自动确认:Broker(RabbitMQ 服务器)在将消息发送给消费者后即将消息从队列中删除,无论消费者是否消费成功。如果消费者消费时业务代码出现异常或者还未消费完毕时系统宕机,就会导致消息丢失。
手动确认:消费者消费完毕后手动地向 Broker 发送确认通知,Broker 收到确认通知后再从队列中删除对应的消息。
由于自动确认方式存在的缺陷,对于一些重要的消息,实际中一般采用手动确认的方式来保证消息业务的可靠性。
那么在手动确认方式下,消费者业务中具体如何保证消息的可靠性呢? 下面我们介绍一种方式,即采用 重试 + 手动确认 + 死信队列 的方式来保证消费信息不丢失。
死信队列
死信(Dead Letter),指无法被消费者正确地进行业务处理的消息,消费者消费时业务程序抛出了异常,其主要原因有两个。一、消息本身是有问题的(主要原因),如付款消息中传递的银行卡号不存在。二、由于网络波动等原因,消费者依赖的第三方服务调用异常,如调用第三方接口失败,数据库由于无法获取连接访问失败等情况。对于这类消息,一般将其放入 RabbitMQ 的死信队列中,使用专门的消费者对死信进行处理,或者进行人工补偿。
死信队列的消息来源
1、消息被否定确认使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
2、消息在队列中的时间超过了设置的TTL(time to live)时间。
3、消息数量超过了队列的容量限制。
当一个队列中的消息满足上述三种情况任一个时,该消息就会从原队列移至死信队列,若该队列没有绑定死信队列则消息被丢弃。
Tips:死信队列和普通的业务队列完全一样,只不过是业务上创建用来存储处理失败的消息的队列。所以其工作方式也和业务队列相同,死信仍然需要交换机的转发到达死信队列。
死信队列的配置
1、为业务队列绑定死信交换机(即用来转发该队列中中死信的交换机)。
2、将死信队列与死信交换机绑定。
代码示例:
@Configurationpublic class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "business-exchange"; public static final String DEAD_LETTER_EXCHANGE_NAME = "dead-letter-exchange"; public static final String BUSINESS_QUEUE_NAME = "business-queue"; public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue"; public static final String ROUTING_KEY = "routing-key"; // 声明业务交换机 @Bean public DirectExchange businessExchange(){ return new DirectExchange(BUSINESS_EXCHANGE_NAME); } // 声明死信交换机 @Bean public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME); } // 声明业务队列 @Bean public Queue businessQueue(){ Map args = new HashMap<>(2); // 设置业务队列的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build(); } // 声明死信队列 @Bean public Queue deadLetterQueue(){ return new Queue(DEAD_LETTER_QUEUE_NAME); } // 将业务队列绑定到业务交换机 @Bean public Binding bindBusinessQueue(){ return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY); } // 将死信队列绑定到死信交换机 @Bean public Binding bindDeadLetterQueue(){ return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY); }}复制代码
在上面的配置示例中,我们声明了一个业务队列、一个业务交换机、一个死信队列、一个死信交换机。其中声明业务队列时的 x-dead-letter-exchange 参数指定队列的死信交换机,当信息被判定为死信时就会被Broker自动转发给配置的死信交换机。
生产者代码
我们定义一个Controller接口来测试消息发送,消息体由接口参数传入。
@[email protected]("/rabbitmq")public class RabbitController { @Autowired private RabbitTemplate rabbitTemplate; @PostMapping("/send") public void send(@RequestParam String msg){ rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, msg); }}复制代码
使用postman调用生产者接口:
http://localhost:5006/rabbitmq/send?msg=normal meaage复制代码
通过 RabbitMQ 控制台查看可以看到我们声明的交换机个队列成功创建,消息被发送到了业务队列中。
消费者手动确认+重试
消费者配置
# rabbitmq服务器连接端口 (默认为5672)spring.rabbitmq.host=192.168.44.104spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456# 开启消费者手动确认spring.rabbitmq.listener.simple.acknowledge-mode=manual复制代码
@[email protected]@RabbitListener(queues = "business-queue")public class RabbitConsumer { /** * 指定消费的队列 */ @RabbitHandler public void consume(String msg, Message message, Channel channel){ boolean success = false; int retryCount = 3; while (!success && retryCount-- > 0){ try { // 处理消息 log.info("收到消息: {}, deliveryTag = {}", msg, message.getMessageProperties().getDeliveryTag()); if(message.equals("dead-letter")){ throw new RuntimeException("收到死信"); } // 正常处理完毕,手动确认 success = true; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("程序异常:{}", e.getMessage()); } } // 达到最大重试次数后仍然消费失败 if(!success){ // 手动删除,移至死信队列 try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } catch (IOException e) { e.printStackTrace(); } } }}复制代码
启动消费者,消息被正常消费后手动确认,消息从队列中删除
我们下面发送一个私信
http://localhost:5006/rabbitmq/send?msg=dead-letter复制代码
从上图我们可以看到消费的代码重试了3次后将消息否定确认,Broker将消息判断为死信,发送至死信交换机,最终转发到死信队列。
图中看到死信确实被转发到了死信队列。根据实际的业务情况,我们可以创建专门的死信消费者对死信进行处理,或者进行人工补偿。
Tips: 代码示例中没有涉及数据库事务,若消费程序使用了声明式的事务@Transactional,在捕获异常后要手动回滚事务。如下图:
作者:stars
链接:https://juejin.cn/post/6906819135997640712
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。