文章目錄
- 死信隊列
- 延遲隊列
- 優先級隊列
死信隊列
- 死信隊列(
)用來儲存處理失敗或者過期的消息。當一個消息變為死信消息(DLQ(Dead-Letter-Queue)
)之後,消息會被投遞到死信交換器(Dead Message
)中,DLX(Dead-Letter-Exchange)
會将消息投遞到死信隊列(DLX
)中DLQ
- 消息變為死信消息的情況
- 消息沒有被正确消費————被取消确認(
或者Basic.Nack
),并且設定Basic.Reject
參數為false;requeue
- 消息TTL過期
- 隊列達到最大長度
- 消息沒有被正确消費————被取消确認(
- 死信交換器(
)就是普通的DLX(Dead-Letter-Exchange)
.RabbitMQ 不會去驗證死信交換器(Exchange
)設定是否有效,當死信消息找不到指定的交換器時,死信消息會被RabbitMQ安靜的丢棄,而不是抛出異常。DLX(Dead-Letter-Exchange)
- 當在某一個隊列中設定
屬性時,如果這個隊列中有消息變為死信消息,則x-dead-letter-exchange
會自動的将這個消息重新釋出到設定的Broker
上(Exchange
對應的那個交換器)。x-dead-letter-exchange
- 代碼示例
/** * 死信隊列 */ @Test public void testDLX() { ConnectionFactory factory = new ConnectionFactory(); String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String dlxQueueName = "jannal.direct.queue.dlx"; String queueNameNormal = "jannal.direct.queue.normal"; String dlxExchange = "jannal.direct.exchange.dlx"; String exchangeNormal = "jannal.direct.exchange.normal"; String routingKey = "DeadMessage"; String bindingKey = "DeadMessage"; String hostName = "jannal.mac.com"; int portNumber = 5672; factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; try { conn = factory.newConnection(); Channel channel = conn.createChannel(); boolean durable = true; boolean exclusive = false; boolean autoDelete = false; //普通隊列添加DLX HashMap<String, Object> queueArgs = new HashMap<>(); queueArgs.put("x-dead-letter-exchange", dlxExchange); //如果不為DLX執行routingKey,則使用原來隊列的 //args.put("x-dead-letter-routing-key","dlx"); queueArgs.put("x-message-ttl", 10000); // 設定隊列中消息存活時間為10秒 queueArgs.put("x-max-length", 5); // 設定隊列最大消息數量為5 channel.queueDeclare(queueNameNormal, durable, exclusive, autoDelete, queueArgs); channel.exchangeDeclare(exchangeNormal, "direct", true); channel.queueBind(queueNameNormal, exchangeNormal, bindingKey); //死信 channel.queueDeclare(dlxQueueName, durable, exclusive, autoDelete,null); channel.exchangeDeclare(dlxExchange, "direct", true); channel.queueBind(dlxQueueName, dlxExchange, bindingKey); boolean mandatory = false; boolean immediate = false; String msg = "Hello, world "; for (int i = 0; i < 12; i++) { channel.basicPublish(exchangeNormal, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); } } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
- 檢視web控制台,
表示Lim
,x-max-length=5
表示D
,durable=true
表示TTL
,x-message-ttl=10000
表示DLX
。 因為隊列最大長度為5,是以發送12條消息其中5條是進入x-dead-letter-exchange=jannal.direct.exchange.dlx
,其中7條被放入到jannal.direct.queue.normal
死信隊列中。當普通隊列中的5條消息10s後過期時,此時12條消息都被投遞到死信隊列中jannal.direct.queue.dlx
延遲隊列
- 延遲隊列存儲的是延遲的消息,即
隻有在等待特定的時間後才能去消費消息或者看到消息。consumer(消費者)
- 延遲消息應用場景
- 一個A任務執行之後,等待特定的時間T,B任務才能去執行。比如使用者買火車票,必須在30分鐘以内支付,否則席位會被取消。此時【取消席位】這個任務就可以使用延遲隊列來處理。再比如滴滴打車之後,使用者一直不評價,24小時之後自動評價
-
設定隊列中消息的存活時間,當消息過期變為死信隊列之後,此時x-message-ttl
就可以從死信隊列中消費消息,這樣就達到了延遲特定時間的目的了。Consumer(消息者)
- 為不同的延遲時間設定不同的隊列,代碼示例
/** * 延遲隊列 */ @Test public void testDelayQueue() { ConnectionFactory factory = new ConnectionFactory(); String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String dlxQueueName_5 = "jannal.direct.queue.dlx.5"; String dlxQueueName_10 = "jannal.direct.queue.dlx.10"; String queueNameNormal_5 = "jannal.direct.queue.normal_5"; String queueNameNormal_10 = "jannal.direct.queue.normal_10"; String dlxExchange_5 = "jannal.direct.exchange.dlx_5"; String dlxExchange_10 = "jannal.direct.exchange.dlx_10"; String exchangeNormal = "jannal.direct.exchange.normal"; String hostName = "jannal.mac.com"; int portNumber = 5672; factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; try { conn = factory.newConnection(); Channel channel = conn.createChannel(); boolean durable = true; boolean exclusive = false; boolean autoDelete = false; HashMap<String, Object> queueArgs = new HashMap<>(); queueArgs.put("x-dead-letter-exchange", dlxExchange_5); //設定5s queueArgs.put("x-message-ttl", 5); channel.queueDeclare(queueNameNormal_5, durable, exclusive, autoDelete, queueArgs); channel.exchangeDeclare(exchangeNormal, "direct", true); channel.queueBind(queueNameNormal_5, exchangeNormal, "5s"); //設定10s queueArgs.put("x-dead-letter-exchange", dlxExchange_10); queueArgs.put("x-message-ttl", 10); channel.queueDeclare(queueNameNormal_10, durable, exclusive, autoDelete, queueArgs); channel.exchangeDeclare(exchangeNormal, "direct", true); channel.queueBind(queueNameNormal_10, exchangeNormal, "10s"); //死信 channel.queueDeclare(dlxQueueName_5, durable, exclusive, autoDelete,null); channel.exchangeDeclare(dlxExchange_5, "direct", true); channel.queueBind(dlxQueueName_5, dlxExchange_5, "5s"); channel.queueDeclare(dlxQueueName_10, durable, exclusive, autoDelete,null); channel.exchangeDeclare(dlxExchange_10, "direct", true); channel.queueBind(dlxQueueName_10, dlxExchange_10, "10s"); boolean mandatory = false; boolean immediate = false; String msg = "Hello, world "; //過期時間5s的12條 for (int i = 0; i < 12; i++) { channel.basicPublish(exchangeNormal, "5s", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); } //過期時間10s的10條 for (int i = 0; i < 10; i++) { channel.basicPublish(exchangeNormal, "10s", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); } } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
- web控制台檢視
優先級隊列
- 對于消息在
中堆積的情況下,優先級高的消息具有優先被消費的特權。Broker
- 代碼示例
/** * 優先級隊列 */ @Test public void testPriority(){ ConnectionFactory factory = new ConnectionFactory(); String userName = "jannal"; String password = "jannal"; String virtualHost = "jannal-vhost"; String queueName = "jannal.direct.queue.priority"; String exchange = "jannal.direct.exchange.priority"; String routingKey = "priority"; String bindingKey = "priority"; String hostName = "jannal.mac.com"; int portNumber = 5672; factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(false); Connection conn = null; try { conn = factory.newConnection(); Channel channel = conn.createChannel(); boolean durable = true; boolean exclusive = false; boolean autoDelete = false; HashMap<String, Object> args = new HashMap<>(); args.put("x-max-priority", 10);//設定一個隊列的最大優先級 channel.queueDeclare(queueName, durable, exclusive, autoDelete, args); channel.exchangeDeclare(exchange, "direct", true); channel.queueBind(queueName, exchange, bindingKey); boolean mandatory = false; boolean immediate = false; String msg = "Hello, world "; //消息的優先級預設是0,最高為隊列設定的優先級,優先級高的消息會被先消費 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(3).build(); channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, msg.getBytes("UTF-8")); } catch (IOException e) { logger.error(e.getMessage(), e); } catch (TimeoutException e) { logger.error(e.getMessage(), e); } finally { if (conn != null) { try { conn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
- web控制台