天天看點

【延時任務處理、訂單失效】RabbitMQ死信隊列實作【訂單失效】RabbitMQ死信隊列實作

【訂單失效】RabbitMQ死信隊列實作

之前做商城遇到一個關于訂單未支付逾時失效的問題,總結一下

1.訂單失效問題

訂單失效問題比較麻煩的地方就是如何能夠實時擷取失效的訂單。

對于這種問題一般有兩種解決方案: 定時任務處理,延時任務處理

2.定時任務處理

  1. 使用者下訂單後先生成訂單資訊,然後将該訂單加入到定時任務中(30分鐘後執行),當到達指定時間後檢查訂單狀态,如果未支付則辨別該訂單失效。
  2. 定時去輪詢資料庫/緩存,看訂單的狀态。這種方式的問題很明顯,當叢集部署伺服器的時候需要做分布式鎖進行協調,而且實時性不高,對資料庫會産生壓力

3.延時任務處理

當使用者下訂單後,将使用者的訂單的辨別全部發送到延時隊列中,30分鐘後進去消費隊列中被消費,消費時先檢查該訂單的狀态,如果未支付則辨別該訂單失效。

有以下幾種延時任務處理方式

  • Java自帶的DelayedQuene隊列

這是java本身提供的一種延時隊列,如果項目業務複雜性不高可以考慮這種方式。它是使用jvm記憶體來實作的,停機會丢失資料,擴充性不強

  • 使用redis監聽key的過期來實作

當使用者下訂單後把訂單資訊設定為redis的key,30分鐘失效,程式編寫監聽redis的key失效,然後處理訂單(我也嘗試過這種方式)。這種方式最大的弊端就是隻能監聽一台redis的key失效,叢集下将無法實作,也有人監聽叢集下的每個redis節點的key,但我認為這樣做很不合适。如果項目業務複雜性不高,redis單機部署,就可以考慮這種方式

  • RabbitMQ死信隊列實作

重點介紹這種方式

4.RabbitMQ死信隊列實作監聽訂單失效

AMQP協定和RabbitMQ隊列本身沒有直接支援延遲隊列功能,但是可以通過以下特性模拟出延遲隊列的功能。

【延時任務處理、訂單失效】RabbitMQ死信隊列實作【訂單失效】RabbitMQ死信隊列實作
  • Time To Live(TTL)

RabbitMQ可以針對Queue設定x-expires 或者 針對Message設定 x-message-ttl,來控制消息的生存時間,如果逾時(兩者同時設定以最先到期的時間為準),則消息變為dead letter(死信)

A: 通過隊列屬性設定,隊列中所有消息都有相同的過期時間。 

B: 對消息進行單獨設定,每條消息TTL可以不同。

  • Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可選)兩個參數,如果隊列内出現了dead letter,則按照這兩個參數重新路由轉發到指定的隊列。

x-dead-letter-exchange:出現dead letter之後将dead letter重新發送到指定exchange

x-dead-letter-routing-key:出現dead letter之後将dead letter重新按照指定的routing-key發送

下面來做一個例子來實作訂單失效,為了效果明顯,我們把訂單的失效時間設定為10秒 (java實作)

5.具體實作

5.1.環境/版本一覽:

  • 開發工具:Intellij IDEA 2020.2.3
  • springboot:2.4.1
  • jdk:1.8.0_211
  • maven: 3.6.3
  • RabbitMQ:3.8.9

docker安裝rabbitmq可以檢視我這一篇部落格  docker 安裝rabbitMQ :https://laoniu.blog.csdn.net/article/details/110090272

項目結構

【延時任務處理、訂單失效】RabbitMQ死信隊列實作【訂單失效】RabbitMQ死信隊列實作

5.2.項目搭建

最下方有完整代碼

  • 建立項目,配置pom.xml加入依賴
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
           
  • 配置RabbitMQ的隊列和路由資訊  建立類 RabbitMQConfiuration
package com.niu.springbootrabbitmqdelayqueue.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


import java.util.HashMap;
import java.util.Map;

/**
 * @description: rabbitMQ配置資訊
 * @author: nxq email: [email protected]
 * @createDate: 2020/12/18 8:09 上午
 * @updateUser: nxq email: [email protected]
 * @updateDate: 2020/12/18 8:09 上午
 * @updateRemark:
 * @version: 1.0
 **/
@Configuration
public class RabbitMQConfiguration {
    //隊列名稱
   public   final static String orderQueue = "order_queue";

    //交換機名稱
    public  final static String orderExchange = "order_exchange";

    // routingKey
    public  final static String routingKeyOrder = "routing_key_order";

    //死信消息隊列名稱
    public  final static String dealQueueOrder = "deal_queue_order";

    //死信交換機名稱
    public  final static String dealExchangeOrder = "deal_exchange_order";

    //死信 routingKey
    public final static String deadRoutingKeyOrder = "dead_routing_key_order";

    //死信隊列 交換機辨別符
    public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";

