【訂單失效】RabbitMQ死信隊列實作
之前做商城遇到一個關于訂單未支付逾時失效的問題,總結一下
1.訂單失效問題
訂單失效問題比較麻煩的地方就是如何能夠實時擷取失效的訂單。
對于這種問題一般有兩種解決方案: 定時任務處理,延時任務處理
2.定時任務處理
- 使用者下訂單後先生成訂單資訊,然後将該訂單加入到定時任務中(30分鐘後執行),當到達指定時間後檢查訂單狀态,如果未支付則辨別該訂單失效。
- 定時去輪詢資料庫/緩存,看訂單的狀态。這種方式的問題很明顯,當叢集部署伺服器的時候需要做分布式鎖進行協調,而且實時性不高,對資料庫會産生壓力
3.延時任務處理
當使用者下訂單後,将使用者的訂單的辨別全部發送到延時隊列中,30分鐘後進去消費隊列中被消費,消費時先檢查該訂單的狀态,如果未支付則辨別該訂單失效。
有以下幾種延時任務處理方式
- Java自帶的DelayedQuene隊列
這是java本身提供的一種延時隊列,如果項目業務複雜性不高可以考慮這種方式。它是使用jvm記憶體來實作的,停機會丢失資料,擴充性不強
- 使用redis監聽key的過期來實作
當使用者下訂單後把訂單資訊設定為redis的key,30分鐘失效,程式編寫監聽redis的key失效,然後處理訂單(我也嘗試過這種方式)。這種方式最大的弊端就是隻能監聽一台redis的key失效,叢集下将無法實作,也有人監聽叢集下的每個redis節點的key,但我認為這樣做很不合适。如果項目業務複雜性不高,redis單機部署,就可以考慮這種方式
- RabbitMQ死信隊列實作
重點介紹這種方式
4.RabbitMQ死信隊列實作監聽訂單失效
AMQP協定和RabbitMQ隊列本身沒有直接支援延遲隊列功能,但是可以通過以下特性模拟出延遲隊列的功能。

