天天看點

Rabbitmq的死信

一、概述

死信有死信隊列、死信交換器和死信消息組成。死信消息則有如下三種情況生成:

1.消費者使用basic.reject或 basic.nack并将requeue參數設定為false來拒絕該消息

2.消息設定了TTL過期時間,過期時間内沒有消費導緻過期

3.消息因超過隊列長度導緻被丢棄

如果隊列删除或者隊列的TTL過期時間到了被删除其中的消息是不會成為死信消息的。

每個隊列可通過隊列參數arg或者政策policy設定死信交換器和死信路由鍵來處理死信消息,當隊列參數與政策都有設定時,以隊列參數設定内容為準,建議使用政策。沒有設定死信,消息滿足死信條件将會被丢棄。

簡而言之,一個隊列A設定了死信交換器和路由鍵,當一個消息滿足死信要求時會通過設定的死信交換器和路由鍵找到對應的隊列将消息傳遞到對應隊列B中,B也就是對應的死信隊列。是以,死信交換機和死信隊列都是正常的交換器和隊列,和其他的交換器隊列聲明沒差別。通過死信隊列可以防止消息的意外丢失,保證重要的消息被執行記錄。

二、在政策中設定死信

我們通過rabbitmqctl指令建立一個名為DLX的設定了死信交換器為my-dlx路由鍵為my-dlx-routekey的政策,對應用于所有的隊列。

rabbitmqctl set_policy --vhost my_vhost1 DLX ".*" '{"dead-letter-exchange":"my-dlx","dead-letter-routing-key":"my-dlx-routekey"}' --apply-to queues      

或者在web管理頁中添加policy如下:

Rabbitmq的死信

 有多個政策的話使用優先級最高的政策,設定後隊列會有個DLX的特性。

Rabbitmq的死信

三、在隊列參數中設定死信

我們通過隊列參數設定x-dead-letter-exchange和x-dead-letter-routing-key,死信交換器必須和隊列處于同一個vhost下,示例代碼如下:

Dictionary<string, object> arg = new Dictionary<string, object>();
arg.Add("x-dead-letter-exchange","hello-dlx");
arg.Add("x-dead-letter-routing-key", "hello-dlx-routekey");
channel.QueueDeclare(
  "HelloQueue",//隊列名稱
  false,       //是否持久化
  false,       //是否隻對首次聲明的隊列可見
  false,       //是否自動删除
  arg         ////關于隊列和隊列内消息的詳細設定,鍵值對形式
);      

四、死信路由

我們在上述代碼中都設定了死信路由鍵,但是其實是可以不設定的,如果設定了就遵循設定的路由鍵,如果沒有設定則遵循原交換器的路由鍵。比如:我們将消息發送到路由鍵為A的交換器進入隊列,則當消息死信時也發送到路由鍵為A的死信交換器,如果設定了x-dead-letter-routing-key為B則發送到路由鍵為B的死信交換器中。如果不設定死信路由鍵,那麼由CC和BCC設定的路由鍵也會觸發,CC和BCC相當于抄送和密送到别的交換器。

如果産生死信循環,既到達同一個隊列兩次,消息将會被抛棄。當死信消息發送到死信隊列後将會從原隊列删除,避免過多消息的積壓,但是如果目标的死信隊列不能接受消息,則該死信消息将可能丢失。

五、死信消息

當死信消息被發送到死信隊列後會有以下變化:

1.消息的交換器名稱會改為死信交換器名稱;

2.設定了死信路由鍵的話,路由鍵會相應更改;

3.CC和BCC标頭将會删除;

死信消息将會添加一些标頭,我們先看下列印的标頭:

{"Headers":{
        "x-death":[
            {
                "count":1,
                "reason":"ZXhwaXJlZA==",
                "queue":"SGVsbG9RdWV1ZQ==",
                "time":{
                    "UnixTime":1637409202
                },
                "exchange":"SGVsbG9FeGNoYW5nZQ==",
                "routing-keys":[
                    "aG9sYQ=="
                ]
            }
        ],
        "x-first-death-exchange":"SGVsbG9FeGNoYW5nZQ==",
        "x-first-death-queue":"SGVsbG9RdWV1ZQ==",
        "x-first-death-reason":"ZXhwaXJlZA=="
    }
}      

可以看到值是Base64格式的,新添加了x-death、x-first-death-exchange、x-first-death-queue、x-first-death-reason四個值。

x-first-death-exchange:第一次的成為死信時的交換器名稱;

x-first-death-queue:第一次的成為死信時的隊列名稱;

x-first-death-reason:第一次的成為死信時的原因,死亡原因分為四種下同分别是:rejected(消息處理被拒絕)、expired (ttl時間到期)、maxlen(超過隊列允許最大長度)和Delivery-limit(消息傳回的次數超限額)

這三個是第一次死信的時候添加的,後面就不會變了。

x-death裡面是數組形式的,因為消息可能多次死信。新條目被添加到x-death 數組的開頭。如果x-death已經包含一個具有相同隊列和死字原因的條目,它的計數字段count将增加,并将移到數組的開頭。

queue:消息在死信之前所在隊列的名稱;

reason:死信的原因,同上;

time:消息被死信的日期和時間,為 64 位 AMQP 0-9-1 時間戳;

exchange :消息釋出到的交換器名(請注意,如果消息多次死信,這将是死信交換);

routing-keys :消息釋出時使用的路由鍵(包括CC密鑰,但不包括BCC密鑰 );

count : 由于這個原因,這條消息在這個隊列中被死信多少次;

original-expiration(如果消息由于消息的TTL死信的):消息的原始到期屬性。該到期屬性從死刻字的消息,以防止它被路由到任何隊列再次到期删除。

學習連結:https://www.rabbitmq.com/dlx.html