天天看點

2-RabbitMQ的死信隊列詳解

作者:前行者lw

參考:

https://www.rabbitmq.com/dlx.html

https://blog.csdn.net/weixin_44688301/article/details/116237294

https://www.yii666.com/blog/67356.html

一、死信隊列是什麼

“死信”是RabbitMQ中的一種消息機制,當你在消費消息時,如果隊列裡的消息出現以下情況:

> 消息被否定确認(被拒絕),使用 channel.basicNack 或 channel.basicReject ,并且此時requeue 屬性被設定為false。

> 消息在隊列的存活時間超過設定的生存時間(TTL)時間。

> 消息隊列的消息數量已經超過最大隊列長度。

那麼該消息将成為“死信”。

“死信”消息會被RabbitMQ進行特殊處理,如果配置了死信隊列資訊,那麼該消息将會被丢進死信隊列中,如果沒有配置,則該消息将會被丢棄。

2-RabbitMQ的死信隊列詳解

死信交換機(dlx)是正常的交換機,能夠在任何隊列上被指定。其實死信交換機和一般的交換機沒啥差別,隻是添加了死信交換機的屬性。

如果隊列上存在死信, RabbitMq 會将死信消息投遞到設定的 死信交換機DLX 上去 ,然後被路由到一個隊列上,這個隊列,就是死信隊列。

注意,并不是直接聲明一個公共的死信隊列,然後死信消息就自己跑到死信隊列裡去了。

而是為每個需要使用死信的業務隊列配置一個死信交換機,這裡同一個項目的死信交換機可以共用一個,然後為每個業務隊列配置設定一個單獨的路由key。

有了死信交換機和路由key後,接下來,就像配置業務隊列一樣,配置死信隊列,然後綁定在死信交換機上。

也就是說,死信隊列并不是什麼特殊的隊列,隻不過是綁定在死信交換機上的隊列。死信交換機也不是什麼特殊的交換機,隻不過是用來接受死信的交換機,

是以可以為任何類型【Direct、Fanout、Topic】。

一般來說,會為每個業務隊列配置設定一個獨有的路由key,并對應的配置一個死信隊列進行監聽,也就是說,一般會為每個重要的業務隊列配置一個死信隊列。

2-RabbitMQ的死信隊列詳解

死信隊列的應用

> 保證消息不會丢失,保證資料的完整性;

> 可以借助延時消費的特性完成特定的功能(比如訂單生成但是未支付,超過30分鐘自動取消的業務場景)

1. 當消息在一個隊列中變成死信之後,它能重新被發送到另一個交換機中,這個交換機就是 死信交換機,綁定 死信交換機(DLX Exchange) 的隊列就稱之為死信隊列

2. 死信隊列同其他的隊列一樣都是普通的隊列。

3. 在RabbitMQ中并沒有特定的"死信隊列"類型,而是通過配置,将其實作。

設定死信隊列需要設定以下2個屬性

交換機 x-dead-letter-exchange

路由鍵 x-dead-letter-routing-key

下面開始示範死信隊列的案例

(PHP)

消費生産者

// 死信隊列
public function deadMq()
{
   $data = '死信隊列測試-消息被拒絕死信隊列';

   /**
    * 正常消息隊列
    */
   $this->channel->exchange_declare('my-logs', 'direct',false, false, false);
   $args = new AMQPTable([
       // 資訊過期時間
       'x-message-ttl'             => 20000,
       'x-dead-letter-exchange'    => 'dead-exc',
       'x-dead-letter-routing-key' => 'dead-key'
   ]);
   // 通過隊列額外參數設定過期時期等配置
   $this->channel->queue_declare('user-log-1', false, true,false,false,false,$args);
   $this->channel->queue_bind('user-log-1', 'my-logs', 'user');


   /**
    * 死信隊列的配置
    */
   // 1、聲明死信交換機
   $this->channel->exchange_declare('dead-exc','direct', false,false,false);
   // 2、聲明死信隊列
   $this->channel->queue_declare('dead-log-queue',false,true,false,false);
   // 3、死信隊列與死信交換機綁定
   $this->channel->queue_bind('dead-log-queue', 'dead-exc','dead-key');

   // 正常隊列發送消息
   $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
   $this->channel->basic_publish($msg,'my-logs', 'user');

   $this->channel->close();
   $this->mqConnection->close();
}           

