天天看點

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

文章目錄

  • ​​一、什麼是 Redis 消息隊列?​​
  • ​​二、Redis 消息隊列 -- 基于 Redis List 實作消息隊列​​
  • ​​三、Redis 消息隊列 -- 基于 Pubsub 的消息隊列​​
  • ​​四、基于Redis 的Stream 的消費隊列​​
  • ​​⛅Stream 簡單文法​​
  • ​​⚡Stream 的消費者組​​
  • ​​五、基于Redis Stream消息隊列實作異步秒殺​​
  • ​​六、程式測試​​
  • ​​⛵小結​​

一、什麼是 Redis 消息隊列?

字面意思就是存放消息的隊列。最簡單的消息隊列模型包括3個角色:

  • 消息隊列:存儲和管理消息,也被稱為消息代理(Message Broker)
  • 生産者:發送消息到消息隊列
  • 消費者:從消息隊列擷取消息并處理消息
微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

使用隊列的好處在于 解耦 解除資料之間的耦合性

這裡最好的是使用MQ、RabbitMQ、RocketMQ、Kafka等消息隊列,我們本節主要介紹 Redis 的消息隊列。

二、Redis 消息隊列 – 基于 Redis List 實作消息隊列

基于List結構模拟消息隊列

消息隊列(Message Queue):字面意思就是存放消息的隊列。而Redis的list資料結構是一個雙向連結清單,很容易模拟出隊列效果。

隊列是入口和出口不在一邊,我們可以通過 LPush、RPOP、RPush、LPOP 這些來實作。

注意 : 如果擷取 LPOP、RPOP擷取消息如果沒有的話,會直接傳回null,是以我們使用阻塞:BLPOP、BRPOP來實作阻塞效果

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

基于List 結構的消息隊列的優缺點?

優點:

  • 利用Redis存儲、不受限于JVM 記憶體上限
  • 基于Redis 的持久化機制、資料安全性有保障
  • 可以滿足消息有序性

缺點:

  • 無法避免消息丢失
  • 隻支援單消費者

三、Redis 消息隊列 – 基于 Pubsub 的消息隊列

PubSub(釋出訂閱)是Redis2.0版本引入的消息傳遞模型。

顧名思義,消費者可以訂閱一個或多個channel,生産者向對應channel發送消息後,所有訂閱者都能收到相關消息。

Pubsub 常用指令

SUBSCRIBE channel [channel] :訂閱一個或多個頻道
PUBLISH channel msg :向一個頻道發送消息
PSUBSCRIBE pattern[pattern] :訂閱與pattern格式比對的所有頻道      
微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

基于PubSub的消息隊列有哪些優缺點?

優點:

  • 采用釋出訂閱模型,支援多生産、多消費

缺點:

  • 不支援資料持久化
  • 無法避免消息丢失
  • 消息堆積有上限,超出時資料丢失

四、基于Redis 的Stream 的消費隊列

Stream 是 Redis 5.0 引入的一種新資料類型,可以實作一個功能非常完善的消息隊列。

⛅Stream 簡單文法

Stream 常用文法:

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

例如:

建立為 users 的消息隊列,并向其中發送一條消息 使用Redis 自動生成id

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

讀取消息的方式之一:XRead

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

利用 XRead 讀取一個消息

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

XRead 阻塞方式,讀取最新的消息

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

在業務開發中,我們可以循環的調用XREAD阻塞方式來查詢最新消息,進而實作持續監聽隊列的效果

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

注意: 當我們指定起始ID 為 $ 時,代表讀取最新的消息,如果我們處理一條消息的過程中,又有超過1條以上的消息到達隊列,則下次擷取的也是隻有最新的一條,會出現消息漏讀的問題!

STREAM類型消息隊列的XREAD指令特點:

  • 消息可回溯
  • 一個消息可以被多個消費者讀取
  • 可以阻塞讀取
  • 有消息漏讀的風險

⚡Stream 的消費者組

消費者組(Consumer Group):将多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

建立消費者組:

XGROUP CREATE key groupName ID [MKSTREAM]      
  • key:隊列名稱
  • groupName:消費者組名稱
  • ID:起始ID标示,$代表隊列中最後一個消息,0則代表隊列中第一個消息
  • MKSTREAM:隊列不存在時自動建立隊列
其它常用指令

删除指定的消費者組

XGROUP DESTORY key groupName      

給指定的消費者組添加消費者

XGROUP CREATECONSUMER key groupname consumername      

删除消費者組中的指定消費者

XGROUP DELCONSUMER key groupname consumername      

從消費者組讀取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]      
  • group:消費組名稱
  • consumer:消費者名稱,如果消費者不存在,會自動建立一個消費者
  • count:本次查詢的最大數量
  • BLOCK milliseconds:當沒有消息時最長等待時間
  • NOACK:無需手動ACK,擷取到消息後自動确認
  • STREAMS key:指定隊列名稱
  • ID:擷取消息的起始ID:

“>”:從下一個未消費的消息開始

其它:根據指定id從pending-list中擷取已消費但未确認的消息,例如0,是從pending-list中的第一個消息開始

消費者監聽消息的基本思路:

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

STREAM類型消息隊列的XREADGROUP指令特點:

  • 消息可回溯
  • 可以多消費者争搶消息,加快消費速度
  • 可以阻塞讀取
  • 沒有消息漏讀的風險
  • 有消息确認機制,保證消息至少被消費一次

