消息如何保障100%的投遞成功?
什麼是生産端的可靠性投遞?
- 1、保障消息的成功發出
- 2、保障MQ節點的成功接收
- 3、發送端收到MQ節點(Broker)确認應答
-
4、完善的消息進行補償機制
前三步不一定能保障消息能夠100%投遞成功,是以要加上第四步。
一線網際網路大廠的解決方案:
1、消息落庫,對消息狀态進行打标

圖解:
- 藍色部分表示:生産者負責發送消息發送至Broker端
- Biz DB:訂單資料庫
- MSG DB: 消息資料
-
面對小規模的應用可以采用加事務的方式,保證事務的一緻性。但在大廠中面對高并發,并沒有加事務,事務的性能拼接非常嚴重,而是做補償。
比如:如下發一條訂單消息。
- step1:存儲訂單消息(建立訂單),業務資料入庫,消息也入庫。缺點:需要持久化兩次。(status:0)
- step2:在step1成功的前提下,發送消息
- step3:Broker收到消息後,confirm給我們的生産端。Confirm Listener異步監聽Broker回送的消息。
- step4:抓取出指定的消息,更新(status=1),表示消息已經投遞成功。
- step5:分布式定時任務擷取消息狀态,如果等于0則抓取資料出來。
- step6:重新發送消息
- step7:重試限制設定3次。如果消息重試了3次還是失敗,那麼(status=2),認為這個消息就是失敗的。
- 查詢這些消息為什麼失敗,可能需要人工去查詢。
- 假設step2執行成功,step3由于網絡閃斷。那麼confirm将永遠收不到消息,那麼我們需要設定一個規則:
- 例如:在消息入庫的時候,設定一個臨界值 timeout=5min,當超過5min之後,就将這條資料抓取出來。
- 或者寫一個定時任務每隔5分鐘就将status=0的消息抓取出來。可能存在小問題:消息發送出去,定時任務又正好剛執行,Confirm還未收到,定時任務就會執行,會導緻消息執行兩次。
- 更精細化操作:消息逾時容忍限制。confirm在2-3分鐘内未收到消息,則重新發送。
- 總體概括上圖:首先業務持久化入庫之後,在發送消息之前,需要将消息持久化到資料庫中,并給這個消息設定一個狀态(未發送、發送中、到達)。發送之後Producer端需要有一個ConfirmListener監聽Broker的傳回confirm,當消息狀态發生了變化,需要對消息做一個變更。針對沒有到達的消息做一個輪訓操作,重新發送。對輪訓次數也需要做一個限制3-5次。確定消息能夠成功的發送。
- 第一種方案對資料有兩次入庫,一次業務資料入庫,一次消息入庫。這樣對資料的入庫是一個瓶頸。
2、消息的延遲投遞,做二次确認,回調檢查
這種方式并不一定能保證100%成功,但是也能保證99.99%的消息成功。如果遇到特别極端的情況,那麼就隻能需要人工去補償,或者定時任務去做。
第二種方式主要是為了減少對資料庫的操作。
圖解:
- Upstream service:生産端
- DownStream service:消費端
- Callback service:回調服務
- step1:業務消息入庫成功後,第一次消息發送。
- step2:同樣在消息入庫成功後,發送第二次消息,這兩條消息是同時發送的。第二條消息是延遲檢查,可以設定2min、5min 延遲發送。
- step3:消費端監聽指定隊列。
- step4:消費端處理完消息後,内部生成新的消息send confirm。投遞到MQ Broker。
- step5: Callback Service 回調服務監聽MQ Broker,如果收到Downstream service發送的消息,則可以确定消息發送成功,執行消息存儲到MSG DB。
- step6:Check Detail檢查監聽step2延遲投遞的消息。此時兩個監聽的隊列不是同一個,5分鐘後,Callback service收到消息,檢查MSG DB。如果發現之前的消息已經投遞成功,則不需要做其他事情。如果檢查發現失敗,則Callback 進行補償,主動發送RPC 通信。通知上遊生産端重新發送消息。
- 首先正常邏輯:業務持久化入庫之後,一次性生成2條消息,第一條消息為Consumer監聽消費,第2條消息為延遲消息,由Callback Service監聽并消息;Consumer消費消息成功後給Callback Service發送确認消費消息,Callback Service做消息消費成功的入庫持久化,Callback Service收到延遲消息不做任何處理。
- 異常邏輯:Consumer消費消息失敗,或者沒有收到消息,那麼Callback Service收到延遲消息之後,通過RPC調用Producer查詢Producer的業務表進行消息的重新發送。極端情況下需要人工補償處理。
- 這樣做的目的就是為了在Producer端少做一次消息的持久化入庫處理,異步的進行補償操作,提升性能。
具體采用哪種方案,還需要根據業務與消息的并發量而定。
幂等性概念詳解
幂等性是什麼?
幂等(idempotent、idempotence)是一個數學與計算機學概念,常見于抽象代數中,即f(f(x)) = f(x)。簡單的來說就是一個操作多次執行産生的結果與一次執行産生的結果一緻。
- 我們可以借鑒資料庫的樂觀鎖機制:
- 比如我們執行一條更新庫存的SQL語句:
- UPDATE T_REPS SET COUNT = COUNT - 1,VERSION = VERSION + 1 WHERE VERSION = 1
- 利用加版本号Version的方式來保證幂等性。
消費端-幂等性保障
在海量訂單産生的業務高峰期,如何避免消息的重複消費問題?
在高并發的情況下,會有大量的消息到達MQ,消費端需要監聽大量的消息。這樣的情況下,難免會出現消息的重複投遞,網絡閃斷等等。如果不去做幂等,則會出現消息的重複消費。
消費端實作幂等性,就意味着,我們的消息永遠不會被消費多次,即使我們收到了多條一樣的消息,也隻會執行一次。
看下網際網路主流的幂等性操作:
- 唯一ID+指紋碼機制,利用資料庫主鍵去重。
- 利用Redis的原子性實作
- 其他的技術實作幂等性:https://www.jianshu.com/p/56e0746ab3cc
唯一ID+指紋碼機制
- 唯一ID + 指紋碼機制,利用資料庫主鍵去重。保證唯一性
-
SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指紋碼
如果查詢沒有,則添加。有則不需要做任何操作,消費端不需要消費消息。
- 好處:實作簡單
- 壞處:高并發下有資料庫寫入的性能瓶頸
- 解決方案:跟進ID進行分庫分表進行算法路由分攤流量壓力。
Redis 原子特性實作
最簡單使用Redis的自增。
使用Redis進行幂等,需要考慮的問題。
- 第一:我們是否需要資料落庫,如果落庫的話,關鍵解決的問題是資料庫和緩存如何做到原子性?加事務不行,Redis和資料庫的事務不是同一個,無法保證同時成功同時失敗。
- 第二:如果不進行落庫,那麼都存儲到緩存中,如何設定定時同步的政策?
Confirm确認消息、Return傳回消息
了解Confirm 消息确認機制:
- 消息的确認,是指生産者投遞消息後,如果Broker收到消息,則會給我們生産者一個應答。
-
生産者進行接收應答,用來确定這條消息是否正常的發送到Broker,這種方式也是消息的可靠性投遞的核心保障!
Confirm确認消息流程:
RabbitMQ進階特性 藍色:producer 生産者 紅色:MQ Broker 伺服器
生産者把消息發送到Broker端,Broker收到消息之後回送給producer。Confirm Listener 監聽應答。
操作是異步操作,當生産者發送完消息之後,就不需要管了。Confirm Listener 監聽MQ Broker的應答。
如何實作Confirm确認消息?
- 第一步:在channel上開啟确認模式:channel.confirmSelect()
- 第二步;在chanel上 添加監聽:addConfirmListener,監聽成功和失敗的傳回結果,根據具體的結果對消息進行重新發送、或記錄日志等後續處理!
Confirm确認模式的生産端:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3、通過Connection建立一個Channel
Channel channel = connection.createChannel();
//4、指定消息的投遞模式:消息的确認模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5、發送消息
String msg = "hello rabbitmq send confirm message!";
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes("UTF-8"));
//6、添加一個确認監聽
channel.addConfirmListener(new ConfirmListener() {
/**
* 傳回成功
* @param deliveryTag
* @param multiple
* @throws IOException
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----------ack----------");
}
/**
* 傳回失敗
* @param deliveryTag
* @param multiple
* @throws IOException
*/
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----------no ack----------");
}
});
}
}
Confirm确認模式的消費端:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3、通過Connection建立一個Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String queueName = "test_confirm_queue";
String routingKey = "confirm.#";
//4、聲明交換機和隊列,然後進行綁定設定,指定路由key
channel.exchangeDeclare(exchangeName,"topic",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
//5、建立消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(envelope.getRoutingKey() + ":" + message);
}
};
//6、設定channel
channel.basicConsume(queueName,true,consumer);
}
}
Return消息機制
- Return Listener用于處理一些不可路由的消息!
- 我們的消息生産者,通過指定一個Exchange和Routingkey,把消息送達到某一個隊列中去,然後我們的消費者監聽隊列,進行消費處理操作!
- 但是在某些情況下,如果我們在發送消息的時候,目前的exchange不存在或者指定的路由key路由不到,這個時候如果我們需要監聽這種不可達的消息,就要使用Return Listener!
Return消息機制基礎API中有一個關鍵的配置項:
- Mandatory:如果為true,則監聽器會接收到路由不可達的消息,然後進行後續處理,如果為false,那麼broker端自動删除該消息!
Return消息機制流程
Return消息機制生産端:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3、通過Connection建立一個Channel
Channel channel = connection.createChannel();
//4、指定消息的投遞模式:消息的确認模式
channel.confirmSelect();
String exchangeName = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "error.save";
//5、發送消息
String msg = "hello rabbitmq send confirm message!";
channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes("UTF-8"));
//第三個參數mandatory=true,意味着路由不到的話mq也不會删除消息,false則會自動删除
channel.basicPublish(exchangeName,routingKeyError,true,null,msg.getBytes("UTF-8"));
channel.basicPublish(exchangeName,routingKeyError,false,null,msg.getBytes("UTF-8"));
//6、添加一個return監聽
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----------handle return----------");
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body,"UTF-8"));
}
});
}
}
Return消息機制消費端:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3、通過Connection建立一個Channel
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String queueName = "test_return_queue";
String routingKey = "return.#";
//4、聲明交換機和隊列,然後進行綁定設定,指定路由key
channel.exchangeDeclare(exchangeName,"topic",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
//5、建立消費者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(envelope.getRoutingKey() + ":" + message);
}
};
//6、設定channel
channel.basicConsume(queueName,true,consumer);
}
}
關于Confirm确認消息、Return傳回消息:rabbitmq生産者的消息确認
消息的限流(防止占用記憶體過多,節點當機)
什麼是消費端的限流?
- 假設一個場景,首先,我們Rabbitmq伺服器有上萬條未處理的消息,我們随便打開一個消費者用戶端,會出現下面情況:
- 巨量的消息瞬間全部推送過來,但是我們單個用戶端無法同時處理這麼多資料!這個時候很容易導緻伺服器崩潰,出現故障。
為什麼不在生産端進行限流呢?
因為在高并發的情況下,流量就是非常大,是以很難在生産端做限制。是以我們可以用MQ在消費端做限流。
-
RabbitMQ提供了一種qos(服務品質保證)功能,即在非自動确認消息的前提下,如果一定數目的消息(通過基于consume或者channel設定Qos的值)未被确認前,不進行消費新的消息。
在限流的情況下,千萬不要設定自動簽收,要設定為手動簽收。
-
void BasicQos(uint prfetchSize,ushort prefetchCount,bool global);
參數解釋:
- prefetchSize:0
- prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多于N個消息,即一旦有N個消息還沒有ack,則該consumer将block掉,直到有消息ack。
- global: true\false 是否将上面設定應用于channel,簡單點說,就是上面限制是channel級别還是consumer級别。
- prefetchSize和global這兩項,rabbitmq沒有實作,暫且不研究prefetch_count在no_ask = false的情況下生效,即在自動應答的情況下這兩個值是不生效的。
- 第一個參數:消息的限制大小,消息多少兆。一般不做限制,設定為0
- 第二個參數:一次最多處理多少條,實際工作中設定為1就好
- 第三個參數:限流政策在什麼上應用。在RabbitMQ一般有兩個應用級别:1.通道 2.Consumer級别。一般設定為false,true 表示channel級别,false表示在consumer級别
限流機制生産者端:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3、通過Connection建立一個Channel
Channel channel = connection.createChannel();
//4、指定消息的投遞模式:消息的确認模式
channel.confirmSelect();
String exchangeName = "test_qos_exchange";
String routingKey = "qos.save";
//5、發送消息
String msg = "hello rabbitmq send confirm message!";
channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes("UTF-8"));
}
}
限流機制消費者端:
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(envelope.getRoutingKey() + ":" + message);
//手動簽收
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3、通過Connection建立一個Channel
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
//4、聲明交換機和隊列,然後進行綁定設定,指定路由key
channel.exchangeDeclare(exchangeName,"topic",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
//5、建立消費者
DefaultConsumer consumer = new MyConsumer(channel);
//6、限流方式 AutoAck設定為false
channel.basicConsume(queueName,false,consumer);
//
channel.basicQos(0,1,false);
}
}
消息的ACK與重回隊列
消費端的手工ACK和NACK
- 消費端進行消費的時候,如果由于業務異常我們可以進行日志的記錄,然後進行補償!
- 如果由于伺服器當機等嚴重問題,那我們就需要手工進行ACK保障消費端消費成功!
消費端的重回隊列
- 消費端重回隊列是為了對沒有處理成功的消息,把消息重新傳遞給Broker!
-
一般我們在實際應用中,都會關閉重回隊列,也就是設定為False.
生産者端代碼:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3、通過Connection建立一個Channel
Channel channel = connection.createChannel();
//4、指定消息的投遞模式:消息的确認模式
channel.confirmSelect();
String exchangeName = "test_ack_exchange";
String routingKey = "ack.save";
Map<String,Object> headers = new HashMap<>();
headers.put("num",0);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
for (int i = 0; i < 5; i++) {
//5、發送消息
String msg = "hello rabbitmq send ack message!"+i;
channel.basicPublish(exchangeName,routingKey,true,properties,msg.getBytes("UTF-8"));
}
}
}
消費者端:
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(envelope.getRoutingKey() + ":" + message);
Map<String, Object> headers = properties.getHeaders();
Integer num = (Integer) headers.get("num");
if(num == 0){
//第三個參數,true:重回隊列,false:不重回隊列
//重回隊列,會将消息重新添加到消息的尾部
channel.basicNack(envelope.getDeliveryTag(),false,true);
return;
}
//手動簽收
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3、通過Connection建立一個Channel
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.save";
//4、聲明交換機和隊列,然後進行綁定設定,指定路由key
channel.exchangeDeclare(exchangeName,"topic",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
//5、建立消費者
DefaultConsumer consumer = new MyConsumer(channel);
//6、手動簽收 AutoAck設定為false
channel.basicConsume(queueName,false,consumer);
}
}
TTL消息
TTL
- TTL是Time To Live的縮寫,也就是生存時間
- RabbitMQ支援消息的過期時間,在消息發送時可以進行指定
- RabbitMQ支援隊列的過期時間,從消息入隊列開始計算,隻要超過了隊列的逾時時間配置,那麼消息會自動的清除
死信隊列
死信隊列:DLX,Dead-Letter-Exchange
RabbitMQ的死信隊裡與Exchange息息相關
- 利用DLX,當消息在一個隊列中變成死信(dead message)之後,它能被重新publish到另一個Exchange,這個Exchange就是DLX
消息變成死信有以下幾種情況:
- 消息被拒絕(basic.reject/basic.nack)并且requeue=false
- 消息TTL過期
- 隊列達到最大長度
DLX也是一個正常的Exchange,和一般的Exchange沒有差別,它能在任何的隊列上被指定,實際上就是設定某個隊列的屬性。
當這個隊列中有死信時,RabbitMQ就會自動的将這個消息重新釋出到設定的Exchange上去,進而被路由到另一個隊列。
可以監聽這個隊列中消息做相應的處理,這個特征可以彌補RabbitMQ3.0以前支援的immediate參數的功能。
死信隊列設定:
- 首先需要設定死信隊列的exchange和queue,然後進行綁定:
- Exchange:dlx.exchange
- Queue:dlx.queue
- RoutingKey:#
- 然後我們進行正常聲明交換機、隊列、綁定,隻不過我們需要在隊列加上一個參數即可:arguments.put("x-dead-letter-exchange","dlx.exchange");
- 這樣消息在過期、requeue、隊列在達到最大長度時,消息就可以直接路由到死信隊列!
Producer端:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3、通過Connection建立一個Channel
Channel channel = connection.createChannel();
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.save";
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("20000")
.build();
//5、發送消息
String msg = "hello rabbitmq send dlx message!";
channel.basicPublish(exchangeName,routingKey,true,properties,msg.getBytes("UTF-8"));
}
}
消費者端:
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(envelope.getRoutingKey() + ":" + message);
//手動簽收
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、建立一個ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過連接配接工廠建立連接配接
Connection connection = connectionFactory.newConnection();
//3、通過Connection建立一個Channel
Channel channel = connection.createChannel();
//這是一個普通的交換機、隊列、路由
String exchangeName = "test_dlx_exchange";
String queueName = "test_dlx_queue";
String routingKey = "dlx.#";
//4、聲明交換機和隊列,然後進行綁定設定,指定路由key
channel.exchangeDeclare(exchangeName,"topic",true);
Map<String,Object> agruments = new HashMap<>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//這個agruments屬性要聲明到隊列上
channel.queueDeclare(queueName,true,false,false,agruments);
channel.queueBind(queueName,exchangeName,routingKey);
//死信隊列的聲明
channel.exchangeDeclare("dlx.exchange","topic",true,false,null);
channel.queueDeclare("dlx.queue",true,false,false,null);
channel.queueBind("dlx.queue","dlx.exchange","#");
//5、建立消費者
DefaultConsumer consumer = new MyConsumer(channel);
//6、手動簽收 AutoAck設定為false
channel.basicConsume("dlx.queue",false,consumer);
}
}
在實際工作中,死信隊列非常重要,用于消息沒有消費者,處于死信狀态。我們可以才用補償機制。