參考:
https://www.rabbitmq.com/dlx.html
https://blog.csdn.net/weixin_44688301/article/details/116237294
https://www.yii666.com/blog/67356.html
一、死信隊列是什麼
“死信”是RabbitMQ中的一種消息機制,當你在消費消息時,如果隊列裡的消息出現以下情況:
> 消息被否定确認(被拒絕),使用 channel.basicNack 或 channel.basicReject ,并且此時requeue 屬性被設定為false。
> 消息在隊列的存活時間超過設定的生存時間(TTL)時間。
> 消息隊列的消息數量已經超過最大隊列長度。
那麼該消息将成為“死信”。
“死信”消息會被RabbitMQ進行特殊處理,如果配置了死信隊列資訊,那麼該消息将會被丢進死信隊列中,如果沒有配置,則該消息将會被丢棄。
死信交換機(dlx)是正常的交換機,能夠在任何隊列上被指定。其實死信交換機和一般的交換機沒啥差別,隻是添加了死信交換機的屬性。
如果隊列上存在死信, RabbitMq 會将死信消息投遞到設定的 死信交換機DLX 上去 ,然後被路由到一個隊列上,這個隊列,就是死信隊列。
注意,并不是直接聲明一個公共的死信隊列,然後死信消息就自己跑到死信隊列裡去了。
而是為每個需要使用死信的業務隊列配置一個死信交換機,這裡同一個項目的死信交換機可以共用一個,然後為每個業務隊列配置設定一個單獨的路由key。
有了死信交換機和路由key後,接下來,就像配置業務隊列一樣,配置死信隊列,然後綁定在死信交換機上。
也就是說,死信隊列并不是什麼特殊的隊列,隻不過是綁定在死信交換機上的隊列。死信交換機也不是什麼特殊的交換機,隻不過是用來接受死信的交換機,
是以可以為任何類型【Direct、Fanout、Topic】。
一般來說,會為每個業務隊列配置設定一個獨有的路由key,并對應的配置一個死信隊列進行監聽,也就是說,一般會為每個重要的業務隊列配置一個死信隊列。
死信隊列的應用
> 保證消息不會丢失,保證資料的完整性;
> 可以借助延時消費的特性完成特定的功能(比如訂單生成但是未支付,超過30分鐘自動取消的業務場景)
1. 當消息在一個隊列中變成死信之後,它能重新被發送到另一個交換機中,這個交換機就是 死信交換機,綁定 死信交換機(DLX Exchange) 的隊列就稱之為死信隊列
2. 死信隊列同其他的隊列一樣都是普通的隊列。
3. 在RabbitMQ中并沒有特定的"死信隊列"類型,而是通過配置,将其實作。
設定死信隊列需要設定以下2個屬性
交換機 x-dead-letter-exchange
路由鍵 x-dead-letter-routing-key
下面開始示範死信隊列的案例
(PHP)
消費生産者
// 死信隊列
public function deadMq()
{
$data = '死信隊列測試-消息被拒絕死信隊列';
/**
* 正常消息隊列
*/
$this->channel->exchange_declare('my-logs', 'direct',false, false, false);
$args = new AMQPTable([
// 資訊過期時間
'x-message-ttl' => 20000,
'x-dead-letter-exchange' => 'dead-exc',
'x-dead-letter-routing-key' => 'dead-key'
]);
// 通過隊列額外參數設定過期時期等配置
$this->channel->queue_declare('user-log-1', false, true,false,false,false,$args);
$this->channel->queue_bind('user-log-1', 'my-logs', 'user');
/**
* 死信隊列的配置
*/
// 1、聲明死信交換機
$this->channel->exchange_declare('dead-exc','direct', false,false,false);
// 2、聲明死信隊列
$this->channel->queue_declare('dead-log-queue',false,true,false,false);
// 3、死信隊列與死信交換機綁定
$this->channel->queue_bind('dead-log-queue', 'dead-exc','dead-key');
// 正常隊列發送消息
$msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$this->channel->basic_publish($msg,'my-logs', 'user');
$this->channel->close();
$this->mqConnection->close();
}
上述代碼聲明兩個與之相關的交換機與隊列。一個是正常業務的消息隊列,一個是死信隊列。
1、正常消息隊列業務邏輯中,通過額外參數配置過期時間等,讓其成為死信隊列;
2、通過 x-dead-letter-exchange 和 x-dead-letter-routing-key 配置資訊交換機名稱與路由鍵;
3、配置死信隊列,交換機是 x-dead-letter-exchange 設定的值;
4、死信隊列綁定死信交換機并設定路由鍵。
消息者(正常業務)
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
//正常消息者
class LogMq extends Command
{
protected $signature = 'mq:log';
protected $description = 'Command description';
public function handle(): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'sms');
$channel = $connection->channel();
$channel->exchange_declare('my-logs','direct', false,false,false);
list($queueName, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queueName, 'my-logs', 'user');
$callback = function ($msg) {
echo '輸出: ' . $msg->body . PHP_EOL;
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName,'', false,false,false,false,$callback);
while ($channel->is_open())
{
$channel->wait();
}
$channel->close();
$connection->close();
}
}
這是一個正常業務的消息消費者。後續我們改動這個消費者來實作死信隊列。
死信隊列消費者
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class DeadMqLog extends Command
{
protected $signature = 'mq:dead';
protected $description = 'Command description';
public function handle(): void
{
$connection = new AMQPStreamConnection('localhost', 5672, 'test', '123456', 'ziruchu');
$channel = $connection->channel();
$channel->exchange_declare('dead-exc','direct', false,false,false);
$channel->queue_bind('dead-log-queue', 'dead-exc','dead-key');
$callback = function ($msg) {
echo '從死信隊列中輸出的消息: ' . $msg->body . PHP_EOL;
$msg->ack();
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('dead-log-queue', '', false, false, false, false, $callback);
while ($channel->is_open())
{
$channel->wait();
}
$channel->close();
$connection->close();
}
}
這是一個死信隊列消費者,當有死信消費時就會走到這裡
死信隊列示範
A. 拒絕接收消息
1、修改 LogMq.php 正常消費者代碼
// 未變化代碼
$callback = function ($msg) {
// 拒絕接收消息
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
// 未變化代碼
2、效果
# 生産者釋出消息
nbsp;curl http://la10test.test/mq/dead
# 正常消費者
nbsp;php artisan mq:log
# 死信隊列
nbsp;php artisan mq:dead
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列
B.消息過期
利用消息過期可以實作延遲隊列的效果
1、修改 LogMq.php 正常消費者代碼
$callback = function ($msg) {
// 睡眠 23 秒,
sleep(23);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
2、效果
# 生産者釋出消息
nbsp;curl http://la10test.test/mq/dead
# 正常消費者
nbsp;php artisan mq:log
# 死信隊列
nbsp;php artisan mq:dead
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列
從死信隊列中輸出的消息: 死信隊列測試-消息被拒絕死信隊列
C. 隊列長度達到最大長度
1、修改生産者代碼配置(deadMq 方法)
$args = new AMQPTable([
'x-max-length' => 5,
'x-overflow' => 'reject-publish-dlx',
'x-dead-letter-exchange' => 'dead-exc',
'x-dead-letter-routing-key' => 'dead-key'
]);
------------------------------------------------------------------
(Go版本)
https://www.jianshu.com/p/6255b17fed68
Golang處理死信隊列
1.定義死信交換機
當成普通交換機定義就行。
// 聲明交換機
err = ch.ExchangeDeclare(
"tizi365.dead", // 交換機名字
"topic", // 交換機類型
true, // 是否持久化
false,
false,
false,
nil,
)
2.定義死信隊列
當成普通隊列定義就行
// 聲明隊列
q, err := ch.QueueDeclare(
"", // 隊列名字,不填則随機生成一個
false, // 是否持久化隊列
false,
true,
false,
nil,
)
// 隊列綁定死信交換機
err = ch.QueueBind(
q.Name, // 隊列名
"#", // 路由參數,# 井号的意思就比對所有路由參數,意思就是接收所有死信消息
"tizi365.dead", // 死信交換機名字
false,
nil)
提示:死信隊列就當成普通隊列用就是了
3.定義死信消費者
// 建立消費者
msgs, err := ch.Consume(
q.Name, // 引用前面的死信隊列名
"", // 消費者名字,不填自動生成一個
true, // 自動向隊列确認消息已經處理
false,
false,
false,
nil,
)
// 循環消費死信隊列中的消息
for d := range msgs {
log.Printf("接收死信消息=%s", d.Body)
}
4.将死信交換機綁定到指定正常隊列
// 隊列屬性
props := make(map[string]interface{})
// 設定正常隊列的死信交換機
props["x-dead-letter-exchange"] = "tizi365.dead"
// 可選: 設定死信投遞到死信交換機的時候的路由參數,如果不設定,則使用消息原先自帶的路由參數
// props["x-dead-letter-routing-key"] = "www.tizi365.com"
q, err := ch.QueueDeclare(
"normo.hello", // 隊列名
true, // 是否持久化
false,
false,
false,
props, // 設定隊列屬性
)
這樣,隻要tizi365.demo.hello隊列的消息變成死信的話,消息會被轉發到tizi365.dead死信交換機。作者:一位先生_連結:https://www.jianshu.com/p/6255b17fed68來源:簡書著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
https://blog.csdn.net/weixin_43495948/article/details/129249740
其實整體的思路就是分别建立一個normal_exchange、dead_exchange、normal_queue、dead_queue,
然後将normal_exchange與normal_queue進行綁定,将dead_exchange與dead_queue進行綁定,
這裡比較關鍵的一個點在于說如何将normal_queue與dead_exchange進行綁定,這樣才能将錯誤的消息傳遞過來。
--------------------------------------------------------------------------------
https://blog.csdn.net/weixin_43495948/article/details/129249740
其實整體的思路就是分别建立一個normal_exchange、dead_exchange、normal_queue、dead_queue,
然後将normal_exchange與normal_queue進行綁定,将dead_exchange與dead_queue進行綁定,
這裡比較關鍵的一個點在于說如何将normal_queue與dead_exchange進行綁定,這樣才能将錯誤的消息傳遞過來。
// 聲明一個normal隊列
_, err = ch.QueueDeclare(
constant.NormalQueue,
true,
false,
false,
false,
amqp.Table{
//"x-message-ttl": 5000, // 指定過期時間
//"x-max-length": 6, // 指定長度。超過這個長度的消息會發送到dead_exchange中
"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機
"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
})
消費者超出時間未應答
produce.go
package day07
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
"strconv"
"time"
"v1/utils"
)
func Produce() {
// 擷取信道
ch := utils.GetChannel()
// 聲明一個交換機
err := ch.ExchangeDeclare(
"normal_exchange",
amqp.ExchangeDirect,
true,
false,
false,
false,
nil)
utils.FailOnError(err, "Failed to declare a exchange")
ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second)
defer cancer()
// 發送了10條消息
for i := 0; i < 10; i++ {
msg := "Info:" + strconv.Itoa(i)
ch.PublishWithContext(ctx,
"normal_exchange",
"normal_key",
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
}
}
consumer1.go
package day07
import (
amqp "github.com/rabbitmq/amqp091-go"
"log"
"v1/utils"
)
type Constant struct {
NormalExchange string
DeadExchange string
NormalQueue string
DeadQueue string
NormalRoutingKey string
DeadRoutingKey string
}
func Consumer1() {
// 擷取連接配接
ch := utils.GetChannel()
// 建立一個變量常量
constant := Constant{
NormalExchange: "normal_exchange",
DeadExchange: "dead_exchange",
NormalQueue: "normal_queue",
DeadQueue: "dead_queue",
NormalRoutingKey: "normal_key",
DeadRoutingKey: "dead_key",
}
// 聲明normal交換機
err := ch.ExchangeDeclare(
constant.NormalExchange,
amqp.ExchangeDirect,
true,
false,
false,
false,
nil,
)
utils.FailOnError(err, "Failed to declare a normal exchange")
// 聲明一個dead交換機
err = ch.ExchangeDeclare(
constant.DeadExchange,
amqp.ExchangeDirect,
true,
false,
false,
false,
nil,
)
utils.FailOnError(err, "Failed to declare a dead exchange")
// 聲明一個normal隊列
_, err = ch.QueueDeclare(
constant.NormalQueue,
true,
false,
false,
false,
amqp.Table{
"x-message-ttl": 5000, // 指定過期時間
//"x-max-length": 6,
"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機
"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
})
utils.FailOnError(err, "Failed to declare a normal queue")
// 聲明一個dead隊列:注意不要給死信隊列設定消息時間,否者死信隊列裡面的資訊會再次過期
_, err = ch.QueueDeclare(
constant.DeadQueue,
true,
false,
false,
false,
nil)
utils.FailOnError(err, "Failed to declare a dead queue")
// 将normal_exchange與normal_queue進行綁定
err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)
utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue")
// 将dead_exchange與dead_queue進行綁定
err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)
utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue")
// 消費消息
msgs, err := ch.Consume(constant.NormalQueue,
"",
false, // 這個地方一定要關閉自動應答
false,
false,
false,
nil)
utils.FailOnError(err, "Failed to consume in Consumer1")
var forever chan struct{}
go func() {
for d := range msgs {
if err := d.Reject(false); err != nil {
utils.FailOnError(err, "Failed to Reject a message")
}
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
consumer2.go
package day07
import (
amqp "github.com/rabbitmq/amqp091-go"
"log"
"v1/utils"
)
func Consumer2() {
// 拿取信道
ch := utils.GetChannel()
// 聲明一個交換機
err := ch.ExchangeDeclare(
"dead_exchange",
amqp.ExchangeDirect,
true,
false,
false,
false,
nil)
utils.FailOnError(err, "Failed to Declare a exchange")
// 接收消息的應答
msgs, err := ch.Consume("dead_queue",
"",
false,
false,
false,
false,
nil,
)
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("[x] %s", d.Body)
// 開啟手動應答ß
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
限制一定的長度
隻需要改變consumer1.go中的對normal_queue的聲明
// 聲明一個normal隊列
_, err = ch.QueueDeclare(
constant.NormalQueue,
true,
false,
false,
false,
amqp.Table{
//"x-message-ttl": 5000, // 指定過期時間
"x-max-length": 6,
"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機
"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
})
消費者拒絕的消息回到死信隊列中
這裡需要完成兩點工作
工作1:需要在consumer1中作出拒絕的操作
go func() {
for d := range msgs {
if err := d.Reject(false); err != nil {
utils.FailOnError(err, "Failed to Reject a message")
}
}
}()
工作2:如果你consume的時候開啟了自動應答一定要關閉
// 消費消息
msgs, err := ch.Consume(constant.NormalQueue,
"",
false, // 這個地方一定要關閉自動應答
false,
false,
false,
nil)