三種消息隊列對比

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

五、基于Redis Stream消息隊列實作異步秒殺

需求:

  • 建立一個Stream類型的消息隊列,名為stream.orders
  • 修改之前的秒殺下單Lua腳本,在認定有搶購資格後,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 項目啟動時,開啟一個線程任務,嘗試擷取stream.orders中的消息,完成下單

修改 seckill.lua 腳本

-- 1.3.訂單id
local orderId = ARGV[3]

-- 3.6.發送消息到隊列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)      

修改VoucherOrderService

private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

static {
    SECKILL_SCRIPT = new DefaultRedisScript<>();
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    SECKILL_SCRIPT.setResultType(Long.class);
}


private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

//在類初始化之後執行,因為當這個類初始化好了之後,随時都是有可能要執行的
@PostConstruct
private void init() {
    SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

/**
     * 使用 Redis消息隊列建立 讀隊列、編寫下訂單任務
     */
private class VoucherOrderHandler implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                // 1.擷取消息隊列中的訂單資訊 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                    StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                );

                // 2.判斷訂單資訊是否為空
                if (list == null || list.isEmpty()) {
                    // 如果為null,說明沒有消息,繼續下一次循環
                    continue;
                }

                // 解析資料
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);

                // 3.建立訂單
                createVoucherOrder(voucherOrder);

                // 4.确認消息 XACK
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());

            } catch (Exception e) {
                log.error("處理訂單異常", e);
                //處理異常消息 去 Pading-List讀取消息
                handlePendingList();
            }
        }
    }
}

/**
     *  Redis消息隊列出現異常,調用此方法去 Pading—List中重新讀取
     */
private void handlePendingList() {
    while (true) {
        try {
            // 1.擷取pending-list中的訂單資訊 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
            List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                Consumer.from("g1", "c1"),
                StreamReadOptions.empty().count(1),
                StreamOffset.create("stream.orders", ReadOffset.from("0"))
            );

            // 2.判斷訂單資訊是否為空
            if (list == null || list.isEmpty()) {
                // 如果為null,說明沒有異常消息,結束循環
                break;
            }

            // 解析資料
            MapRecord<String, Object, Object> record = list.get(0);
            Map<Object, Object> value = record.getValue();
            VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);

            // 3.建立訂單
            createVoucherOrder(voucherOrder);

            // 4.确認消息 XACK
            stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
        } catch (Exception e) {
            log.error("處理pendding訂單異常", e);
            try{
                Thread.sleep(20);
            }catch(Exception ee){
                ee.printStackTrace();
            }
        }
    }
}


private void handleVoucherOrder(VoucherOrder voucherOrder) {
    //1.擷取使用者
    Long userId = voucherOrder.getUserId();
    // 2.建立鎖對象
    RLock lock = redissonClient.getLock("lock:order:" + userId);
    // 3.嘗試擷取鎖
    boolean isLock = lock.tryLock();
    // 4.判斷是否獲得鎖成功
    if (!isLock) {
        // 擷取鎖失敗,直接傳回失敗或者重試
        log.error("不允許重複下單!");
        return;
    }
    try {
        //注意:由于是spring的事務是放在threadLocal中,此時的是多線程,事務會失效
        proxy.createVoucherOrder(voucherOrder);
    } finally {
        // 釋放鎖
        lock.unlock();
    }
}


// 代理對象
private IVoucherOrderService proxy;

@Override
public Result seckillVoucher(Long voucherId) {

    //擷取使用者
    Long userId = UserHolder.getUser().getId();
    //生成訂單ID
    long orderId = redisIdWorker.nextId("order");

    // 1.執行lua腳本
    Long result = stringRedisTemplate.execute(
        SECKILL_SCRIPT,
        Collections.emptyList(),
        voucherId.toString(), userId.toString(), String.valueOf(orderId)
    );
    int r = result.intValue(); // 轉成int

    // 2.判斷結果是否為0
    if (r != 0) {
        // 2.1.不為0 ,代表沒有購買資格
        return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
    }

    //3.擷取代理對象
    proxy = (IVoucherOrderService) AopContext.currentProxy();

    //4.傳回訂單id
    return Result.ok(orderId);
}


@Transactional
public void createVoucherOrder (VoucherOrder voucherOrder){
    // 5.一人一單邏輯
    // 5.1.使用者id
    Long userId = voucherOrder.getUserId();

    // 判斷是否存在
    int count = query().eq("user_id", userId)
        .eq("voucher_id", voucherOrder.getId()).count();

    // 5.2.判斷是否存在
    if (count > 0) {
        // 使用者已經購買過了
        log.error("使用者已經購買過了");
    }

    //6,扣減庫存
    boolean success = seckillVoucherService.update()
        .setSql("stock= stock -1") //set stock = stock -1
        .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0).update(); //where id = ? and stock > 0
    // .eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); //where id = ? and stock = ?

    if (!success) {
        //扣減庫存
        log.error("庫存不足!");
    }

    save(voucherOrder);
}      

六、程式測試

ApiFox 簡單測試
微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

請求成功,完成基本測試,下面恢複資料庫,進行壓力測試

Jmeter 壓力測試

Jmeter測試

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

檢視Redis

微服務Spring Boot 整合Redis 基于Redis的Stream 消息隊列 實作異步秒殺下單

檢視MySQL

⛵小結

繼續閱讀