天天看點

RabbitMQ死信隊列在SpringBoot中的使用

死信隊列可以實作消息在未被正常消費的場景下,對這些消息進行其他處理,保證消息不會被丢棄。

概念:

  • 消息會變成死信消息的場景:
    1. 消息被

      (basic.reject() or basic.nack()) and requeue = false

      ,即消息被消費者拒絕簽收,并且重新入隊為false。

      1.1 有一種場景需要注意下:消費者設定了自動ACK,當重複投遞次數達到了設定的最大retry次數之後,消息也會投遞到死信隊列,但是内部的原理還是調用了

      nack

      /

      reject

    2. 消息過期,過了TTL存活時間。
    3. 隊列設定了

      x-max-length

      最大消息數量且目前隊列中的消息已經達到了這個數量,再次投遞,消息将被擠掉,被擠掉的是最靠近被消費那一端的消息。
  • 代碼編寫流程是:
    1. 有一個(n個)正常業務的Exchange,比如為

      user-exchange

    2. 有一個(n個)正常業務的Queue,比如為

      user-queue

      。(因為該隊列需要綁定死信交換機,是以需要加倆參數:死信交換機:

      x-dead-letter-exchange

      ,死信消息路由鍵:

      x-dead-letter-routing-key

    3. 進行正常業務的交換機和隊列綁定。
    4. 定義一個死信交換機,比如為

      common-dead-letter-exchange

    5. 将正常業務的隊列綁定到死信交換機(隊列設定了

      x-dead-letter-exchange

      即會自動綁定)。
    6. 定義死信隊列

      user-dead-letter-queue

      用于接收死信消息,綁定死信交換機。
  • 業務流程是:
    1. 正常業務消息被投遞到正常業務的Exchange,該Exchange根據路由鍵将消息路由到綁定的正常隊列。
    2. 正常業務隊列中的消息變成了死信消息之後,會被自動投遞到該隊列綁定的死信交換機上(并帶上配置的路由鍵,如果沒有指定死信消息的路由鍵,則預設繼承該消息在正常業務時設定的路由鍵)。
    3. 死信交換機收到消息後,将消息根據路由規則路由到指定的死信隊列。
    4. 消息到達死信隊列後,可監聽該死信隊列,處理死信消息。
  • 死信交換機

    死信隊列

    也是普通的交換機和隊列,隻不過是我們人為的将某個交換機和隊列來處理死信消息。
  • 流程圖

代碼實作

  1. 配置
spring:
  application:
    name: learn-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: deadletter-vh
    connection-timeout: 15000
    # 發送确認
    publisher-confirms: true
    # 路由失敗回調
    publisher-returns: true
    template:
      # 必須設定成true 消息路由失敗通知監聽者,而不是将消息丢棄
      mandatory: true
    listener:
      simple:
        # 每次從RabbitMQ擷取的消息數量
        prefetch: 1
        default-requeue-rejected: false
        # 每個隊列啟動的消費者數量
        concurrency: 1
        # 每個隊列最大的消費者數量
        max-concurrency: 1
        # 簽收模式為手動簽收-那麼需要在代碼中手動ACK
        acknowledge-mode: manual

app:
  rabbitmq:
    # 隊列定義
    queue:
      # 正常業務隊列
      user: user-queue
      # 死信隊列
      user-dead-letter: user-dead-letter-queue
    # 交換機定義
    exchange:
      # 正常業務交換機
      user: user-exchange
      # 死信交換機
      common-dead-letter: common-dead-letter-exchange           
  1. 隊列、交換機定義與綁定。
/**
 * 隊列與交換機定義與綁定
 *
 * @author futao
 * @date 2020/4/7.
 */
@Configuration
public class Declare {

        /**
     * 使用者隊列
     *
     * @param userQueueName 使用者隊列名
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //聲明該隊列的死信消息發送到的 交換機 (隊列添加了這個參數之後會自動與該交換機綁定,并設定路由鍵,不需要開發者手動設定)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //聲明該隊列死信消息在交換機的 路由鍵
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                .build();
    }

    /**
     * 使用者交換機
     *
     * @param userExchangeName 使用者交換機名
     * @return
     */
    @Bean
    public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
        return ExchangeBuilder
                .topicExchange(userExchangeName)
                .durable(true)
                .build();
    }

    /**
     * 使用者隊列與交換機綁定
     *
     * @param userQueue    使用者隊列名
     * @param userExchange 使用者交換機名
     * @return
     */
    @Bean
    public Binding userBinding(Queue userQueue, Exchange userExchange) {
        return BindingBuilder
                .bind(userQueue)
                .to(userExchange)
                .with("user.*")
                .noargs();
    }

    /**
     * 死信交換機
     *
     * @param commonDeadLetterExchange 通用死信交換機名
     * @return
     */
    @Bean
    public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return ExchangeBuilder
                .topicExchange(commonDeadLetterExchange)
                .durable(true)
                .build();
    }


   /**
     * 使用者隊列的死信消息 路由的隊列
     * 使用者隊列user-queue的死信投遞到死信交換機`common-dead-letter-exchange`後再投遞到該隊列
     * 用這個隊列來接收user-queue的死信消息
     *
     * @return
     */
    @Bean
    public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) {
        return QueueBuilder
                .durable(userDeadLetterQueue)
                .build();
    }

    /**
     * 死信隊列綁定死信交換機
     *
     * @param userDeadLetterQueue      user-queue對應的死信隊列
     * @param commonDeadLetterExchange 通用死信交換機
     * @return
     */
    @Bean
    public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) {
        return BindingBuilder
                .bind(userDeadLetterQueue)
                .to(commonDeadLetterExchange)
                .with("user-dead-letter-routing-key")
                .noargs();
    }

}           
  • 定義好之後啟動程式,springboot會讀取Spring容器中類型為Queue和Exchange的bean進行隊列和交換機的初始化與綁定。當然也可以自己在RabbitMQ的管理背景進行手動建立與綁定。
  • 檢視管理背景