    //死信隊列交換機綁定鍵辨別符
    public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public Queue orderQueue() {
        // 将普通隊列綁定到死信隊列交換機上
        Map<String, Object> args = new HashMap<>(2);
        //args.put("x-message-ttl", 5 * 1000);//直接設定 Queue 延遲時間 但如果直接給隊列設定過期時間,這種做法不是很靈活
        //這裡采用發送消息動态設定延遲時間,這樣我們可以靈活控制
        args.put(DEAD_LETTER_QUEUE_KEY, dealExchangeOrder);
        args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKeyOrder);
        return new Queue(RabbitMQConfiguration.orderQueue, true, false, false, args);
    }

    //聲明一個direct類型的交換機
    @Bean
    DirectExchange orderExchange() {
        return new DirectExchange(RabbitMQConfiguration.orderExchange);
    }

    //綁定Queue隊列到交換機,并且指定routingKey
    @Bean
    Binding bindingDirectExchangeDemo5(   ) {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(routingKeyOrder);
    }

    //建立配置死信隊列
    @Bean
    public Queue deadQueueOrder() {
        Queue queue = new Queue(dealQueueOrder, true);
        return queue;
    }

    //建立死信交換機
    @Bean
    public DirectExchange deadExchangeOrder() {
        return new DirectExchange(dealExchangeOrder);
    }

    //死信隊列與死信交換機綁定
    @Bean
    public Binding bindingDeadExchange() {
        return BindingBuilder.bind(deadQueueOrder()).to(deadExchangeOrder()).with(deadRoutingKeyOrder);
    }



}
           
  • 配置消息生産者
package com.niu.springbootrabbitmqdelayqueue.controller;

import com.niu.springbootrabbitmqdelayqueue.config.RabbitMQConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * @description: 訂單的控制器
 * @author: nxq email: [email protected]
 * @createDate: 2020/12/18 8:20 上午
 * @updateUser: nxq email: [email protected]
 * @updateDate: 2020/12/18 8:20 上午
 * @updateRemark:
 * @version: 1.0
 **/
@RestController
@RequestMapping("/order")
public class OrderController {
    private static final Logger logger =  LoggerFactory.getLogger(OrderController.class);
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * 模拟送出訂單
     * @author nxq
     * @return java.lang.Object
     */
    @GetMapping("")
    public Object submit(){
        String orderId = UUID.randomUUID().toString();
        logger.info("submit order {}", orderId);
        this.rabbitTemplate.convertAndSend(
                RabbitMQConfiguration.orderExchange, //發送至訂單交換機
                RabbitMQConfiguration.routingKeyOrder, //訂單定routingKey
                orderId //訂單号   這裡可以傳對象 比如直接傳訂單對象
                , message -> {
            // 如果配置了 params.put("x-message-ttl", 5 * 1000);
            // 那麼這一句也可以省略,具體根據業務需要是聲明 Queue 的時候就指定好延遲時間還是在發送自己控制時間
            message.getMessageProperties().setExpiration(1000 * 10 + "");
            return message;
        });
        
        return "{'orderId':'"+orderId+"'}";
    }
}
           
  • 配置消費者 
package com.niu.springbootrabbitmqdelayqueue.listener;

import com.niu.springbootrabbitmqdelayqueue.config.RabbitMQConfiguration;
import com.niu.springbootrabbitmqdelayqueue.controller.OrderController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;
import java.util.Map;
import com.rabbitmq.client.Channel;

/**
 * @description: 訂單失效監聽器
 * @author: nxq email: [email protected]
 * @createDate: 2020/12/18 8:30 上午
 * @updateUser: nxq email: [email protected]
 * @updateDate: 2020/12/18 8:30 上午
 * @updateRemark:
 * @version: 1.0
 **/
@Component
public class OrderFailureListener {
    private static final Logger logger =  LoggerFactory.getLogger(OrderFailureListener.class);
    @RabbitListener(
            queues = RabbitMQConfiguration.dealQueueOrder //設定訂單失效的隊列
    )
    public void process(String order, Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {

        logger.info("【訂單号】 - [{}]",  order);
        // 判斷訂單是否已經支付,如果支付則;否則,取消訂單(邏輯代碼省略)

        // 手動ack
//        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手動簽收
//        channel.basicAck(deliveryTag, false);
        System.out.println("執行結束....");

    }
}
           
  • 配置檔案 application.yml 加入rabbitmq的相關配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
           
  • 配置啟動類
package com.niu.springbootrabbitmqdelayqueue;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootRabbitmqDelayQueueApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqDelayQueueApplication.class, args);
    }

}
           

6.啟動測試

啟動成功後我們通路一下配置的接口模拟送出訂單 http://localhost:8080/order

【延時任務處理、訂單失效】RabbitMQ死信隊列實作【訂單失效】RabbitMQ死信隊列實作

檢視控制台,10秒後

【延時任務處理、訂單失效】RabbitMQ死信隊列實作【訂單失效】RabbitMQ死信隊列實作

蕪湖~起飛🛫️,大功告成,可以說實時性已經非常高了,可以試着多送出幾次訂單

打開rabbitMQ的控制台看一下,發現多出來兩個隊列

【延時任務處理、訂單失效】RabbitMQ死信隊列實作【訂單失效】RabbitMQ死信隊列實作

使用這種方式不止可以做訂單失效,比如說優惠券過期啊等等延時失效問題。可以叢集部署rabbitmq,開啟消息确認機制。

這種實作方法的基本使用到此就講完了,完整代碼已經推送至github :https://github.com/1603565290m/springboot-rabbitmq-delay-queue

轉載請注明出處:https://laoniu.blog.csdn.net/article/details/111352242