天天看點

RabbitMq死信隊列/延遲隊列什麼是死信隊列(DLX)優勢結構介紹死信結構的出現執行個體

什麼是死信隊列(DLX)

死信隊列又被稱為延遲隊列、延時隊列,也是RabbitMq隊列中的一種,利用DLX,當消息在一個隊列中變成死信(dead message)之後,它能被重新publish到另一個Exchange, 這個Exchange就是DLX。

死信隊列能在任何的隊列上被指定,實際上就是設定某個隊列的屬性,辨別這個隊列是否需要死信隊列的存在。當這個隊列中有死信時,RabbitMQ就會 自動的将這個消息重新釋出到設定的Exchange.上去,進而被路由到另一個隊列。

我們來模拟一個場景,比如12306搶票,當使用者搶到火車票時,12306官方會提示使用者“請在30分鐘内付款”,由于種種原因,使用者遲遲沒有付款,過了30分鐘後仍然沒有支付,系統自動取消該筆訂單。

對于這種業務,在我們的實際生活中是非常常見的,在以前傳統的處理方式可以是采用一個定時器,定時去擷取沒有付款的訂單,并判斷使用者的下單時間距離目前的時間是否已經超過30分鐘,如果是,表示使用者在30分鐘内内有付款,系統将自動失效該筆訂單并回收該車票。但是這種方式存在非常大的隐患,我們知道,春運搶票是一個大資料量、高并發請求的場景,再使用者搶到票到支付這段時間内,如果定時器頻繁的從資料庫中擷取“未付款”狀态的訂單,其資料量之大難以想象,并且大批使用者在30分鐘内遲遲不付款,從資料庫中擷取的資料量将一直在增長,當到達一定程度時,将給資料庫伺服器和應用伺服器帶來巨大的壓力。

而消息中間件rabbitmq的引入,将大大改善上面的情況,其流程如下圖:

RabbitMq死信隊列/延遲隊列什麼是死信隊列(DLX)優勢結構介紹死信結構的出現執行個體

死信隊列代替了原來定時器的邏輯,私信隊列/延遲隊列可以實作特定的消息或業務資料等待一定的時間TTL後,再被消費者監聽消費處理。

優勢

1、占用系統資源少

不需要在倫旭資料庫擷取資料,減少DB層面資源的消耗

2、人為幹預少

隻需要搭建好死信隊列的消息模型就可以不需要在去幹預了

3、自動消費處理

當制定的延遲時間一到,消息就自動被路由到實際的隊列進行處理

結構介紹

與普通的隊列相比,死信隊列同樣具有消息、交換機、路由和隊列等轉悠級名次,隻不過在死信隊列裡增加了另外三個成員:

DLX(Dead Letter Exchange):死信交換機,就是交換機的一種類型,知識屬于特殊的類型。

DLK(Dead Letter Routing-key):死信路由,同樣也是一種特殊的路由,主要是跟DLX組合在一起組成死信隊列。

TTL(Time To Live):指進入死信隊列中的消息可以存活的時間。

其中DLX跟DLK是必須的成員,而TTL則是可選、非必須的。

RabbitMq死信隊列/延遲隊列什麼是死信隊列(DLX)優勢結構介紹死信結構的出現執行個體
RabbitMq死信隊列/延遲隊列什麼是死信隊列(DLX)優勢結構介紹死信結構的出現執行個體

流程:

消息到達第一個中轉站,即死信隊列,由基本交換機和基本路由綁定,并對應到指定的死信隊列,因而消息将進入第一個暫存區,即死信隊列中,而死信隊列不同于一般的普通隊列,它由三大部分組成,當消息進入死信隊列時,TTL便開始進入倒計時,當存活時間一到,消息将進入第二個中轉站,即真正的消息模型中的死信交換機。由于死信交換機和死信路由綁定,并對應到指定的真正的 隊列,因而此時消息将不做停留,而是直接被路由到第二個暫存 區,即真正的隊列中,最終該消息被真正的隊列對應的消費者監聽。至此,消息才完成滿城的旅行。

死信結構的出現

1、消息被拒絕并且不再重新投遞,即requeue參數的取值為false

即雖然消息消費失敗,但是又不想讓消息重新回到隊列中,利用死信隊列可以對消息進行一個處理。

2、消息超過了指定的存活時間TTL

3、隊列達到最大長度了

執行個體

死信隊列的應用,首先我們需要設定死信隊列的exchange和queue,然後進行綁定。然後在我們正常聲明普通的交換機、隊列、綁定後,在隊列上加上一個參數即可:

這樣,當消息過期、requeue、隊列在達到最大長度是,消息就可以直接路由到死信隊列。

生産者

public class Producer {
	public static void main(String[] args) throws Exception {
		
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.237.139");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchange = "test_dlx_exchange";
		String routingKey = "dlx.save";
		
		String msg = "Hello RabbitMQ DLX Message";
		
		for(int i =0; i<1; i ++){
			
			AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
					.deliveryMode(2)
					.contentEncoding("UTF-8")
					.expiration("10000")  //設定消息過期時間
					.build();
			channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
		}
	}
}
           

消費者

public class Consumer {
	public static void main(String[] args) throws Exception {

		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.237.139");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();

		//要進行死信隊列中交換機、隊列的聲明和綁定:
		channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
		channel.queueDeclare("dlx.queue", true, false, false, null);
		channel.queueBind("dlx.queue", "dlx.exchange", "#");

		// 這就是一個普通的交換機 和 隊列 以及路由名
		String exchangeName = "test_dlx_exchange";
		String routingKey = "dlx.#";
		String queueName = "test_dlx_queue";

//		普通交換機聲明
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);

		Map<String, Object> agruments = new HashMap<String, Object>();
		agruments.put("x-dead-letter-exchange", "dlx.exchange");

		//普通隊列聲明,但是需要綁定agruments屬性
		channel.queueDeclare(queueName, true, false, false, agruments);
//		普通交換機與隊列的綁定
		channel.queueBind(queueName, exchangeName, routingKey);

		
		channel.basicConsume(queueName, true, new MyConsumer(channel));
	}
}
           

自定義消費者

public class MyConsumer extends DefaultConsumer {
	public MyConsumer(Channel channel) {
		super(channel);
	}

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("consumerTag: " + consumerTag);
		System.err.println("envelope: " + envelope);
		System.err.println("properties: " + properties);
		System.err.println("body: " + new String(body));
	}
}
           

我們先啟動消費者程序,保證rabbitmq伺服器中存在普通隊列和死信隊列:

RabbitMq死信隊列/延遲隊列什麼是死信隊列(DLX)優勢結構介紹死信結構的出現執行個體

關閉消費者隊列,啟動生産者隊列,我們将看到伺服器中的普通隊列上存在一條資訊

RabbitMq死信隊列/延遲隊列什麼是死信隊列(DLX)優勢結構介紹死信結構的出現執行個體

由于我們将普通隊列中的消息設定為了10秒過期,是以10秒之後,我們發現普通隊列上已經沒有了消息,而死信隊列上多出了一條消息。證明死信隊列中消息擷取成功。

RabbitMq死信隊列/延遲隊列什麼是死信隊列(DLX)優勢結構介紹死信結構的出現執行個體