上述代碼聲明兩個與之相關的交換機與隊列。一個是正常業務的消息隊列,一個是死信隊列。

1、正常消息隊列業務邏輯中,通過額外參數配置過期時間等,讓其成為死信隊列;

2、通過 x-dead-letter-exchange 和 x-dead-letter-routing-key 配置資訊交換機名稱與路由鍵;

3、配置死信隊列,交換機是 x-dead-letter-exchange 設定的值;

4、死信隊列綁定死信交換機并設定路由鍵。

消息者(正常業務)

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;

//正常消息者
class LogMq extends Command
{
   protected $signature = 'mq:log';

   protected $description = 'Command description';

   public function handle(): void
   {
       $connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
       $channel = $connection->channel();
       $channel->exchange_declare('my-logs','direct', false,false,false);

       list($queueName, ,) = $channel->queue_declare("", false, false, true, false);

       $channel->queue_bind($queueName, 'my-logs', 'user');

       $callback = function ($msg) {
           echo '輸出: ' . $msg->body . PHP_EOL;
           $msg->ack();
       };

       $channel->basic_qos(null, 1, null);
       $channel->basic_consume($queueName,'', false,false,false,false,$callback);

       while ($channel->is_open())
       {
           $channel->wait();
       }

       $channel->close();
       $connection->close();
   }
}           

這是一個正常業務的消息消費者。後續我們改動這個消費者來實作死信隊列。

死信隊列消費者

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class DeadMqLog extends Command
{
   protected $signature = 'mq:dead';

   protected $description = 'Command description';

   public function handle(): void
   {
       $connection =  new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'ziruchu');
       $channel = $connection->channel();
       $channel->exchange_declare('dead-exc','direct', false,false,false);


       $channel->queue_bind('dead-log-queue', 'dead-exc','dead-key');

       $callback = function ($msg) {
           echo '從死信隊列中輸出的消息: ' . $msg->body . PHP_EOL;
           $msg->ack();
       };

       $channel->basic_qos(null, 1, null);
       $channel->basic_consume('dead-log-queue', '', false, false, false, false, $callback);
       while ($channel->is_open())
       {
           $channel->wait();
       }

       $channel->close();
       $connection->close();
   }
}           

這是一個死信隊列消費者,當有死信消費時就會走到這裡

死信隊列示範

A. 拒絕接收消息

1、修改 LogMq.php 正常消費者代碼

// 未變化代碼

