天天看點

SpringBoot實作分布式鎖解決秒殺或者搶單問題一,分布式鎖誕生的原因二,常見的分布式鎖三,SpringBoot項目中的Redis分布式鎖

一,分布式鎖誕生的原因

      為什麼分布式鎖會誕生?類似于淘寶雙11的秒殺活動,同一件商品怎麼才能隻被一個使用者搶到,其他使用者搶不到?分布式鎖就能巧妙地解決類似秒殺和搶單的問題。技術源于生活,更高于生活。對于阿裡的那種的大型秒殺活動,分布式鎖隻是其中的一環,單單靠分布式鎖不足以支撐那種大并發的情景,後續解決方案會陸續更新。本期隻講解分布式鎖。

二,常見的分布式鎖

     1,基于資料庫實作的分布式鎖

          基于表實作的分布式鎖,如果有多個請求同時送出到資料庫的話,資料庫會保證隻有一個操作可以成功,那麼我們就可以認為操作成功的那個線程獲得了該方法的鎖

     2,Redis分布式鎖

     主要通過redis的存值取值進行判斷的,根據傳回的參數判斷是否能拿到鎖

     3,Zookeeper分布式鎖

     利用Zookeeper的順序臨時節點,來實作分布式鎖和等待隊列。Zookeeper設計的初衷,就是為了實作分布式鎖服務的。

三,SpringBoot項目中的Redis分布式鎖

      1,引入相關的依賴

              首先你的springboot項目是需要正常的能夠跑通的,然後在你的主pom檔案引入redis的相關依賴

<!--springboot 內建reids-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
                <version>2.1.3.RELEASE</version>
            </dependency>
           

      2,redis的分布鎖的原理

     一般情況像這種經常用到的代碼,單獨抽一個Redis工具類出來,友善自己檢視和調用。比如,我們現在進行一個活動,整點秒殺一台macPro電腦。想要将商品秒殺到,那隻需要改變資料庫的商品表的該商品的狀态置為已下架,同時建立訂單即可。在整點的時候,很多人同時秒殺更新資料庫,那我們如何保證隻被一個人拿到?道理很簡單,我們将下單(更新資料庫,建立訂單)打包成一個方法,在方法的外面加鎖,該鎖被第一個A線程拿到後,其他線程未拿到鎖則傳回【很遺憾~您手慢啦~】,A線程将該商品置為已下架并且建立訂單後,釋放該鎖,防止死鎖。哪怕後面的線程延遲,再A線程釋放鎖後又拿到下單方法,因為商品的狀态為已下架同樣沒有辦法進行建立訂單。至此,達到我們最初的目的,秒殺功能完成。

      3,加鎖操作

   redis有StringRedisTemplate 和RedisTemplate 。我這裡使用前者實作。

@Autowired
    private StringRedisTemplate redisTemplate;
           

    在高并發的情況下,確定某一個方法隻能被一個人調用,那麼我們隻要在該方法外調用工具類的加鎖方法,該加鎖方法傳回true,則代表該方法沒有被其他線程占用。若傳回false,則代表該方法已經被其他線程占用,同步傳回【很遺憾~您手慢啦~】。

/**
     * 對傳過來的redis的key進行加鎖
     *  TimeUnit.SECONDS  秒
     * @param key   需要加鎖的key
     * @param expire   過期時間
     * @return
     */
    public Boolean lockEnable(String key, long expire) {

        //這是将目前線程的名字置為key的value值,表明該鎖被誰拿到
        String keyValue = Thread.currentThread().getName();

        //1,這是StringRedisTemplate在set key的同時增加了過期時間,防止死鎖。保證了原子性。
        //2,setIfAbsent該方法如果該key不存在時候,設定值進去後,傳回true;若是已經存在,則傳回false;
        Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(key, keyValue, expire, TimeUnit.SECONDS);
        Long surplusTime = redisTemplate.getExpire(key, TimeUnit.SECONDS);
        if (!aBoolean) {
            log.info("該線程【{}】加鎖失敗,該key【{}】剩餘過期時間【{}】秒", keyValue, key, surplusTime);
            return false;
        }
        log.info("該線程【{}】加鎖成功,該key【{}】剩餘過期時間【{}】秒", keyValue, key, surplusTime);
        return true;
    }
           

      4,解鎖操作

  解鎖這邊不可以單純的删除redis的值,這裡需要對key和value兩個參數進行和redis裡面存儲的是否一緻,防止誤删别人的鎖。避免其他錯誤的産生,由于我們設定鎖的時候,鎖和失效時間有原子性,故不存在加完鎖後就當機,導緻死鎖。

