天天看点

Rabbitmq 高级特性 - 死信队列

Rabbitmq 高级特性 - 死信队列

  • 说明:
    • 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
    • 为什么叫做死信队列,翻译过来又是死信交换机?

      主要是其他mq没有交换机这个概念,它们就一个死信队列,但是我们的rabbitmq有交换机的概念,所以我们一般在rabbitmq说的死信队列说的都是死信交换机.
      Rabbitmq 高级特性 - 死信队列
  • 消息成为死信的三种情况(

    面试可能会问

    ):
    1. 队列消息长度到达限制;
    2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
    3. 原队列存在消息过期设置,消息到达超时时间未被消费;
  • 队列绑定死信交换机:
    • 给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
      • x-dead-letter-exchange: 给队列设置一个死信交换机的名称
      • x-dead-letter-routing-key:给队列设置一个信息交换机的

        Routing key

        Rabbitmq 高级特性 - 死信队列
  • 死信队列步骤(基于RabbitMQ 高级特性 - 消息的可靠性投递):
    1. 声明正常的队列(

      test_queue_dlx

      )和交换机(

      test_exchange_dlx

      )
      <!--
              1. 声明正常的队列(`test_queue_dlx`)和交换机(`test_exchange_dlx`)
          -->
          <rabbit:queue id="test_queue_dlx" name="test_queue_dlx"/>
          <rabbit:topic-exchange name="test_exchange_dlx">
              <rabbit:bindings>
                  <rabbit:binding pattern="text.dlx.#" queue="test_queue_dlx"/>
              </rabbit:bindings>
          </rabbit:topic-exchange>
                 
    2. 声明死信队列(

      queue_dlx

      )和死信交换机(

      exchange_dlx

      )
      <!--
              2. 声明死信队列(`queue_dlx`)和死信交换机(`exchange_dlx`)
          -->
          <rabbit:queue id="queue_dlx" name="queue_dlx"/>
          <rabbit:topic-exchange name="exchange_dlx">
              <rabbit:bindings>
                  <rabbit:binding pattern="dlx.#" queue="queue_dlx"/>
              </rabbit:bindings>
          </rabbit:topic-exchange>
                 
    3. 正常队列绑定死信交换机(

      在1的queue里面绑定

      )
      • 设置两个参数:
        1. x-dead-letter-exchange 死信交换机名称
        2. x-dead-letter-routing-key 发给死信交换机的Routing key
      <rabbit:queue id="test_queue_dlx" name="test_queue_dlx">
      	        <!-- 3. 正常队列绑定死信交换机 -->
      	        <rabbit:queue-arguments>
      	            <!-- 3.1 x-dead-letter-exchange 死信交换机名称 -->
      	            <entry key="x-dead-letter-exchange" value="exchange_dlx"/>
      	            <!-- 3,2 x-dead-letter-routing-key 发给死信交换机的Routing key -->
      	            <entry key="x-dead-letter-routing-key" value="dlx.xixi"/>
      	        </rabbit:queue-arguments>
      	    </rabbit:queue>
      	```
                 
  1. 消息成为死信的情况(

    在1的queue里面绑定

    )
    1. 队列的过期时间
      <!-- 4.1 设置队列的过期时间 ttl-->
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
                 
    2. 队列的长度限制
      <!-- 4.2 设置队列的长度限制 max-length -->
        <entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
                 
    3. 消息拒收(和消费者有关)
  2. 测试
    1. 队列过期时间结果
      @Test
          public void testDlx() {
              //1. 测试过期时间,死信消息
              rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","测试消息 我会死吗? ");
          }
                 
      • 执行之后
        Rabbitmq 高级特性 - 死信队列
      • 10秒之后
        Rabbitmq 高级特性 - 死信队列
        Rabbitmq 高级特性 - 死信队列
        Rabbitmq 高级特性 - 死信队列
    2. 队列长度限制
      @Test
          public void testDlx() {
              //2. 测试长度限制后,消息死信
              for (int i = 0; i < 20; i++) {
                  rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","测试消息 我会死吗? ");
              }
          }
                 
      1. 推测由于长度的关系,

        test_queue_dlx

        会保留10条
        Rabbitmq 高级特性 - 死信队列
      2. 另外的10条会到queue_dlx里面(

        加上刚刚1条就变成11条

        )
        Rabbitmq 高级特性 - 死信队列
      3. 然后由于10秒的过期时间

        test_queue_dlx

        会变成0,

        queue_dlx

        会变成21
        Rabbitmq 高级特性 - 死信队列
    3. 消息过期(基于RabbitMQ 高级特性 Consumer Ack)
      1. 编写监听类
      @Component
      public class DlxListener implements ChannelAwareMessageListener {
      
          @Override
          public void onMessage(Message message, Channel channel) throws Exception {
              long deliveryTag = message.getMessageProperties().getDeliveryTag();
              try {
                  //1. 接受消息
                  System.out.println(new String(message.getBody()));
                  //2. 处理业务逻辑 打印一句话代表
                  System.out.println("处理业务逻辑");
                  int i = 3/0; //出现错误
                  //3. 手动签收
                  channel.basicAck(deliveryTag,true);
              } catch (Exception e) {
                  System.out.println("出现异常, 拒绝接收");
                  //4. 拒绝签收,不重回队列 requeue = false
                  channel.basicNack(deliveryTag,true,false);
              }
          }
      }
                 
      1. 配置监听(

        spring-rabbitmq-consumer.xml

        )
        <!-- 定义监听器-->
            <rabbitmq:listener-container connection-factory="connectionFactory" acknowledge="manual">
                <!-- 监听正常的队列-->
                <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>
            </rabbitmq:listener-container>
                   
      2. 运行
      @RunWith(SpringJUnit4ClassRunner.class)
      @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
      public class ConsumerTest {
      
          @Test
          public void test() {
              while(true) {
      
              }
          }
      
      }
                 
    4. 发送消息
      @Test
          public void testDlx() {
              //3. 测试消息拒收
      		rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","测试消息 我会死吗? ");
          }
                 
    5. 监听结果(

      乱码需要监听的哪里修改编码格式

      )
      Rabbitmq 高级特性 - 死信队列
    6. 死信队列(

      21+1

      )
      Rabbitmq 高级特性 - 死信队列
  • 修复乱码
  • 死信队列小结
    1. 死信交换机和死信队列和普通的没有区别
    2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
    3. 消息成为死信的三种情况:
      1. 队列消息长度到达限制;
      2. 消费者拒接消费消息,并且不重回队列;
      3. 原队列存在消息过期设置,消息到达超时时间未被消费;