天天看點

Rabbitmq 進階特性 - 死信隊列

Rabbitmq 進階特性 - 死信隊列

  • 說明:
    • 死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息成為Dead message後,可以被重新發送到另一個交換機,這個交換機就是DLX。
    • 為什麼叫做死信隊列,翻譯過來又是死信交換機?

      主要是其他mq沒有交換機這個概念,它們就一個死信隊列,但是我們的rabbitmq有交換機的概念,是以我們一般在rabbitmq說的死信隊列說的都是死信交換機.
      Rabbitmq 進階特性 - 死信隊列
  • 消息成為死信的三種情況(

    面試可能會問

    ):
    1. 隊列消息長度到達限制;
    2. 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目标隊列,requeue=false;
    3. 原隊列存在消息過期設定,消息到達逾時時間未被消費;
  • 隊列綁定死信交換機:
    • 給隊列設定參數: x-dead-letter-exchange 和 x-dead-letter-routing-key
      • x-dead-letter-exchange: 給隊列設定一個死信交換機的名稱
      • x-dead-letter-routing-key:給隊列設定一個資訊交換機的

        Routing key

        Rabbitmq 進階特性 - 死信隊列
  • 死信隊列步驟(基于RabbitMQ 進階特性 - 消息的可靠性投遞):
    1. 聲明正常的隊列(

      test_queue_dlx

      )和交換機(

      test_exchange_dlx

      )
      <!--
              1. 聲明正常的隊列(`test_queue_dlx`)和交換機(`test_exchange_dlx`)
          -->
          <rabbit:queue id="test_queue_dlx" name="test_queue_dlx"/>
          <rabbit:topic-exchange name="test_exchange_dlx">
              <rabbit:bindings>
                  <rabbit:binding pattern="text.dlx.#" queue="test_queue_dlx"/>
              </rabbit:bindings>
          </rabbit:topic-exchange>
                 
    2. 聲明死信隊列(

      queue_dlx

      )和死信交換機(

      exchange_dlx

      )
      <!--
              2. 聲明死信隊列(`queue_dlx`)和死信交換機(`exchange_dlx`)
          -->
          <rabbit:queue id="queue_dlx" name="queue_dlx"/>
          <rabbit:topic-exchange name="exchange_dlx">
              <rabbit:bindings>
                  <rabbit:binding pattern="dlx.#" queue="queue_dlx"/>
              </rabbit:bindings>
          </rabbit:topic-exchange>
                 
    3. 正常隊列綁定死信交換機(

      在1的queue裡面綁定

      )
      • 設定兩個參數:
        1. x-dead-letter-exchange 死信交換機名稱
        2. x-dead-letter-routing-key 發給死信交換機的Routing key
      <rabbit:queue id="test_queue_dlx" name="test_queue_dlx">
      	        <!-- 3. 正常隊列綁定死信交換機 -->
      	        <rabbit:queue-arguments>
      	            <!-- 3.1 x-dead-letter-exchange 死信交換機名稱 -->
      	            <entry key="x-dead-letter-exchange" value="exchange_dlx"/>
      	            <!-- 3,2 x-dead-letter-routing-key 發給死信交換機的Routing key -->
      	            <entry key="x-dead-letter-routing-key" value="dlx.xixi"/>
      	        </rabbit:queue-arguments>
      	    </rabbit:queue>
      	```
                 
  1. 消息成為死信的情況(

    在1的queue裡面綁定

    )
    1. 隊列的過期時間
      <!-- 4.1 設定隊列的過期時間 ttl-->
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
                 
    2. 隊列的長度限制
      <!-- 4.2 設定隊列的長度限制 max-length -->
        <entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
                 
    3. 消息拒收(和消費者有關)
  2. 測試
    1. 隊列過期時間結果
      @Test
          public void testDlx() {
              //1. 測試過期時間,死信消息
              rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","測試消息 我會死嗎? ");
          }
                 
      • 執行之後
        Rabbitmq 進階特性 - 死信隊列
      • 10秒之後
        Rabbitmq 進階特性 - 死信隊列
        Rabbitmq 進階特性 - 死信隊列
        Rabbitmq 進階特性 - 死信隊列
    2. 隊列長度限制
      @Test
          public void testDlx() {
              //2. 測試長度限制後,消息死信
              for (int i = 0; i < 20; i++) {
                  rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","測試消息 我會死嗎? ");
              }
          }
                 
      1. 推測由于長度的關系,

        test_queue_dlx

        會保留10條
        Rabbitmq 進階特性 - 死信隊列
      2. 另外的10條會到queue_dlx裡面(

        加上剛剛1條就變成11條

        )
        Rabbitmq 進階特性 - 死信隊列
      3. 然後由于10秒的過期時間

        test_queue_dlx

        會變成0,

        queue_dlx

        會變成21
        Rabbitmq 進階特性 - 死信隊列
    3. 消息過期(基于RabbitMQ 進階特性 Consumer Ack)
      1. 編寫監聽類
      @Component
      public class DlxListener implements ChannelAwareMessageListener {
      
          @Override
          public void onMessage(Message message, Channel channel) throws Exception {
              long deliveryTag = message.getMessageProperties().getDeliveryTag();
              try {
                  //1. 接受消息
                  System.out.println(new String(message.getBody()));
                  //2. 處理業務邏輯 列印一句話代表
                  System.out.println("處理業務邏輯");
                  int i = 3/0; //出現錯誤
                  //3. 手動簽收
                  channel.basicAck(deliveryTag,true);
              } catch (Exception e) {
                  System.out.println("出現異常, 拒絕接收");
                  //4. 拒絕簽收,不重回隊列 requeue = false
                  channel.basicNack(deliveryTag,true,false);
              }
          }
      }
                 
      1. 配置監聽(

        spring-rabbitmq-consumer.xml

        )
        <!-- 定義監聽器-->
            <rabbitmq:listener-container connection-factory="connectionFactory" acknowledge="manual">
                <!-- 監聽正常的隊列-->
                <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>
            </rabbitmq:listener-container>
                   
      2. 運作
      @RunWith(SpringJUnit4ClassRunner.class)
      @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
      public class ConsumerTest {
      
          @Test
          public void test() {
              while(true) {
      
              }
          }
      
      }
                 
    4. 發送消息
      @Test
          public void testDlx() {
              //3. 測試消息拒收
      		rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","測試消息 我會死嗎? ");
          }
                 
    5. 監聽結果(

      亂碼需要監聽的哪裡修改編碼格式

      )
      Rabbitmq 進階特性 - 死信隊列
    6. 死信隊列(

      21+1

      )
      Rabbitmq 進階特性 - 死信隊列
  • 修複亂碼
  • 死信隊列小結
    1. 死信交換機和死信隊列和普通的沒有差別
    2. 當消息成為死信後,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列
    3. 消息成為死信的三種情況:
      1. 隊列消息長度到達限制;
      2. 消費者拒接消費消息,并且不重回隊列;
      3. 原隊列存在消息過期設定,消息到達逾時時間未被消費;