$callback = function ($msg) {
   // 拒絕接收消息
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// 未變化代碼           

2、效果

# 生産者釋出消息
nbsp;curl http://la10test.test/mq/dead
# 正常消費者
nbsp;php artisan mq:log

# 死信隊列
nbsp;php artisan mq:dead
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列           

B.消息過期

利用消息過期可以實作延遲隊列的效果

1、修改 LogMq.php 正常消費者代碼

$callback = function ($msg) {
   // 睡眠 23 秒,
   sleep(23);
   $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};           

2、效果

# 生産者釋出消息
nbsp;curl http://la10test.test/mq/dead
# 正常消費者
nbsp;php artisan mq:log

# 死信隊列
nbsp;php artisan mq:dead
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列           

C. 隊列長度達到最大長度

1、修改生産者代碼配置(deadMq 方法)

$args = new AMQPTable([
   'x-max-length'              => 5,
   'x-overflow'                => 'reject-publish-dlx',
   'x-dead-letter-exchange'    => 'dead-exc',
   'x-dead-letter-routing-key' => 'dead-key'
]);           

------------------------------------------------------------------

(Go版本)

https://www.jianshu.com/p/6255b17fed68

Golang處理死信隊列

1.定義死信交換機

當成普通交換機定義就行。

// 聲明交換機
    err = ch.ExchangeDeclare(
        "tizi365.dead",   // 交換機名字
        "topic", // 交換機類型
        true,     // 是否持久化
        false,
        false,
        false,
        nil,
    )           

2.定義死信隊列

當成普通隊列定義就行

// 聲明隊列
    q, err := ch.QueueDeclare(
        "",    // 隊列名字,不填則随機生成一個
        false, // 是否持久化隊列
        false,
        true,
        false,
        nil,
    )

    // 隊列綁定死信交換機
    err = ch.QueueBind(
        q.Name, // 隊列名
        "#",     // 路由參數,# 井号的意思就比對所有路由參數,意思就是接收所有死信消息
        "tizi365.dead", // 死信交換機名字
        false,
        nil)
           

提示:死信隊列就當成普通隊列用就是了

3.定義死信消費者

// 建立消費者
    msgs, err := ch.Consume(
        q.Name, // 引用前面的死信隊列名
        "",     // 消費者名字,不填自動生成一個
        true,   // 自動向隊列确認消息已經處理
        false, 
        false, 
        false, 
        nil,
    )

    // 循環消費死信隊列中的消息
    for d := range msgs {
        log.Printf("接收死信消息=%s", d.Body)
    }
           

4.将死信交換機綁定到指定正常隊列

// 隊列屬性
    props := make(map[string]interface{})
    // 設定正常隊列的死信交換機
    props["x-dead-letter-exchange"] = "tizi365.dead"
    // 可選: 設定死信投遞到死信交換機的時候的路由參數,如果不設定,則使用消息原先自帶的路由參數
    // props["x-dead-letter-routing-key"] = "www.tizi365.com"

    q, err := ch.QueueDeclare(
        "normo.hello", // 隊列名
        true,   // 是否持久化
        false, 
        false, 
        false,   
        props,     // 設定隊列屬性
    )
           

這樣,隻要tizi365.demo.hello隊列的消息變成死信的話,消息會被轉發到tizi365.dead死信交換機。作者:一位先生_連結:https://www.jianshu.com/p/6255b17fed68來源:簡書著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

https://blog.csdn.net/weixin_43495948/article/details/129249740

其實整體的思路就是分别建立一個normal_exchange、dead_exchange、normal_queue、dead_queue,

然後将normal_exchange與normal_queue進行綁定,将dead_exchange與dead_queue進行綁定,

這裡比較關鍵的一個點在于說如何将normal_queue與dead_exchange進行綁定,這樣才能将錯誤的消息傳遞過來。

--------------------------------------------------------------------------------

https://blog.csdn.net/weixin_43495948/article/details/129249740

其實整體的思路就是分别建立一個normal_exchange、dead_exchange、normal_queue、dead_queue,

然後将normal_exchange與normal_queue進行綁定,将dead_exchange與dead_queue進行綁定,

這裡比較關鍵的一個點在于說如何将normal_queue與dead_exchange進行綁定,這樣才能将錯誤的消息傳遞過來。

// 聲明一個normal隊列
	_, err = ch.QueueDeclare(
		constant.NormalQueue,
		true,
		false,
		false,
		false,
		amqp.Table{
			//"x-message-ttl":             5000,                    // 指定過期時間
			//"x-max-length":              6,						// 指定長度。超過這個長度的消息會發送到dead_exchange中
			"x-dead-letter-exchange":    constant.DeadExchange,    // 指定死信交換機
			"x-dead-letter-routing-key": constant.DeadRoutingKey,  // 指定死信routing-key
		})
           

消費者超出時間未應答

produce.go

package day07

import (
	"context"
	amqp "github.com/rabbitmq/amqp091-go"
	"strconv"
	"time"
	"v1/utils"
)

func Produce() {
	// 擷取信道
	ch := utils.GetChannel()
	// 聲明一個交換機
	err := ch.ExchangeDeclare(
		"normal_exchange",
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil)
	utils.FailOnError(err, "Failed to declare a exchange")
	ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancer()

	// 發送了10條消息
	for i := 0; i < 10; i++ {
		msg := "Info:" + strconv.Itoa(i)
		ch.PublishWithContext(ctx,
			"normal_exchange",
			"normal_key",
			false,
			false,
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(msg),
			})
	}
}
           

consumer1.go

package day07

import (
	amqp "github.com/rabbitmq/amqp091-go"
	"log"
	"v1/utils"
)

