Rabbitmq 進階特性 - 死信隊列
- 說明:
- 死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息成為Dead message後,可以被重新發送到另一個交換機,這個交換機就是DLX。
-
主要是其他mq沒有交換機這個概念,它們就一個死信隊列,但是我們的rabbitmq有交換機的概念,是以我們一般在rabbitmq說的死信隊列說的都是死信交換機.為什麼叫做死信隊列,翻譯過來又是死信交換機?
- 消息成為死信的三種情況(
):面試可能會問
- 隊列消息長度到達限制;
- 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目标隊列,requeue=false;
- 原隊列存在消息過期設定,消息到達逾時時間未被消費;
- 隊列綁定死信交換機:
- 給隊列設定參數: x-dead-letter-exchange 和 x-dead-letter-routing-key
- x-dead-letter-exchange: 給隊列設定一個死信交換機的名稱
- x-dead-letter-routing-key:給隊列設定一個資訊交換機的
Routing key
- 給隊列設定參數: x-dead-letter-exchange 和 x-dead-letter-routing-key
- 死信隊列步驟(基于RabbitMQ 進階特性 - 消息的可靠性投遞):
- 聲明正常的隊列(
)和交換機(test_queue_dlx
)test_exchange_dlx
<!-- 1. 聲明正常的隊列(`test_queue_dlx`)和交換機(`test_exchange_dlx`) --> <rabbit:queue id="test_queue_dlx" name="test_queue_dlx"/> <rabbit:topic-exchange name="test_exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="text.dlx.#" queue="test_queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
- 聲明死信隊列(
)和死信交換機(queue_dlx
)exchange_dlx
<!-- 2. 聲明死信隊列(`queue_dlx`)和死信交換機(`exchange_dlx`) --> <rabbit:queue id="queue_dlx" name="queue_dlx"/> <rabbit:topic-exchange name="exchange_dlx"> <rabbit:bindings> <rabbit:binding pattern="dlx.#" queue="queue_dlx"/> </rabbit:bindings> </rabbit:topic-exchange>
- 正常隊列綁定死信交換機(
)在1的queue裡面綁定
- 設定兩個參數:
- x-dead-letter-exchange 死信交換機名稱
- x-dead-letter-routing-key 發給死信交換機的Routing key
<rabbit:queue id="test_queue_dlx" name="test_queue_dlx"> <!-- 3. 正常隊列綁定死信交換機 --> <rabbit:queue-arguments> <!-- 3.1 x-dead-letter-exchange 死信交換機名稱 --> <entry key="x-dead-letter-exchange" value="exchange_dlx"/> <!-- 3,2 x-dead-letter-routing-key 發給死信交換機的Routing key --> <entry key="x-dead-letter-routing-key" value="dlx.xixi"/> </rabbit:queue-arguments> </rabbit:queue> ```
- 設定兩個參數:
- 聲明正常的隊列(
- 消息成為死信的情況(
)在1的queue裡面綁定
- 隊列的過期時間
<!-- 4.1 設定隊列的過期時間 ttl--> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
- 隊列的長度限制
<!-- 4.2 設定隊列的長度限制 max-length --> <entry key="x-max-length" value="10" value-type="java.lang.Integer"/>
- 消息拒收(和消費者有關)
- 隊列的過期時間
- 測試
- 隊列過期時間結果
@Test public void testDlx() { //1. 測試過期時間,死信消息 rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","測試消息 我會死嗎? "); }
- 執行之後
- 10秒之後
- 隊列長度限制
@Test public void testDlx() { //2. 測試長度限制後,消息死信 for (int i = 0; i < 20; i++) { rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","測試消息 我會死嗎? "); } }
- 推測由于長度的關系,
會保留10條test_queue_dlx
- 另外的10條會到queue_dlx裡面(
)加上剛剛1條就變成11條
- 然後由于10秒的過期時間
會變成0,test_queue_dlx
會變成21queue_dlx
- 推測由于長度的關系,
- 消息過期(基于RabbitMQ 進階特性 Consumer Ack)
- 編寫監聽類
@Component public class DlxListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //1. 接受消息 System.out.println(new String(message.getBody())); //2. 處理業務邏輯 列印一句話代表 System.out.println("處理業務邏輯"); int i = 3/0; //出現錯誤 //3. 手動簽收 channel.basicAck(deliveryTag,true); } catch (Exception e) { System.out.println("出現異常, 拒絕接收"); //4. 拒絕簽收,不重回隊列 requeue = false channel.basicNack(deliveryTag,true,false); } } }
- 配置監聽(
)spring-rabbitmq-consumer.xml
<!-- 定義監聽器--> <rabbitmq:listener-container connection-factory="connectionFactory" acknowledge="manual"> <!-- 監聽正常的隊列--> <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/> </rabbitmq:listener-container>
- 運作
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml") public class ConsumerTest { @Test public void test() { while(true) { } } }
- 發送消息
@Test public void testDlx() { //3. 測試消息拒收 rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","測試消息 我會死嗎? "); }
- 監聽結果(
)亂碼需要監聽的哪裡修改編碼格式
- 死信隊列(
)21+1
- 隊列過期時間結果
- 修複亂碼
- 死信隊列小結
- 死信交換機和死信隊列和普通的沒有差別
- 當消息成為死信後,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列
- 消息成為死信的三種情況:
- 隊列消息長度到達限制;
- 消費者拒接消費消息,并且不重回隊列;
- 原隊列存在消息過期設定,消息到達逾時時間未被消費;