引言
RabbitMQ 消息的消費有兩種确認模式: 自動确認和手動确認
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iNxYTOmVDNlRjMzE2NiRWZ4kjYzcjM4kTY5QjYzUTY18CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
自動确認:Broker(RabbitMQ 伺服器)在将消息發送給消費者後即将消息從隊列中删除,無論消費者是否消費成功。如果消費者消費時業務代碼出現異常或者還未消費完畢時系統當機,就會導緻消息丢失。
手動确認:消費者消費完畢後手動地向 Broker 發送确認通知,Broker 收到确認通知後再從隊列中删除對應的消息。
由于自動确認方式存在的缺陷,對于一些重要的消息,實際中一般采用手動确認的方式來保證消息業務的可靠性。
那麼在手動确認方式下,消費者業務中具體如何保證消息的可靠性呢? 下面我們介紹一種方式,即采用 重試 + 手動确認 + 死信隊列 的方式來保證消費資訊不丢失。
死信隊列
死信(Dead Letter),指無法被消費者正确地進行業務處理的消息,消費者消費時業務程式抛出了異常,其主要原因有兩個。一、消息本身是有問題的(主要原因),如付款消息中傳遞的銀行卡号不存在。二、由于網絡波動等原因,消費者依賴的第三方服務調用異常,如調用第三方接口失敗,資料庫由于無法擷取連接配接通路失敗等情況。對于這類消息,一般将其放入 RabbitMQ 的死信隊列中,使用專門的消費者對死信進行處理,或者進行人工補償。
死信隊列的消息來源
1、消息被否定确認使用 channel.basicNack 或 channel.basicReject ,并且此時requeue 屬性被設定為false。
2、消息在隊列中的時間超過了設定的TTL(time to live)時間。
3、消息數量超過了隊列的容量限制。
當一個隊列中的消息滿足上述三種情況任一個時,該消息就會從原隊列移至死信隊列,若該隊列沒有綁定死信隊列則消息被丢棄。
Tips:死信隊列和普通的業務隊列完全一樣,隻不過是業務上建立用來存儲處理失敗的消息的隊列。是以其工作方式也和業務隊列相同,死信仍然需要交換機的轉發到達死信隊列。
死信隊列的配置
1、為業務隊列綁定死信交換機(即用來轉發該隊列中中死信的交換機)。
2、将死信隊列與死信交換機綁定。
代碼示例:
@Configurationpublic class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "business-exchange"; public static final String DEAD_LETTER_EXCHANGE_NAME = "dead-letter-exchange"; public static final String BUSINESS_QUEUE_NAME = "business-queue"; public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue"; public static final String ROUTING_KEY = "routing-key"; // 聲明業務交換機 @Bean public DirectExchange businessExchange(){ return new DirectExchange(BUSINESS_EXCHANGE_NAME); } // 聲明死信交換機 @Bean public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME); } // 聲明業務隊列 @Bean public Queue businessQueue(){ Map args = new HashMap<>(2); // 設定業務隊列的死信交換機 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build(); } // 聲明死信隊列 @Bean public Queue deadLetterQueue(){ return new Queue(DEAD_LETTER_QUEUE_NAME); } // 将業務隊列綁定到業務交換機 @Bean public Binding bindBusinessQueue(){ return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY); } // 将死信隊列綁定到死信交換機 @Bean public Binding bindDeadLetterQueue(){ return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY); }}複制代碼
在上面的配置示例中,我們聲明了一個業務隊列、一個業務交換機、一個死信隊列、一個死信交換機。其中聲明業務隊列時的 x-dead-letter-exchange 參數指定隊列的死信交換機,當資訊被判定為死信時就會被Broker自動轉發給配置的死信交換機。
生産者代碼
我們定義一個Controller接口來測試消息發送,消息體由接口參數傳入。
@[email protected]("/rabbitmq")public class RabbitController { @Autowired private RabbitTemplate rabbitTemplate; @PostMapping("/send") public void send(@RequestParam String msg){ rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, msg); }}複制代碼
使用postman調用生産者接口:
http://localhost:5006/rabbitmq/send?msg=normal meaage複制代碼
通過 RabbitMQ 控制台檢視可以看到我們聲明的交換機個隊列成功建立,消息被發送到了業務隊列中。
消費者手動确認+重試
消費者配置
# rabbitmq伺服器連接配接端口 (預設為5672)spring.rabbitmq.host=192.168.44.104spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456# 開啟消費者手動确認spring.rabbitmq.listener.simple.acknowledge-mode=manual複制代碼
@[email protected]@RabbitListener(queues = "business-queue")public class RabbitConsumer { /** * 指定消費的隊列 */ @RabbitHandler public void consume(String msg, Message message, Channel channel){ boolean success = false; int retryCount = 3; while (!success && retryCount-- > 0){ try { // 處理消息 log.info("收到消息: {}, deliveryTag = {}", msg, message.getMessageProperties().getDeliveryTag()); if(message.equals("dead-letter")){ throw new RuntimeException("收到死信"); } // 正常處理完畢,手動确認 success = true; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ log.error("程式異常:{}", e.getMessage()); } } // 達到最大重試次數後仍然消費失敗 if(!success){ // 手動删除,移至死信隊列 try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } catch (IOException e) { e.printStackTrace(); } } }}複制代碼
啟動消費者,消息被正常消費後手動确認,消息從隊列中删除
我們下面發送一個私信
http://localhost:5006/rabbitmq/send?msg=dead-letter複制代碼
從上圖我們可以看到消費的代碼重試了3次後将消息否定确認,Broker将消息判斷為死信,發送至死信交換機,最終轉發到死信隊列。
圖中看到死信确實被轉發到了死信隊列。根據實際的業務情況,我們可以建立專門的死信消費者對死信進行處理,或者進行人工補償。
Tips: 代碼示例中沒有涉及資料庫事務,若消費程式使用了聲明式的事務@Transactional,在捕獲異常後要手動復原事務。如下圖:
作者:stars
連結:https://juejin.cn/post/6906819135997640712
來源:掘金
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。