測試

  • 消息生産者
/**
 * @author futao
 * @date 2020/4/7.
 */
@Component
public class DeadLetterSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchange;

    public void send() {
        User user = User.builder()
                .userName("天文")
                .address("浙江杭州")
                .birthday(LocalDate.now(ZoneOffset.ofHours(8)))
                .build();
        rabbitTemplate.convertAndSend(userExchange, "user.abc", user);
    }
}           

1. 場景1.1

消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消費者拒絕或者nack,并且重新入隊為false。

nack()與reject()的差別是:reject()不支援批量拒絕,而nack()可以.

  • 消費者代碼
/**
 * @author futao
 * @date 2020/4/9.
 */
@Slf4j
@Component
public class Consumer {

    /**
     * 正常使用者隊列消息監聽消費者
     *
     * @param user
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user}")
    public void userConsumer(User user, Message message, Channel channel) {
        log.info("正常使用者業務監聽:接收到消息:[{}]", JSON.toJSONString(user));
        try {
            //參數為:消息的DeliveryTag,是否批量拒絕,是否重新入隊
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.info("拒絕簽收...消息的路由鍵為:[{}]", message.getMessageProperties().getReceivedRoutingKey());
        } catch (IOException e) {
            log.error("消息拒絕簽收失敗", e);
        }
    }

    /**
     * @param user
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
    public void userDeadLetterConsumer(User user, Message message, Channel channel) {
        log.info("接收到死信消息:[{}]", JSON.toJSONString(user));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("死信隊列簽收消息....消息路由鍵為:[{}]", message.getMessageProperties().getReceivedRoutingKey());
        } catch (IOException e) {
            log.error("死信隊列消息簽收失敗", e);
        }
    }
}           
  • 可以看到,正常消息被NACK之後最後到了死信隊列,且路由鍵發生了變化。

1. 場景1.2

消費者設定了自動簽收,當重複投遞次數達到了設定的最大retry次數之後,消息也會投遞到死信隊列,但是内部的原理還是調用了

nack

reject

  • application.yml中需要更改一些配置
spring:
  application:
    name: learn-rabbitmq
  rabbitmq:
    listener:
      simple:
        # 每次從RabbitMQ擷取的消息數量
        prefetch: 1
        default-requeue-rejected: false
        # 每個隊列啟動的消費者數量
        concurrency: 1
        # 每個隊列最大的消費者數量
        max-concurrency: 1
        # 自動簽收
        acknowledge-mode: auto
        retry:
          enabled: true
          # 第一次嘗試時間間隔
          initial-interval: 10S
          # 兩次嘗試之間的最長持續時間。
          max-interval: 10S
          # 最大重試次數(=第一次正常投遞1+重試次數4)
          max-attempts: 5
          # 上一次重試時間的乘數
          multiplier: 1.0           
/**
 * @author futao
 * @date 2020/4/9.
 */
@Slf4j
@Configuration
public class AutoAckConsumer {

