天天看點

基于SpringBoot與Redis實作分布式鎖

Docker運作Redis

拉取最近版本的Redis鏡像:

docker pull redis      

啟動容器:

docker run -d --name redis -p 6379:6379 redis:latest      

進入容器内部,測試存儲:

# docker exec -it redis redis-cli
127.0.0.1:6379> set name qcy
OK
127.0.0.1:6379> get name
"qcy"      

到這裡,說明我們啟動成功了。

實作要求

實作的分布式鎖必須具有以下特點:

  • 互斥:無論在什麼時刻,最多隻有一個用戶端擁有鎖
  • 鎖擁有逾時時間。否則加鎖的用戶端突然當機,來不及釋放鎖,會導緻所有用戶端擷取鎖失敗。
  • 在不逾時的情況下,用戶端隻能釋放掉自己申請的鎖。有這樣的一個例子:用戶端a申請了鎖,設定逾時為5秒,可是其運作同步代碼超過了5秒。此時用戶端b請求鎖時,由于該鎖逾時被redis自動删除,于是用戶端b申請鎖成功。接着用戶端a的同步代碼運作結束,這樣就會把用戶端b申請的鎖給釋放掉,這樣可能會引起資料錯亂。

實作原理

如何滿足互斥與具有逾時時間?

redis是單線程模型,指令是一條一條執行的,是以不存在并發問題。

我們使用以下的指令,來往reids中存一個過期時間為5秒的鍵值:

set name qcy nx ex 5      

其中的name為key,qcy為value,nx代表不存在此key則存儲,并且傳回OK,若存在則會傳回null,ex 5代表過期時間為5秒。

127.0.0.1:6379> set name qcy nx ex 5
OK
127.0.0.1:6379> set name qcy nx ex 5
(nil)      

當我們在過期時間内重複存儲時,redis會提示操作失敗。間隔5秒以後,redis會提示插入成功。

釋放鎖時,隻要删除對應的key即可:

127.0.0.1:6379> set name qcy nx ex 5
OK
127.0.0.1:6379> del name
(integer) 1      

那麼如何釋放掉自己申請的鎖?

key為鎖名稱,value可以為申請鎖的用戶端的唯一辨別。那麼在釋放鎖時,不僅僅比較key是否相同,還要比較value是否為申請鎖的用戶端的唯一辨別,且這兩次比較必須是原子操作。

為什麼必須是原子操作?

假設有這樣的一種場景:用戶端a申請了名稱為name,值為a的鎖,接着用戶端a想要釋放鎖,查詢出key=name,value=a的鍵值對,緊接着鎖過期,用戶端b申請到了鎖,将值變為b。可是用戶端a記憶體中的值仍然為a,于是釋放掉了用戶端b申請到的鎖。

利用以上的原理,來實作我們的分布式鎖。

代碼實作

代碼使用最簡單的配置

工程目錄結構:

基于SpringBoot與Redis實作分布式鎖

pom依賴:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>      

其中,SpringBoot與Redis內建需要使用到spring-boot-starter-data-redis,輸出日志使用到lombok,測試時用到spring-boot-starter-test

配置如下:

spring:
  redis:
    host: localhost
    port: 6379

logging:
  level:
    root: info      

最核心的RedisLock類

package com.yang.redislock1;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * @author qcy
 * @create 2020/08/28 14:21:35
 */
@Slf4j
@Component
public class RedisLock {

    @Resource
    StringRedisTemplate template;

    public boolean tryLock(String key, String value, int expireTime, TimeUnit timeUnit) {
        Boolean flag = template.opsForValue().setIfAbsent(key, value, expireTime, timeUnit);
        if (flag == null || !flag) {
            log.info("申請鎖(" + key + "," + value + ")失敗");
            return false;
        }
        log.error("申請鎖(" + key + "," + value + ")成功");
        return true;
    }

    public void unLock(String key, String value) {
        String script = "if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end";
        RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
        Long result = template.execute(redisScript, Arrays.asList(key, value));
        if (result == null || result == 0) {
            log.info("釋放鎖(" + key + "," + value + ")失敗,該鎖不存在或鎖已經過期");
        } else {
            log.info("釋放鎖(" + key + "," + value + ")成功");
        }
    }

}      

其中unLock方法中的lua腳本是一個原子操作,此時這裡的KEYS[1]是key,KEYS[2]是value。

if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end      

該段腳本的意思是,首先擷取鎖的value,判斷是否等于期待的value,滿足的話,則删除該鎖,并傳回1;否則直接傳回0。

為什麼不直接删除該key?

因為需要滿足實作要求中的第三點:在鎖不過期的情況,隻能釋放掉自己申請的鎖。

鎖的名稱都一樣,是以将用戶端的唯一辨別存進了其value中,那麼删除前,需要判斷是否是自己建立的鎖。

測試類:

第一種情況,兩個用戶端都正常的建立、釋放鎖。

package com.yang.redislock1;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Slf4j
@SpringBootTest
class Redislock1ApplicationTests {

    @Resource
    RedisLock redisLock;
    public static final String LOCK_NAME = "name";
    public static final String CLIENT_A = "a";
    public static final String CLIENT_B = "b";
    public static final int EXPIRE_TIME = 5;


    @Test
    public void test1() {

        //用戶端a
        boolean lockResultA = redisLock.tryLock(LOCK_NAME, CLIENT_A, EXPIRE_TIME, TimeUnit.SECONDS);
        if (lockResultA) {
            try {
                //模拟用戶端a的操作耗時
                Thread.sleep(2 * 1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                redisLock.unLock(LOCK_NAME, CLIENT_A);
            }
        }

        //用戶端b
        boolean lockResultB = redisLock.tryLock(LOCK_NAME, CLIENT_B, EXPIRE_TIME, TimeUnit.SECONDS);
        if (lockResultB) {
            try {
                //模拟用戶端b的操作耗時
                Thread.sleep(2 * 1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                redisLock.unLock(LOCK_NAME, CLIENT_B);
            }
        }
    }

}      

運作結果如下:

基于SpringBoot與Redis實作分布式鎖

第二種情況,用戶端a申請的鎖逾時

隻需要把上述代碼中,用戶端a的操作耗時改為6秒即可。

輸出如下:

基于SpringBoot與Redis實作分布式鎖

第三種情況,假設用戶端a不釋放鎖

修改測試類代碼:

//用戶端a
        boolean lockResultA = redisLock.tryLock(LOCK_NAME, CLIENT_A, EXPIRE_TIME, TimeUnit.SECONDS);
        if (lockResultA) {
            try {
                //模拟用戶端a的操作耗時
                Thread.sleep(2 * 1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //redisLock.unLock(LOCK_NAME, CLIENT_A);
                log.info("用戶端a不釋放鎖");
            }
        }

        //用戶端b
        boolean lockResultB = redisLock.tryLock(LOCK_NAME, CLIENT_B, EXPIRE_TIME, TimeUnit.SECONDS);
        if (lockResultB) {
            try {
                //模拟用戶端b的操作耗時
                Thread.sleep(2 * 1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                redisLock.unLock(LOCK_NAME, CLIENT_B);
            }
        }
    }      

輸出如下:

基于SpringBoot與Redis實作分布式鎖

在叢集模式下的問題

以上的場景,在單機版的redis中問題不大,可是在redis叢集中,可能存在問題。

用戶端a在redis叢集中節點1上申請到了鎖,然後執行業務邏輯,可是這個時候節點1的鎖還沒同步到節點2上,節點1突然挂了。這個時候用戶端b在節點2上申請鎖,立即就成功了。這個時候就出現了一把鎖同時被多個用戶端持有的情況。

繼續閱讀