- Time To Live(TTL)
RabbitMQ可以針對Queue設定x-expires 或者 針對Message設定 x-message-ttl,來控制消息的生存時間,如果逾時(兩者同時設定以最先到期的時間為準),則消息變為dead letter(死信)
A: 通過隊列屬性設定,隊列中所有消息都有相同的過期時間。
B: 對消息進行單獨設定,每條消息TTL可以不同。
- Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可選)兩個參數,如果隊列内出現了dead letter,則按照這兩個參數重新路由轉發到指定的隊列。
x-dead-letter-exchange:出現dead letter之後将dead letter重新發送到指定exchange
x-dead-letter-routing-key:出現dead letter之後将dead letter重新按照指定的routing-key發送
下面來做一個例子來實作訂單失效,為了效果明顯,我們把訂單的失效時間設定為10秒 (java實作)
5.具體實作
5.1.環境/版本一覽:
- 開發工具:Intellij IDEA 2020.2.3
- springboot:2.4.1
- jdk:1.8.0_211
- maven: 3.6.3
- RabbitMQ:3.8.9
docker安裝rabbitmq可以檢視我這一篇部落格 docker 安裝rabbitMQ :https://laoniu.blog.csdn.net/article/details/110090272
項目結構
5.2.項目搭建
最下方有完整代碼
- 建立項目,配置pom.xml加入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
- 配置RabbitMQ的隊列和路由資訊 建立類 RabbitMQConfiuration
package com.niu.springbootrabbitmqdelayqueue.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @description: rabbitMQ配置資訊
* @author: nxq email: [email protected]
* @createDate: 2020/12/18 8:09 上午
* @updateUser: nxq email: [email protected]
* @updateDate: 2020/12/18 8:09 上午
* @updateRemark:
* @version: 1.0
**/
@Configuration
public class RabbitMQConfiguration {
//隊列名稱
public final static String orderQueue = "order_queue";
//交換機名稱
public final static String orderExchange = "order_exchange";
// routingKey
public final static String routingKeyOrder = "routing_key_order";
//死信消息隊列名稱
public final static String dealQueueOrder = "deal_queue_order";
//死信交換機名稱
public final static String dealExchangeOrder = "deal_exchange_order";
//死信 routingKey
public final static String deadRoutingKeyOrder = "dead_routing_key_order";
//死信隊列 交換機辨別符
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
//死信隊列交換機綁定鍵辨別符
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public Queue orderQueue() {
// 将普通隊列綁定到死信隊列交換機上
Map<String, Object> args = new HashMap<>(2);
//args.put("x-message-ttl", 5 * 1000);//直接設定 Queue 延遲時間 但如果直接給隊列設定過期時間,這種做法不是很靈活
//這裡采用發送消息動态設定延遲時間,這樣我們可以靈活控制
args.put(DEAD_LETTER_QUEUE_KEY, dealExchangeOrder);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKeyOrder);
return new Queue(RabbitMQConfiguration.orderQueue, true, false, false, args);
}
//聲明一個direct類型的交換機
@Bean
DirectExchange orderExchange() {
return new DirectExchange(RabbitMQConfiguration.orderExchange);
}
//綁定Queue隊列到交換機,并且指定routingKey
@Bean
Binding bindingDirectExchangeDemo5( ) {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(routingKeyOrder);
}
//建立配置死信隊列
@Bean
public Queue deadQueueOrder() {
Queue queue = new Queue(dealQueueOrder, true);
return queue;
}
//建立死信交換機
@Bean
public DirectExchange deadExchangeOrder() {
return new DirectExchange(dealExchangeOrder);
}
//死信隊列與死信交換機綁定
@Bean
public Binding bindingDeadExchange() {
return BindingBuilder.bind(deadQueueOrder()).to(deadExchangeOrder()).with(deadRoutingKeyOrder);
}
}
- 配置消息生産者
package com.niu.springbootrabbitmqdelayqueue.controller;
import com.niu.springbootrabbitmqdelayqueue.config.RabbitMQConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* @description: 訂單的控制器
* @author: nxq email: [email protected]
* @createDate: 2020/12/18 8:20 上午
* @updateUser: nxq email: [email protected]
* @updateDate: 2020/12/18 8:20 上午
* @updateRemark:
* @version: 1.0
**/
@RestController
@RequestMapping("/order")
public class OrderController {
private static final Logger logger = LoggerFactory.getLogger(OrderController.class);
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 模拟送出訂單
* @author nxq
* @return java.lang.Object
*/
@GetMapping("")
public Object submit(){
String orderId = UUID.randomUUID().toString();
logger.info("submit order {}", orderId);
this.rabbitTemplate.convertAndSend(
RabbitMQConfiguration.orderExchange, //發送至訂單交換機
RabbitMQConfiguration.routingKeyOrder, //訂單定routingKey
orderId //訂單号 這裡可以傳對象 比如直接傳訂單對象
, message -> {
// 如果配置了 params.put("x-message-ttl", 5 * 1000);
// 那麼這一句也可以省略,具體根據業務需要是聲明 Queue 的時候就指定好延遲時間還是在發送自己控制時間
message.getMessageProperties().setExpiration(1000 * 10 + "");
return message;
});
return "{'orderId':'"+orderId+"'}";
}
}
- 配置消費者
package com.niu.springbootrabbitmqdelayqueue.listener;
import com.niu.springbootrabbitmqdelayqueue.config.RabbitMQConfiguration;
import com.niu.springbootrabbitmqdelayqueue.controller.OrderController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
import com.rabbitmq.client.Channel;
/**
* @description: 訂單失效監聽器
* @author: nxq email: [email protected]
* @createDate: 2020/12/18 8:30 上午
* @updateUser: nxq email: [email protected]
* @updateDate: 2020/12/18 8:30 上午
* @updateRemark:
* @version: 1.0
**/
@Component
public class OrderFailureListener {
private static final Logger logger = LoggerFactory.getLogger(OrderFailureListener.class);
@RabbitListener(
queues = RabbitMQConfiguration.dealQueueOrder //設定訂單失效的隊列
)
public void process(String order, Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
logger.info("【訂單号】 - [{}]", order);
// 判斷訂單是否已經支付,如果支付則;否則,取消訂單(邏輯代碼省略)
// 手動ack
// Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手動簽收
// channel.basicAck(deliveryTag, false);
System.out.println("執行結束....");
}
}
- 配置檔案 application.yml 加入rabbitmq的相關配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
- 配置啟動類
package com.niu.springbootrabbitmqdelayqueue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringbootRabbitmqDelayQueueApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqDelayQueueApplication.class, args);
}
}
6.啟動測試
啟動成功後我們通路一下配置的接口模拟送出訂單 http://localhost:8080/order
檢視控制台,10秒後
蕪湖~起飛🛫️,大功告成,可以說實時性已經非常高了,可以試着多送出幾次訂單
打開rabbitMQ的控制台看一下,發現多出來兩個隊列
使用這種方式不止可以做訂單失效,比如說優惠券過期啊等等延時失效問題。可以叢集部署rabbitmq,開啟消息确認機制。
這種實作方法的基本使用到此就講完了,完整代碼已經推送至github :https://github.com/1603565290m/springboot-rabbitmq-delay-queue
轉載請注明出處:https://laoniu.blog.csdn.net/article/details/111352242