@Autowired
    private DefaultRedisScript<Long> redisScript;
           
/**
     * lua腳本
     *
     * @return
     */
    @Bean
    public DefaultRedisScript<Long> defaultRedisScript() {
        DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
        defaultRedisScript.setResultType(Long.class);
        defaultRedisScript.setScriptText("if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end");
        return defaultRedisScript;
    }
           

同樣的在完成該方法後一定要記得解鎖,不要因為有過期時間就不釋放鎖,不要給自己埋坑,自己偷的懶早晚要還回來的。

/**
     * 對傳過來的redis的key進行解鎖
     * key和value不一緻時,傳回:【0】
     * key和value不一緻時,傳回:【1】
     * @param key
     * @return
     */
    public Boolean lockUnable(String key) {
        String keyValue = Thread.currentThread().getName();
        //key和value不一緻時,傳回:【0】
        //key和value不一緻時,傳回:【1】
        Long execute = redisTemplate.execute(redisScript, Arrays.asList(key, keyValue));
        if(execute != 1 ){
            Boolean aBoolean = redisTemplate.hasKey(key);
            Long surplusTime = redisTemplate.getExpire(key, TimeUnit.SECONDS);
            log.info("該key【{}】解鎖失敗,是否存在【{}】,剩餘過期時間【{}】秒", key, aBoolean, surplusTime);
            return false;
        }
        log.info("該key【{}】解鎖成功", key);
        Boolean aBoolean = redisTemplate.hasKey(key);
        log.info("該key是否存在【{}】",aBoolean);
        return true;
    }
           

     5,分布式鎖的測試類

    由于資料敏感問題,我這邊自己使用了自己建立的員工表進行模拟秒殺,效果是一緻的,我們鎖的多台伺服器上面的同一個方法。

/**
     * 1000個線程搶一條資料
     */
    @Test
    public void catchData() {
        for (int i = 0; i <1000; i++) {
            Thread thread = new Thread(() -> {
                threadTest();
            });
            thread.start();
            thread.setName("thread" + i);
        }
        while (true){

        }
    }
           
/**
     * 線程調用的測試方法
     */
    public void threadTest() {

        /**
         * 對某一條資料的id進行加鎖
         */
        Boolean aBoolean = redisUtils.lockEnable("狄仁傑", 600);


        if (!aBoolean) {
            log.info("線程【{}】沒有拿到鎖,結束流程",Thread.currentThread().getName());
            return;
        }
        UpdateUserDTO updateUserDTO = new UpdateUserDTO();
        updateUserDTO.setUserName("狄仁傑");
        updateUserDTO.setUserStatus("UNABLE");
        updateUserDTO.setMobilePhone("15555406855");
        updateUserDTO.setUpdatedBy(Thread.currentThread().getName());
        Result<Boolean> result = userBaseInfo.updateUser(updateUserDTO);
        if (!result.getResult()) {
            log.info("線程【{}】更新資料失敗",Thread.currentThread().getName());
            return;
        }
        log.info("線程【{}】更新資料成功",Thread.currentThread().getName());


        /**
         * 釋放該條資料的鎖
         */
        Boolean aBoolean1 = redisUtils.lockUnable("狄仁傑");
        log.info("線程【{}】是否成功釋放鎖:【{}】",Thread.currentThread().getName(),aBoolean1);
    }
           

 5,分布式鎖的測試結果展示

資料庫之前的資料

SpringBoot實作分布式鎖解決秒殺或者搶單問題一,分布式鎖誕生的原因二,常見的分布式鎖三,SpringBoot項目中的Redis分布式鎖

跑完測試之後的資料

SpringBoot實作分布式鎖解決秒殺或者搶單問題一,分布式鎖誕生的原因二,常見的分布式鎖三,SpringBoot項目中的Redis分布式鎖
SpringBoot實作分布式鎖解決秒殺或者搶單問題一,分布式鎖誕生的原因二,常見的分布式鎖三,SpringBoot項目中的Redis分布式鎖

在列印的全部日志中,1000個線程隻有一個線程拿到鎖,其他線程全部失敗。測試成功。

覺得寫得你還滿意,點下關注哈~如果有問題可以下面評論一起探讨下^~^