RabbitMQ進階-Queue隊列詳解-延遲隊列
文章目錄
-
-
- RabbitMQ進階-Queue隊列詳解-延遲隊列
-
- 1.延遲隊列場景
-
- 1.1 場景
- 2.延遲隊列實作方式
- 3.TTL+Exchange實作延遲隊列
-
- 3.1 初始化死信交換機
- 3.2 生産者
- 3.3 消費者
- 4.安裝插件實作延遲隊列
-
- 4.1 插件下載下傳
- 4.2 插件安裝
- 4.3 延遲交換機插件使用
- 4.4 檢視結果
-
1.延遲隊列場景
1.1 場景
一般延遲隊列用于特定事件發生後隔一段時間需要做特定處理的場景,下面舉幾個常見的栗子
1.電商系統中,若使用者下單後30min不支付,自動取消訂單
2.使用者登入APP浏覽特定商品20min後還沒下單,自動推送商品評測資訊的消息
3.調用第三方接口後,過30s去查詢接口調用狀态,比如簡單的掉第三方接口發短信,掉完營運商不會立馬告知你短信發送成功還是失敗、是以設定30s後去主動查詢下短信狀态,然後更新此條短信狀态(成功、失敗、待回執)等
我們本篇文章,模拟場景 設定30s的延遲隊列,消息在隊列中延遲30s後,再去處理消息,實作業務邏輯
2.延遲隊列實作方式
Rabbitmq本身是沒有延遲隊列的,要實作延遲消息,一般有兩種方式:
- 通過Rabbitmq本身隊列的特性TTL來實作,利用隊列消息的存活時間來實作,設定隊列的TTL消息存活周期 和死信交換機,延遲隊列需要消息的存活時間TTL(Time To Live) 來丢棄消息,并使用Rabbitmq的死信交換機(Exchange)來負責消息的轉發
- 在rabbitmq 3.5.7及以上的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實作延遲隊列功能。同時插件依賴Erlang/OPT18.0及以上
3.TTL+Exchange實作延遲隊列
這種方式實作的原理就是利用隊列消息存活時間TTL(Time To Live) ,逾時就會丢棄本條消息,然後丢棄到死信交換機,死信交換機根據路由資訊RoutingKey去負責路由轉發,轉發到相應的消費者來實作 延遲後業務邏輯處理
!!!注意我們設定TTL的隊列是用做存儲消息30s的,他并沒有消費者
真正的消費者是綁定在死信交換機上面的,通過死信交換機設定的RoutingKey來路由到目标隊列
實作原理:
3.1 初始化死信交換機
構造一個DeadLetterExchange 名字exchange_dead 和一個 target隊列 名字 queue_delay_target,target隊列就是接收死信消息,由他的消費者來處理延遲30s後的相關業務
交換機枚舉類 ExchangeTypeEnum
package delay;
public enum ExchangeTypeEnum {
DIRECT("exchange-direct-name", "direct"),
FANOUT("exchange-fanout-name", "fanout"),
TOPIC("exchange-topic-name", "topic"),
HEADER("exchange-header-name", "headers"),
UNKNOWN("unknown-exchange-name", "direct");
/**
* 交換機名字
*/
private String name;
/**
* 交換機類型
*/
private String type;
ExchangeTypeEnum(String name, String type) {
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public String getType() {
return type;
}
public static ExchangeTypeEnum getEnum(String type) {
ExchangeTypeEnum[] exchangeArrays = ExchangeTypeEnum.values();
for (ExchangeTypeEnum exchange : exchangeArrays) {
if (exchange.getName().equals(type)) {
return exchange;
}
}
return ExchangeTypeEnum.UNKNOWN;
}
}
死信交換機、死信隊列定義
package delay;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
public class DelayDeadExchange {
/**
* 死信交換機
*/
public static final String DEAD_EXCHANGE = "exchange_dead";
/**
* 目标隊列,就是真正有消費者監聽,要處理業務的隊列
*/
public static final String QUEUE_TARGET = "queue_delay_target";
/**
* 設定 rk=# 表示任意RK的消息過來,都可以路由到 dead_msg_quque這個隊列
*/
public static final String RK_QUEUE_TARGET = "rk.queue_delay_target";
/**
* 聲明死信隊列資訊
* <p>
* 死信隊列需要将 死亡的消息路由到 目标隊列,進而使目标隊列的消費者進行消費處理相關業務
*
* @throws Exception
*/
public static void deadInit() throws Exception {
// 擷取到連接配接以及mq通道
Connection connection = MqConnectUtil.getConnectionDefault();
// 從連接配接中建立通道
Channel channel = connection.createChannel();
/*聲明 直連交換機 交換機 String exchange,
* 參數明細
* 1、交換機名稱
* 2、交換機類型,topic
*/
channel.exchangeDeclare(DEAD_EXCHANGE, ExchangeTypeEnum.DIRECT.getType());
channel.queueDeclare(QUEUE_TARGET, true, false, false, null);
/*交換機和隊列綁定String queue, String exchange, String routingKey
* 參數明細
* 1、隊列名稱
* 2、交換機名稱
* 3、路由key rk.queue_delay_target
*/
channel.queueBind(QUEUE_TARGET, DEAD_EXCHANGE, RK_QUEUE_TARGET);
//關閉通道和連接配接
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
deadInit();
}
}
運作 deadInit(),初始化死信交換機及死信隊列,檢視下界面
3.2 生産者
生産者定義一個TTL隊列 queue_delay_ttl 設定隊列消息生命周期TTL:30s,路由RoutingKey :rk.ttl_queue_test 就是為了存放消息,讓他逾時死亡後,經過死信交換機路由轉發
!!!注意該隊列沒有消費者,隻有ttl參數及綁定的死信隊列
package delay;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import subscrib3.ExchangeTypeEnum;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import static delay.DelayDeadExchange.*;
public class DelayQueueProducer {
/**
* 延時隊列名字
*/
public final static String QUEUE_TTL = "queue_delay_ttl";
/**
* 設定延時隊列的RK
*/
public final static String RK_QUEUE_TTL = "rk.ttl_queue_test";
/**
* 生産 Direct直連 交換機的MQ消息
*/
public static void produce() throws Exception {
// 擷取到連接配接以及mq通道
Connection connection = MqConnectUtil.getConnectionDefault();
// 從連接配接中建立通道
Channel channel = connection.createChannel();
/*聲明 直連交換機 交換機 String exchange,
* 參數明細
* 1、交換機名稱
* 2、交換機類型,direct
*/
channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());
/* 聲明(建立)隊列 queueDeclare( String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue - 隊列名
* durable - 是否是持久化隊列, 隊列的聲明預設是存放到記憶體中的,如果rabbitmq重新開機會丢失
* exclusie - 是否排外的,僅限于目前隊列使用
* autoDelete - 是否自動删除隊列,當最後一個消費者斷開連接配接之後隊列是否自動被删除,可以通過界面 檢視某個隊列的消費者數量,當consumers = 0時隊列就會自動删除
* arguments - 隊列攜帶的參數 比如 ttl-生命周期,x-dead-letter 死信隊列等等
*/
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// !!!!! 注意這裡綁定的 RoutingKey 是 死信隊列根據RK要路由到目标隊列的RoutingKey,是以要用目标隊列的RoutingKey
arguments.put("x-dead-letter-routing-key", RK_QUEUE_TARGET);
arguments.put("x-message-ttl", 30000);
channel.queueDeclare(QUEUE_TTL, true, false, false, arguments);
/*交換機和隊列綁定String queue, String exchange, String routingKey
* 參數明細
* 1、隊列名稱
* 2、交換機名稱
* 3、路由key rk.queue_delay_ttl
*/
channel.queueBind(QUEUE_TTL, ExchangeTypeEnum.DIRECT.getName(), RK_QUEUE_TTL);
/* 發送消息 String exchange, String routingKey, BasicProperties props, byte[] body
* exchange - 交換機 ,"" 空時候指定的是 擷取的virtualHost 虛拟伺服器的 預設的exchang,每個virtualHost都有一個AMQP default type:direct 直接轉發
* queuename - 隊列資訊
* props - 參數資訊
* message 消息體 byte[]類型
*/
// 消息内容
String message = "i=1" + " Hello World! Time:" + LocalDate.now() + " " + LocalTime.now();
channel.basicPublish(ExchangeTypeEnum.DIRECT.getName(), RK_QUEUE_TTL, null, message.getBytes());
System.out.println(" **** Producer Sent Message: [" + message + "]");
//關閉通道和連接配接
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
// //生産消息
produce();
}
}
運作produce() 生産1條消息,檢視界面
TTL的隊列中消息由1條,target中沒有消息
3.3 消費者
30s後,生産者生産的消息,TTL生命周期到了,就死亡了,從DeadLetterExchange 路由到了Traget隊列
看下Target隊列,target隊列中從 0 變成 了1條消息
消費者我們循環10次,讓他等待消息,消費此消息
package delay;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import conn.MqConnectUtil;
import static delay.DelayDeadExchange.*;
public class DelayTargetConsumer {
public static void main(String[] argv) throws Exception {
Connection connection = null;
Channel channel = null;
try {
connection = MqConnectUtil.getConnectionDefault();
channel = connection.createChannel();
/*聲明交換機 String exchange
* 參數明細
* 1、交換機名稱
* 2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(ExchangeTypeEnum.DIRECT.getName(), ExchangeTypeEnum.DIRECT.getType());
/*聲明隊列
* 參數明細:
* 1、隊列名稱
* 2、是否持久化
* 3、是否獨占此隊列
* 4、隊列不用是否自動删除
* 5、參數
*/
channel.queueDeclare(QUEUE_TARGET, true, false, false, null);
//交換機和隊列綁定String queue, String exchange, String routingKey
/**
* 參數明細
* 1、隊列名稱
* 2、交換機名稱
* 3、路由key
*/
channel.queueBind(QUEUE_TARGET, DEAD_EXCHANGE, RK_QUEUE_TARGET);
System.out.println(" **** Consumer->1 Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
/* 消息确認機制
* autoAck true:表示自動确認,隻要消息從隊列中擷取,無論消費者擷取到消息後是否成功消費,都會認為消息已經成功消費
* autoAck false:表示手動确認,消費者擷取消息後,伺服器會将該消息标記為不可用狀态,等待消費者的回報,如果消費者一直沒有回報,那麼該消息将一直處于不可用狀态
* 并且伺服器會認為該消費者已經挂掉,不會再給其發送消息,直到該消費者回報
* !!!!!! 注意這裡是 false,手動确認
*/
channel.basicConsume(QUEUE_TARGET, false, consumer);
int count = 0;
while (count < 10) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" count:" + count + " **** Consumer->1 Received '" + message + "'");
doSomeThing(message);
//傳回确認狀态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
count++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
channel.close();
connection.close();
}
}
/**
* 模拟處理複雜邏輯:休眠100ms
*
* @param message
* @throws Exception
*/
public static void doSomeThing(String message) throws Exception {
//周遊Count ,sleep , 接收一條消息後休眠 100 毫秒,模仿複雜邏輯
Thread.sleep(100);
}
}
運作consumer,接收到了此消息,隊列清空
消費完畢,消息隊列清零
4.安裝插件實作延遲隊列
RabbitMQ提供了插件,可以實作延遲隊列,我們現在采用插件的方式來實作一下延遲隊列
4.1 插件下載下傳
去RabbitMQ的官網下載下傳插件,插件位址:https://www.rabbitmq.com/community-plugins.html
找到 GitHub: rabbitmq/rabbitmq-delayed-message-exchange 的地方,下載下傳
4.2 插件安裝
把下載下傳的 rabbitmq_delayed_message_exchange-3.8.0.ez 檔案 複制到Rabbitmq安裝路徑 下面的 plugins/目錄下
切換到Rabbitmq安裝目錄 sbin下,執行指令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
打開界面Rabbitmq,建立交換機 plugin_delay_exchange,可以看到多了一種交換機 x-delay-message類型
多了一種交換機類型x-delay-message類型
4.3 延遲交換機插件使用
生産消息
package delay;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import conn.MqConnectUtil;
import java.util.HashMap;
import java.util.Map;
public class PluginsDelayExchange {
/**
* 延遲隊列
*/
public static final String PLUGIN_DELAY_QUEUE = "plugin_delay_queue";
/**
* 延遲隊列RoutingKey
*/
public static final String RK_PLUGIN_DELAY_QUEUE = "rk.plugin_delay_queue";
/**
* 延遲隊列交換機
*/
public static final String PLUGIN_DELAY_EXCHANGE = "plugin_delay_exchange";
public static void main(String[] args) throws Exception {
// 擷取到連接配接以及mq通道
Connection connection = MqConnectUtil.getConnectionDefault();
// 從連接配接中建立通道
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
channel.exchangeDeclare(PLUGIN_DELAY_EXCHANGE, "x-delayed-message", true, false, arguments);
channel.queueDeclare(PLUGIN_DELAY_QUEUE, true, false, false, null);
channel.queueBind(PLUGIN_DELAY_QUEUE, PLUGIN_DELAY_EXCHANGE, RK_PLUGIN_DELAY_QUEUE);
Map<String, Object> headers = new HashMap<>();
//延遲10s後發送
headers.put("x-delay", 10000);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
//延遲10s後 發送消息
channel.basicPublish(PLUGIN_DELAY_EXCHANGE, RK_PLUGIN_DELAY_QUEUE, props, "該消息将在10s後發送到隊列".getBytes());
channel.close();
connection.close();
}
}
執行 produce,生産1條消息
4.4 檢視結果
看下隊列中的消息
是以說,插件的實作方式和死信隊列的實作方式完全不同,一個是延遲發送、一個是延遲消費
- 插件看似立馬發送,Produce程序已經結束了,過了10s後,隊列才真正的有消息
- 死信隊列TTL方式是立馬發送,然後消息在隊列中存活10sTTL時間,後被路由轉發,由消費者消費
至此 死信隊列講完了
下一章 我們講一下 RabbitMQ系列(十二)RabbitMQ進階-消息确認機制之事務機制