    /**
     * 正常使用者隊列消息監聽消費者
     *
     * @param user
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user}")
    public void userConsumer(User user) {
        log.info("正常使用者業務監聽:接收到消息:[{}]", JSON.toJSONString(user));
        throw new RuntimeException("模拟發生異常");
    }

    /**
     * @param user
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
    public void userDeadLetterConsumer(User user) {
        log.info("接收到死信消息并自動簽收:[{}]", JSON.toJSONString(user));
    }
}           
  • 測試結果:
  • 從測試結果可以看出,消息如果未被正常消費,則進行重試,如果最終還未被正常消費,則會被投遞到死信隊列。

initial-interval

,

max-interval

這兩個參數啥作用不知道,現在測試的結果是一直都會取最短的那個時間作為下次投遞時間...

2. 測試場景 2

  • 需要修改隊列定義,設定隊列消息的過期時間

    x-message-ttl

    .
/**
     * 使用者隊列
     *
     * @param userQueueName 使用者隊列名
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //聲明該隊列的死信消息發送到的 交換機 (隊列添加了這個參數之後會自動與該交換機綁定,并設定路由鍵,不需要開發者手動設定)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //聲明該隊列死信消息在交換機的 路由鍵
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                //該隊列的消息的過期時間-超過這個時間還未被消費則路由到死信隊列
                .withArgument("x-message-ttl", 5000)
                .build();
    }           
  • user-queue

    的消費者注釋,使消息無法被消費,直到消息在隊列中的時間達到設定的存活時間。
  • 根據日志可以看到,消息在5S後會被投遞到死信隊列。
  • 注意:可以給隊列設定消息過期時間,那麼所有投遞到這個隊列的消息都自動具有這個屬性。還可以在消息投遞之前,給每條消息設定指定的過期時間。(當兩者都設定了,則預設取較短的值)

下面測試給每條消息設定指定的過期時間:

  • 修改消息生産者:
/**
 * @author futao
 * @date 2020/4/7.
 */
@Slf4j
@Component
public class DeadLetterSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchange;

    public void send(String exp) {
        User user = User.builder()
                .userName("天文")
                .address("浙江杭州")
                .birthday(LocalDate.now(ZoneOffset.ofHours(8)))
                .build();
        log.info("消息投遞...指定的存活時長為:[{}]ms", exp);
        rabbitTemplate.convertAndSend(userExchange, "user.abc", user, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties messageProperties = message.getMessageProperties();
                //為每條消息設定過期時間
                messageProperties.setExpiration(exp);
                return message;
            }
        });
    }
}           
  • 從測試結果可以看出,每條消息都在指定的時間投遞到了死信隊列。

【坑】重點注意!!!:RabbitMQ對于消息過期的檢測:隻會檢測最近将要被消費的那條消息是否到達了過期時間,不會檢測非末端消息是否過期。造成的問題是:非末端消息已經過期了,但是因為末端消息還未過期,非末端消息處于阻塞狀态,是以非末端消息不會被檢測到已經過期。使業務産生與預期嚴重不一緻的結果。

  • 對上面的問題進行測試:(第一條消息的過期時間設定成10S,第二條消息設定成5S)
  • 從測試結果可以看出,id為1的消息存活時長為10S,id為2的消息存活時間為5S。但是隻有當第一條消息(id=1)過期之後,id=2的消息到達隊列末端,才會被檢測到已經過期。

3. 測試場景3

x-max-length

  • 修改隊列定義
/**
     * 使用者隊列
     *
     * @param userQueueName 使用者隊列名
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //聲明該隊列的死信消息發送到的 交換機 (隊列添加了這個參數之後會自動與該交換機綁定,并設定路由鍵,不需要開發者手動設定)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //聲明該隊列死信消息在交換機的 路由鍵
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                //隊列最大消息數量
                .withArgument("x-max-length", 2)
                .build();
    }           
  • 向隊列中投遞消息
  • 從結果可以看出,當投遞第3條消息的時候,RabbitMQ會把在最靠經被消費那一端的消息移出隊列,并投遞到死信隊列。

隊列中将始終保持最多兩個消息。

其他:

  • Queue的可配置項可在RabbitMQ的管理背景檢視:

相關:

[SpringBoot RabbitMQ實作消息可靠投遞

](

https://www.jianshu.com/p/432167bbe95f)

TODO:

  • 消費端限流保護
  • 延遲隊列