天天看點

rabbitmq 多個消費者消費一個隊列_RabbitMQ手動确認+重試+死信隊列保證消費可靠性...引言死信隊列死信隊列的消息來源死信隊列的配置代碼示例:生産者代碼消費者手動确認+重試

引言

RabbitMQ 消息的消費有兩種确認模式: 自動确認和手動确認

rabbitmq 多個消費者消費一個隊列_RabbitMQ手動确認+重試+死信隊列保證消費可靠性...引言死信隊列死信隊列的消息來源死信隊列的配置代碼示例:生産者代碼消費者手動确認+重試

自動确認:Broker(RabbitMQ 伺服器)在将消息發送給消費者後即将消息從隊列中删除,無論消費者是否消費成功。如果消費者消費時業務代碼出現異常或者還未消費完畢時系統當機,就會導緻消息丢失。

手動确認:消費者消費完畢後手動地向 Broker 發送确認通知,Broker 收到确認通知後再從隊列中删除對應的消息。

由于自動确認方式存在的缺陷,對于一些重要的消息,實際中一般采用手動确認的方式來保證消息業務的可靠性。

那麼在手動确認方式下,消費者業務中具體如何保證消息的可靠性呢? 下面我們介紹一種方式,即采用 重試 + 手動确認 + 死信隊列 的方式來保證消費資訊不丢失。

死信隊列

死信(Dead Letter),指無法被消費者正确地進行業務處理的消息,消費者消費時業務程式抛出了異常,其主要原因有兩個。一、消息本身是有問題的(主要原因),如付款消息中傳遞的銀行卡号不存在。二、由于網絡波動等原因,消費者依賴的第三方服務調用異常,如調用第三方接口失敗,資料庫由于無法擷取連接配接通路失敗等情況。對于這類消息,一般将其放入 RabbitMQ 的死信隊列中,使用專門的消費者對死信進行處理,或者進行人工補償。

死信隊列的消息來源

1、消息被否定确認使用 channel.basicNack 或 channel.basicReject ,并且此時requeue 屬性被設定為false。

2、消息在隊列中的時間超過了設定的TTL(time to live)時間。

3、消息數量超過了隊列的容量限制。

當一個隊列中的消息滿足上述三種情況任一個時,該消息就會從原隊列移至死信隊列,若該隊列沒有綁定死信隊列則消息被丢棄。

Tips:死信隊列和普通的業務隊列完全一樣,隻不過是業務上建立用來存儲處理失敗的消息的隊列。是以其工作方式也和業務隊列相同,死信仍然需要交換機的轉發到達死信隊列。

死信隊列的配置

1、為業務隊列綁定死信交換機(即用來轉發該隊列中中死信的交換機)。

2、将死信隊列與死信交換機綁定。

代碼示例:

@Configurationpublic class RabbitMQConfig {    public static final String BUSINESS_EXCHANGE_NAME = "business-exchange";    public static final String DEAD_LETTER_EXCHANGE_NAME = "dead-letter-exchange";    public static final String BUSINESS_QUEUE_NAME = "business-queue";    public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue";    public static final String ROUTING_KEY = "routing-key";    // 聲明業務交換機    @Bean    public DirectExchange businessExchange(){        return new DirectExchange(BUSINESS_EXCHANGE_NAME);    }    // 聲明死信交換機    @Bean    public DirectExchange deadLetterExchange(){        return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);    }    // 聲明業務隊列    @Bean    public Queue businessQueue(){        Map args = new HashMap<>(2);        // 設定業務隊列的死信交換機        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);        return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build();    }    // 聲明死信隊列    @Bean    public Queue deadLetterQueue(){        return new Queue(DEAD_LETTER_QUEUE_NAME);    }    // 将業務隊列綁定到業務交換機    @Bean    public Binding bindBusinessQueue(){        return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY);    }    // 将死信隊列綁定到死信交換機    @Bean    public Binding bindDeadLetterQueue(){        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY);    }}複制代碼
           

在上面的配置示例中,我們聲明了一個業務隊列、一個業務交換機、一個死信隊列、一個死信交換機。其中聲明業務隊列時的 x-dead-letter-exchange 參數指定隊列的死信交換機,當資訊被判定為死信時就會被Broker自動轉發給配置的死信交換機。

生産者代碼

我們定義一個Controller接口來測試消息發送,消息體由接口參數傳入。

@[email protected]("/rabbitmq")public class RabbitController {    @Autowired    private RabbitTemplate rabbitTemplate;    @PostMapping("/send")    public void send(@RequestParam String msg){        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, msg);    }}複制代碼
           

