
前言
消息确認是保證消息傳遞可靠性的重要步驟,上一節我們說到持久化,持久化隻能保證消息不丢失,但是如果消息如果投遞失敗我們怎麼進行補償操作呢?解決辦法就是實作回調函數進行操作,在消息的發送和消息的消費都可以進行補償操作,下面我們就要講解消息确認。
正文
目錄
前言
正文
消息确認種類
消息發送确認
ConfirmCallback
ReturnCallback
消息消費确認
消息确認種類
消息的确認做有很多法,其中包括事務機制、批量确認、異步确認等。
事務機制:我們在channel對象中可以看到 txSelect(),txCommit(),txrollback() 這些方法,分别對應着開啟事務,送出事務,復原。由于使用事務會造成生産者與Broker互動次數增加,造成性能資源的浪費,而且事務機制是阻塞的,在發送一條消息後需要等待RabbitMq回應,之後才能發送下一條,是以事務機制不提倡,大家在網上也很少看到RabbitMq使用事務進行消息确認的。
批量确認:批量其實是一個節約資源的操作,但是在RabbitMq中我們使用批量操作會造成消息重複消費,原因是批量操作是使用戶端程式定期或者消息達到一定量,來調用方法等待Broker傳回,這樣其實是一個提高效率的做法,但是如果出現消息重發的情況,目前這批次的消息都需要重發,這就造成了重複消費,是以批量确認的操作性能沒有提高反而下降。
異步确認:異步确認雖然程式設計邏輯比上兩個要複雜,但是成本效益最高,無論是可靠性還是效率都沒得說,他是利用回調函數來達到消息可靠性傳遞的,筆者接觸過RocketMq,這個中間件也是通過函數回調來保證是否投遞成功,下面就讓我們來詳細講解異步确認是怎麼實作的。
請看一下RabbitMq工作原理圖
每一個顔色塊之間都存在着消息的确認機制,我們大概分為兩大類,發送方确認和接收方确認,其中發送方确認又分為生産者到交換器到确認和交換器到隊列的确認。
消息發送确認
ConfirmCallback
ConfirmCallback是一個回調接口,消息發送到 Broker 後觸發回調,确認消息是否到達 Broker 伺服器,也就是隻确認是否正确到達 Exchange 中。
我們需要在生産者的配置中添加下面配置,表示開啟釋出者确認
spring.rabbitmq.publisher-confirms=true
然後在生産者的Java配置類實作該接口
@Componentpublic class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void initRabbitTemplate() { // 設定生産者消息确認 rabbitTemplate.setConfirmCallback(this); } /** * 消息發送到 Broker 後觸發回調,确認消息是否到達 Broker 伺服器,也就是隻确認是否正确到達 Exchange 中 * * @param correlationData * @param b * @param s */ @Override public void confirm(@Nullable CorrelationData correlationData, boolean b, @Nullable String s) { System.out.println("ack:[{}]" + b); if (b) { System.out.println("消息到達rabbitmq伺服器"); } else { System.out.println("消息可能未到達rabbitmq伺服器"); } }
ReturnCallback
通過實作 ReturnCallback 接口,啟動消息失敗傳回,此接口是在交換器路由不到隊列時觸發回調,該方法可以不使用,因為交換器和隊列是在代碼裡綁定的,如果消息成功投遞到Broker後幾乎不存在綁定隊列失敗,除非你代碼寫錯了。
使用此接口需要在生産者配置中加入一下配置,表示釋出者傳回。
spring.rabbitmq.publisher-returns=true
然後基于剛才的生産者Java配置裡實作接口ReturnCallback。
以上兩段Java配置可以寫在一個類裡。
到此,我們完成了生産者的異步确認,我們可以在回調函數中對目前失敗的消息進行補償,這樣保證了我們沒有發送成功的資料也被觀察到了,比如某某條資料需要發送到消費者消費,但是沒有發送成功,這就需要你在此做一些其他操作喽,根據你具體業務來。
消息消費确認
消費者确認發生在監聽隊列的消費者處理業務失敗,如,發生了異常,不符合要求的資料……,這些場景我們就需要手動處理,比如重新發送或者丢棄。
我們知道ACK是預設是自動的,自動确認會在消息發送給消費者後立即确認,但存在丢失消息的可能,如果消費端消費邏輯抛出異常,加入你用復原了也隻是保證了資料的一緻性,但是消息還是丢了,也就是消費端沒有處理成功這條消息,那麼就相當于丢失了消息。
消息确認模式有:
- AcknowledgeMode.NONE:自動确認
- AcknowledgeMode.AUTO:根據情況确認
- AcknowledgeMode.MANUAL:手動确認
長話短說……
需要在消費者的配置裡加手動 ack(确認)則需要修改确認模式為 manual,手動确認的方式有很多,可以在RabbitListenerContainerFactory類進行設定。
spring.rabbitmq.listener.direct.acknowledge-mode=MANUAL
消費者類
@Servicepublic class AsyncConfirmConsumer { @RabbitListener(queues = "confirm_queue") @RabbitHandler public void asyncConfirm(Order order, Message message, Channel channel) throws IOException { try { System.out.println("消費消息:" + order.getName());// int a = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); System.out.println("消費消息确認" + message.getMessageProperties().getConsumerQueue() + ",接收到了回調方法"); } catch (Exception e) { //重新回到隊列// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);// System.out.println("嘗試重發:" + message.getMessageProperties().getConsumerQueue()); //requeue =true 重回隊列,false 丢棄 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // TODO 該消息已經導緻異常,重發無意義,自己實作補償機制 } }}
需要注意的 basicAck 方法需要傳遞兩個參數
- deliveryTag(唯一辨別 ID):當一個消費者向 RabbitMQ 注冊後,會建立起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一辨別 ID,是一個單調遞增的正整數,delivery tag 的範圍僅限于 Channel
- multiple:為了減少網絡流量,手動确認可以被批處理,當該參數為 true 時,則可以一次性确認 delivery_tag 小于等于傳入值的所有消息
basicNack方法需要傳遞三個參數
- deliveryTag(唯一辨別 ID):上面已經解釋了。
- multiple:上面已經解釋了。
- requeue: true :重回隊列,false :丢棄,我們在nack方法中必須設定 false,否則重發沒有意義。
basicReject方法需要傳遞兩個參數
- deliveryTag(唯一辨別 ID):上面已經解釋了。
- requeue:上面已經解釋了,在reject方法裡必須設定true。
還要說明一下,建議大家不要重發,重發後基本還是失敗,因為出現問題一般都是異常導緻的,出現異常的話,我的觀點是丢棄這個消息,然後在catch裡做補償操作。
到此,我們都已經準備好了,可以進行測試,我把剩餘相關代碼都寫在一起了。
@RestControllerpublic class AsyncConfirmController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/async/{id}") public String AETest(@PathVariable Integer id) { Order order = new Order(id, "胖虎"); rabbitTemplate.convertAndSend("confirm_exchange", "", order); return "成功"; }[email protected] class AsyncConfirmListener { @Bean public Queue confirmQueue() { return new Queue("confirm_queue"); } @Bean public FanoutExchange confirmExchange() { return new FanoutExchange("confirm_exchange"); } //交換器綁定隊列 @Bean Binding bindingExchangeConfirm(Queue confirmQueue, FanoutExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange); }}
我們可以自己下載下傳項目進行測試,這裡就不貼運作結果了。
github位址:https://github.com/362460453/rabbitMQ-demo