天天看點

RabbitMQ實作延時消息的兩種方法

76套java從入門到精通實戰課程分享

消息被否定接收,消費者使用basic.reject 或者 basic.nack并且requeue 重回隊列屬性設為false。

消息在隊列裡得時間超過了該消息設定的過期時間(ttl)。

消息隊列到達了它的最大長度,之後再收到的消息。

當一個消息再隊列裡變為死信時,它會被重新publish到另一個exchange交換機上,這個exchange就為dlx。是以我們隻需要在聲明正常的業務隊列時添加一個可選的"x-dead-letter-exchange"參數,值為死信交換機,死信就會被rabbitmq重新publish到配置的這個交換機上,我們接着監聽這個交換機就可以了。

引入amqp依賴

聲明交換機,隊列

聲明一個類用于發送帶過期時間的消息

編寫一個類用于消費消息

編寫controller調用發送消息方法測試結果

配置檔案application.properties

啟動項目,打開rabbitmq控制台,可以看到交換機和隊列已經建立好。

RabbitMQ實作延時消息的兩種方法
RabbitMQ實作延時消息的兩種方法

在浏覽器中請求http://localhost:4399/send?msg=hello&time=5,從控制台的輸出來看,剛好5s後接收到消息。

當我往死信隊列中發送兩條不同過期時間的消息時,如果先發送的消息a的過期時間大于後發送的消息b的過期時間時,由于消息的順序消費,消息b過期後并不會立即重新publish到死信交換機,而是會等到消息a過期後一起被消費。

依次發送兩個請求http://localhost:4399/send?msg=消息a&time=30和http://localhost:4399/send?msg=消息b&time=10,消息a先發送,過期時間30s,消息b後發送,過期時間10s,我們想要的結果應該是10s收到消息b,30s後收到消息a,但結果并不是,控制台輸出如下:

消息a30s後被成功消費,緊接着消息b被消費。是以當我們使用死信隊列時應該注意是否消息的過期時間都是一樣的,比如訂單超過10分鐘未支付修改其狀态。如果當一個隊列各個消息的過期時間不一緻時,使用死信隊列就可能達不到延時的作用。這時候我們可以使用延時插件來實作這需求。

rabbitmq delayed message plugin是一個rabbitmq的插件,是以使用前需要安裝它,可以參考的github位址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安裝好插件後隻需要聲明一個類型type為"x-delayed-message"的exchange,并且在其可選參數下配置一個key為"x-delayed-typ",值為交換機類型(topic/direct/fanout)的屬性。

聲明一個隊列綁定到該交換機

在發送消息的時候消息的header裡添加一個key為"x-delay",值為過期時間的屬性,機關毫秒。

代碼就在上面,配置類為dmp開頭的,發送消息的方法為send2()。

啟動後在rabbitmq控制台可以看到一個類型為x-delayed-message的交換機。

RabbitMQ實作延時消息的兩種方法
RabbitMQ實作延時消息的兩種方法

繼續在浏覽器中發送兩個請求http://localhost:4399/send2?msg=消息a&time=30和http://localhost:4399/send2?msg=消息b&time=10,控制台輸出如下,不會出現死信隊列出現的問題:

死信交換機官網介紹:https://www.rabbitmq.com/dlx.html 延時插件github:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange