天天看點

RabbitMQ--過期時間TTL、死信隊列、延遲隊列和優先隊列

過期時間TTL

TTL,Time to Live的簡稱,即過期時間。RabbitMQ 可以對消息和隊列設定TTL。

設定消息的TTL:

目前有兩種方法可以設定消息的TTL。

  1. 第一種方法是通過隊列屬性設定,隊列中所有消息都有相同的過期時間。
  2. 第二種方法是對消息本身進行單獨設定,每條消息的TTL可以不同。

如果兩種方法一起使用,則消息的TTL以兩者之間較小的那個數值為準。

消息在隊列中的生存時間一旦超過設定的TTL值時,就會變成“死信”(DeadMessage),消費者将無法再收到該消息(這點不是絕對的)。

通過隊列屬性設定消息TTL的方法是在channel.queueDeclare 方法中加入x-message-ttl參數實作的,這個參數的機關是毫秒。

//設定TTL
Map<String, Object> argss = new HashMap<>();
argss.put("x-message-ttl", 6000);
channel.queueDeclare(QUEUE_NAME, true, false, false, argss);
           

同時也可以通過Pllicy的方式來設定TTL:

rabbitmqctl set_policy TTL ".*" '{"message-ttl"":60000}' --apply-to queues
           

如果不設定TTL,則表示此消息不會過期;

如果将TTL設定為0,則表示除非此時可以直接将消息投遞到消費者,否則該消息會被立即丢棄,這個特性可以部分替代RabbitMQ 3.0版本之前的immediate 參數,之是以部分代替,是因為immediate參數在投遞失敗時會用Basic. Return将消息傳回(這個功能可以用死信隊列來實作)

針對每條消息設定TTL的方法是在channel.basicPublish方法中加入expiration的屬性參數,機關為毫秒:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);//持久化消息
builder.expiration("60000");//設定TTL=60000ms
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, false, properties, "ttlTest".getBytes());
           

對于第一種設定隊列TTL屬性的方法,一旦消息過期,就會從隊列中抹去,而在第二種方法中,即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期是在即将投遞到消費者之前判定的。

為什麼這兩種方法處理的方式不一樣?因為第一種方法裡,隊列中已過期的消息肯定在隊列頭部,RabbitMQ隻要定期從隊頭開始掃描是否有過期的消息即可。而第二種方法裡,每條消息的過期時間不同,如果要删除所有過期消息勢必要掃描整個隊列,是以不如等到此消息即将被消費時再判定是否過期,如果過期再進行删除即可。

設定隊列的TTL:

通過channel. queueDeclare方法中的x-expires參數可以控制隊列被自動删除前處于未使用狀态的時間

。未使用的意思是隊列上沒有任何的消費者,隊列也沒有被重新聲明,并且在過期時間段内也未調用過Basic.Get指令。

設定隊列裡的TTL可以應用于類似RPC方式的回複隊列,在RPC中,許多隊列會被建立出來,但是卻是未被使用的。

RabbitMQ會確定在過期時間到達後将隊列删除,但是不保障删除的動作有多及時。在RabbitMQ重新開機後,持久化的隊列的過期時間會被重新計算。

用于表示過期時間的x-expires參數以毫秒為機關,并且服從和x-message-ttl一樣的限制條件,不過

不能設定為0

。比如該參數設定為1000,則表示該隊列如果在1秒鐘之内未使用則會被删除。

//設定隊列TTL
Map<String, Object> arg = new HashMap<>();
arg.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, arg);
           

死信隊列

DLX,全稱為Dead-Letter-Exchange,可以稱之為死信交換器,也有人稱之為死信郵箱。當消息在一個隊列中變成死信( dead message)之後,它能被重新被發送到另一個交換器中,這個交換器就是DLX,綁定DLX的隊列就稱之為死信隊列。

消息變成死信一般是由于以下幾種情況:

  • 消息被拒絕(Basic. Reject/Basic.Nack),并且設定requeue參數為false;
  • 消息過期;
  • 隊列達到最大長度。

DLX也是一個正常的交換器,和一般的交換器沒有差別,它能在任何的隊列上被指定,實際上就是設定某個隊列的屬性。當這個隊列中存在死信時,RabbitMQ就會自動地将這個消息重新釋出到設定的DLX上去,進而被路由到另一個隊列,即死信隊列。可以監聽這個隊列中的消息以進行相應的處理,這個特性與将消息的TTL設定為0配合使用可以彌補immediate參數的功能。

通過在channel. queueDeclare方法中設定x-dead- letter-exchange參數來為這個隊列添加DLX

//設定死信隊列
channel.exchangeDeclare("dlx_exchange", "direct");//建立DLX
Map<String, Object> arg = new HashMap<>();
arg.put("x-dead-letter-exchange", "dlx_exchange");
//為隊列添加myqueue添加DLX
channel.queueDeclare("myqueue", false, false, false, arg);
           

也可以為這個DLX指定路郵鍵,如果沒有特殊指定,則使用原隊列的路由鍵:

也可以通過Policy的方式設定:

rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"dlx_exchange"}' --apply-to queues
           

示例:建立一個隊列,為其設定TTL和DLX:

channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal", "fanout", true);
Map<String, Object> arg = new HashMap<>();
arg.put("x-message-ttl", 10000);
arg.put("x-dead-letter-exchange", "exchange.dlx");
arg.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare("queue.normal", true, false, false, arg);
channel.queueBind("queue.normal", "exchange.normal", "");
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");
channel.basicPublish("exchange.normal", "rk", 
        MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
           

這裡建立了兩個交換器exchange.normal 和exchange.dlx, 分别綁定兩個隊列queue.normal和queue.dlx.

由Web管理頁面可以看出,兩個隊列都被标記了“D”,這個是durable的縮寫,即設定了隊列持久化。queue.normal 這個隊列還配置了TTL、DLX和DLK,其中DLX指的是x-dead- letter-routing-key這個屬性。

RabbitMQ--過期時間TTL、死信隊列、延遲隊列和優先隊列

生産者首先發送一條攜帶路由鍵為“rk”的消息,然後經過交換器exchange.normal順利地存儲到隊列queue.normal中。由于隊列queue.normal設定了過期時間為10s,在這10s 内沒有消費者消費這條消息,那麼判定這條消息為過期。由于設定了DLX,過期之時,消息被丢給交換器exchange.dlx中,這時找到與exchange.dlx 比對的隊列queue.dlx, 最後消息被存儲在queue.dlx這個死信隊列中。

RabbitMQ--過期時間TTL、死信隊列、延遲隊列和優先隊列

對于RabbitMQ來說,DLX是一個非常有用的特性。它可以處理異常情況下,消息不能夠被消費者正确消費(消費者調用了Basic.Nack或者Basic. Reject)而被置入死信隊列中的情況,後續分析程式可以通過消費這個死信隊列中的内容來分析當時所遇到的異常情況,進而可以改善和優化系統。DLX配合TTL使用還可以實作延遲隊列的功能。

延遲隊列

延遲隊列存儲的對象是對應的延遲消息,所謂“延遲消息”是指當消息被發送以後,并不想讓消費者立刻拿到消息,而是等待特定時間後,消費者才能拿到這個消息進行消費。

延遲隊列的使用場景有很多,比如:

  • 在訂單系統中, 一個使用者下單之後通常有30分鐘的時間進行支付,如果30分鐘之内沒有支付成功,那麼這個訂單将進行異常處理,這時就可以使用延遲隊列來處理這些訂單了。
  • 使用者希望通過手機遠端遙控家裡的智能裝置在指定的時間進行工作。這時候就可以将使用者指令發送到延遲隊列,當指令設定的時間到了再将指令推送到智能裝置。

在AMQP協定中,或者RabbitMQ本身沒有直接支援延遲隊列的功能,但是可以通過前面所介紹的DLX和TTL模拟出延遲隊列的功能。

在上節的圖示中,不僅展示的是死信隊列的用法,也是延遲隊列的用法,對于queue.dlx這個死信隊列來說,同樣可以看作延遲隊列。假設一個應用中需要将每條消息都設定為10秒的延遲,生産者通過exchange.normal這個交換器将發送的消息存儲在queue.normal這個隊列中。消費者訂閱的并非是queue.normal這個隊列,而是queue.dlx這個隊列。當消息從queue.normal這個隊列中過期之後被存入queue.dlx這個隊列中,消費者就恰巧消費到了延遲10秒的這條消息。

在真實應用中,對于延遲隊列可以根據延遲時間的長短分為多個等級,-般分為5秒、10秒、30秒、1分鐘、5分鐘、10分鐘、30分鐘、1小時這幾個次元,當然也可以再細化一下。

為了簡化說明,這裡隻設定了5秒、10秒、30秒、1分鐘這四個等級。根據應用需求的不同,生産者在發送消息的時候通過設定不同的路由鍵,以此将消息發送到與交換器綁定的不同的隊列中。這裡隊列分别設定了過期時間為5秒、10秒、30秒、1分鐘,同時也分别配置了DLX和相應的死信隊列。當相應的消息過期時,就會轉存到相應的死信隊列(即延遲隊列)中,這樣消費者根據業務自身的情況,分别選擇不同延遲等級的延遲隊列進行消費。

RabbitMQ--過期時間TTL、死信隊列、延遲隊列和優先隊列

優先隊列

優先級隊列,顧名思義,具有高優先級的隊列具有高的優先權,優先級高的消息具備優先被消費的特權。

可以通過設定隊列的x-max-priority參數來實作:

//設定一個隊列的最大優先級
Map<String, Object> arg = new HashMap<>();
arg.put("x-max-priority", 10);
channel.queueDeclare("queue.priority", true, false, false, arg);
           

通過Web管理頁面可以看到

Pri

标志:

RabbitMQ--過期時間TTL、死信隊列、延遲隊列和優先隊列

上面的代碼示範的是如何配置一個隊列的最大優先級。在此之後,需要在發送時在消息中設定消息目前的優先級。

//設定消息優先級
//設定消息優先級
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority", "rk_priority", properties, "messages".getBytes());
           

上面的代碼中設定消息的優先級為5。預設最低為0,最高為隊列設定的最大優先級。優先級高的消息可以被優先消費,這個也是有前提的:如果在消費者的消費速度大于生産者的速度且Broker中沒有消息堆積的情況下,對發送的消息設定優先級也就沒有什麼實際意義。因為生産者剛發送完一條消 息就被消費者消費了,那麼就相當于Broker中至多隻有一條消息,對于單條消息來說優先級是沒有什麼意義的。