type Constant struct {
	NormalExchange   string
	DeadExchange     string
	NormalQueue      string
	DeadQueue        string
	NormalRoutingKey string
	DeadRoutingKey   string
}

func Consumer1() {
	// 擷取連接配接
	ch := utils.GetChannel()
	// 建立一個變量常量
	constant := Constant{
		NormalExchange:   "normal_exchange",
		DeadExchange:     "dead_exchange",
		NormalQueue:      "normal_queue",
		DeadQueue:        "dead_queue",
		NormalRoutingKey: "normal_key",
		DeadRoutingKey:   "dead_key",
	}
	// 聲明normal交換機
	err := ch.ExchangeDeclare(
		constant.NormalExchange,
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil,
	)
	utils.FailOnError(err, "Failed to declare a normal exchange")
	// 聲明一個dead交換機
	err = ch.ExchangeDeclare(
		constant.DeadExchange,
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil,
	)
	utils.FailOnError(err, "Failed to declare a dead exchange")

	// 聲明一個normal隊列
	_, err = ch.QueueDeclare(
		constant.NormalQueue,
		true,
		false,
		false,
		false,
		amqp.Table{
			"x-message-ttl": 5000, // 指定過期時間
			//"x-max-length":              6,
			"x-dead-letter-exchange":    constant.DeadExchange,   // 指定死信交換機
			"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
		})
	utils.FailOnError(err, "Failed to declare a normal queue")
	
 
   // 聲明一個dead隊列:注意不要給死信隊列設定消息時間,否者死信隊列裡面的資訊會再次過期
	_, err = ch.QueueDeclare(
		constant.DeadQueue,
		true,
		false,
		false,
		false,
		nil)
	utils.FailOnError(err, "Failed to declare a dead queue")

	// 将normal_exchange與normal_queue進行綁定
	err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)
	utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue")
	
 	// 将dead_exchange與dead_queue進行綁定
	err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)
	utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue")

	// 消費消息
	msgs, err := ch.Consume(constant.NormalQueue,
		"",
		false, // 這個地方一定要關閉自動應答
		false,
		false,
		false,
		nil)
	utils.FailOnError(err, "Failed to consume in Consumer1")

	var forever chan struct{}

	go func() {
		for d := range msgs {
			if err := d.Reject(false); err != nil {
				utils.FailOnError(err, "Failed to Reject a message")
			}
		}
	}()
	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

	<-forever
}
           

consumer2.go

package day07

import (
	amqp "github.com/rabbitmq/amqp091-go"
	"log"
	"v1/utils"
)

func Consumer2() {
	// 拿取信道
	ch := utils.GetChannel()

	// 聲明一個交換機
	err := ch.ExchangeDeclare(
		"dead_exchange",
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil)
	utils.FailOnError(err, "Failed to Declare a exchange")

	// 接收消息的應答
	msgs, err := ch.Consume("dead_queue",
		"",
		false,
		false,
		false,
		false,
		nil,
	)

	var forever chan struct{}
	go func() {
		for d := range msgs {
			log.Printf("[x] %s", d.Body)
			// 開啟手動應答ß
			d.Ack(false)
		}
	}()
	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever

}

           

限制一定的長度

隻需要改變consumer1.go中的對normal_queue的聲明

// 聲明一個normal隊列
	_, err = ch.QueueDeclare(
		constant.NormalQueue,
		true,
		false,
		false,
		false,
		amqp.Table{
			//"x-message-ttl": 5000, // 指定過期時間
			"x-max-length":              6,
			"x-dead-letter-exchange":    constant.DeadExchange,   // 指定死信交換機
			"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
		})
           

消費者拒絕的消息回到死信隊列中

這裡需要完成兩點工作

工作1:需要在consumer1中作出拒絕的操作

go func() {
		for d := range msgs {
			if err := d.Reject(false); err != nil {
				utils.FailOnError(err, "Failed to Reject a message")
			}
		}
	}()
           

工作2:如果你consume的時候開啟了自動應答一定要關閉

// 消費消息
	msgs, err := ch.Consume(constant.NormalQueue,
		"",
		false, // 這個地方一定要關閉自動應答
		false,
		false,
		false,
		nil)