什麼是死信隊列(DLX)
死信隊列又被稱為延遲隊列、延時隊列,也是RabbitMq隊列中的一種,利用DLX,當消息在一個隊列中變成死信(dead message)之後,它能被重新publish到另一個Exchange, 這個Exchange就是DLX。
死信隊列能在任何的隊列上被指定,實際上就是設定某個隊列的屬性,辨別這個隊列是否需要死信隊列的存在。當這個隊列中有死信時,RabbitMQ就會 自動的将這個消息重新釋出到設定的Exchange.上去,進而被路由到另一個隊列。
我們來模拟一個場景,比如12306搶票,當使用者搶到火車票時,12306官方會提示使用者“請在30分鐘内付款”,由于種種原因,使用者遲遲沒有付款,過了30分鐘後仍然沒有支付,系統自動取消該筆訂單。
對于這種業務,在我們的實際生活中是非常常見的,在以前傳統的處理方式可以是采用一個定時器,定時去擷取沒有付款的訂單,并判斷使用者的下單時間距離目前的時間是否已經超過30分鐘,如果是,表示使用者在30分鐘内内有付款,系統将自動失效該筆訂單并回收該車票。但是這種方式存在非常大的隐患,我們知道,春運搶票是一個大資料量、高并發請求的場景,再使用者搶到票到支付這段時間内,如果定時器頻繁的從資料庫中擷取“未付款”狀态的訂單,其資料量之大難以想象,并且大批使用者在30分鐘内遲遲不付款,從資料庫中擷取的資料量将一直在增長,當到達一定程度時,将給資料庫伺服器和應用伺服器帶來巨大的壓力。
而消息中間件rabbitmq的引入,将大大改善上面的情況,其流程如下圖:
死信隊列代替了原來定時器的邏輯,私信隊列/延遲隊列可以實作特定的消息或業務資料等待一定的時間TTL後,再被消費者監聽消費處理。
優勢
1、占用系統資源少
不需要在倫旭資料庫擷取資料,減少DB層面資源的消耗
2、人為幹預少
隻需要搭建好死信隊列的消息模型就可以不需要在去幹預了
3、自動消費處理
當制定的延遲時間一到,消息就自動被路由到實際的隊列進行處理
結構介紹
與普通的隊列相比,死信隊列同樣具有消息、交換機、路由和隊列等轉悠級名次,隻不過在死信隊列裡增加了另外三個成員:
DLX(Dead Letter Exchange):死信交換機,就是交換機的一種類型,知識屬于特殊的類型。
DLK(Dead Letter Routing-key):死信路由,同樣也是一種特殊的路由,主要是跟DLX組合在一起組成死信隊列。
TTL(Time To Live):指進入死信隊列中的消息可以存活的時間。
其中DLX跟DLK是必須的成員,而TTL則是可選、非必須的。
流程:
消息到達第一個中轉站,即死信隊列,由基本交換機和基本路由綁定,并對應到指定的死信隊列,因而消息将進入第一個暫存區,即死信隊列中,而死信隊列不同于一般的普通隊列,它由三大部分組成,當消息進入死信隊列時,TTL便開始進入倒計時,當存活時間一到,消息将進入第二個中轉站,即真正的消息模型中的死信交換機。由于死信交換機和死信路由綁定,并對應到指定的真正的 隊列,因而此時消息将不做停留,而是直接被路由到第二個暫存 區,即真正的隊列中,最終該消息被真正的隊列對應的消費者監聽。至此,消息才完成滿城的旅行。
死信結構的出現
1、消息被拒絕并且不再重新投遞,即requeue參數的取值為false
即雖然消息消費失敗,但是又不想讓消息重新回到隊列中,利用死信隊列可以對消息進行一個處理。
2、消息超過了指定的存活時間TTL
3、隊列達到最大長度了
執行個體
死信隊列的應用,首先我們需要設定死信隊列的exchange和queue,然後進行綁定。然後在我們正常聲明普通的交換機、隊列、綁定後,在隊列上加上一個參數即可:
這樣,當消息過期、requeue、隊列在達到最大長度是,消息就可以直接路由到死信隊列。
生産者
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.237.139");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
for(int i =0; i<1; i ++){
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000") //設定消息過期時間
.build();
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
消費者
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.237.139");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//要進行死信隊列中交換機、隊列的聲明和綁定:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
// 這就是一個普通的交換機 和 隊列 以及路由名
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
// 普通交換機聲明
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//普通隊列聲明,但是需要綁定agruments屬性
channel.queueDeclare(queueName, true, false, false, agruments);
// 普通交換機與隊列的綁定
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
自定義消費者
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
我們先啟動消費者程序,保證rabbitmq伺服器中存在普通隊列和死信隊列:
關閉消費者隊列,啟動生産者隊列,我們将看到伺服器中的普通隊列上存在一條資訊
由于我們将普通隊列中的消息設定為了10秒過期,是以10秒之後,我們發現普通隊列上已經沒有了消息,而死信隊列上多出了一條消息。證明死信隊列中消息擷取成功。