使用postman調用生産者接口:

http://localhost:5006/rabbitmq/send?msg=normal meaage複制代碼
           

通過 RabbitMQ 控制台檢視可以看到我們聲明的交換機個隊列成功建立,消息被發送到了業務隊列中。

rabbitmq 多個消費者消費一個隊列_RabbitMQ手動确認+重試+死信隊列保證消費可靠性...引言死信隊列死信隊列的消息來源死信隊列的配置代碼示例:生産者代碼消費者手動确認+重試
rabbitmq 多個消費者消費一個隊列_RabbitMQ手動确認+重試+死信隊列保證消費可靠性...引言死信隊列死信隊列的消息來源死信隊列的配置代碼示例:生産者代碼消費者手動确認+重試

消費者手動确認+重試

消費者配置

# rabbitmq伺服器連接配接端口 (預設為5672)spring.rabbitmq.host=192.168.44.104spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123456# 開啟消費者手動确認spring.rabbitmq.listener.simple.acknowledge-mode=manual複制代碼
           
@[email protected]@RabbitListener(queues = "business-queue")public class RabbitConsumer {    /**     * 指定消費的隊列     */    @RabbitHandler    public void consume(String msg, Message message, Channel channel){        boolean success = false;        int retryCount = 3;        while (!success && retryCount-- > 0){            try {                // 處理消息                log.info("收到消息: {}, deliveryTag = {}", msg, message.getMessageProperties().getDeliveryTag());                if(message.equals("dead-letter")){                    throw new RuntimeException("收到死信");                }                // 正常處理完畢,手動确認                success = true;                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);            }catch (Exception e){                log.error("程式異常:{}", e.getMessage());            }        }        // 達到最大重試次數後仍然消費失敗        if(!success){            // 手動删除,移至死信隊列            try {                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);            } catch (IOException e) {                e.printStackTrace();            }        }    }}複制代碼
           

啟動消費者,消息被正常消費後手動确認,消息從隊列中删除

rabbitmq 多個消費者消費一個隊列_RabbitMQ手動确認+重試+死信隊列保證消費可靠性...引言死信隊列死信隊列的消息來源死信隊列的配置代碼示例:生産者代碼消費者手動确認+重試
rabbitmq 多個消費者消費一個隊列_RabbitMQ手動确認+重試+死信隊列保證消費可靠性...引言死信隊列死信隊列的消息來源死信隊列的配置代碼示例:生産者代碼消費者手動确認+重試

我們下面發送一個私信

http://localhost:5006/rabbitmq/send?msg=dead-letter複制代碼
           
rabbitmq 多個消費者消費一個隊列_RabbitMQ手動确認+重試+死信隊列保證消費可靠性...引言死信隊列死信隊列的消息來源死信隊列的配置代碼示例:生産者代碼消費者手動确認+重試

從上圖我們可以看到消費的代碼重試了3次後将消息否定确認,Broker将消息判斷為死信,發送至死信交換機,最終轉發到死信隊列。

rabbitmq 多個消費者消費一個隊列_RabbitMQ手動确認+重試+死信隊列保證消費可靠性...引言死信隊列死信隊列的消息來源死信隊列的配置代碼示例:生産者代碼消費者手動确認+重試

圖中看到死信确實被轉發到了死信隊列。根據實際的業務情況,我們可以建立專門的死信消費者對死信進行處理,或者進行人工補償。

Tips: 代碼示例中沒有涉及資料庫事務,若消費程式使用了聲明式的事務@Transactional,在捕獲異常後要手動復原事務。如下圖:

rabbitmq 多個消費者消費一個隊列_RabbitMQ手動确認+重試+死信隊列保證消費可靠性...引言死信隊列死信隊列的消息來源死信隊列的配置代碼示例:生産者代碼消費者手動确認+重試

作者:stars

連結:https://juejin.cn/post/6906819135997640712

來源:掘金

著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。