Docker運作Redis
拉取最近版本的Redis鏡像:
docker pull redis
啟動容器:
docker run -d --name redis -p 6379:6379 redis:latest
進入容器内部,測試存儲:
# docker exec -it redis redis-cli
127.0.0.1:6379> set name qcy
OK
127.0.0.1:6379> get name
"qcy"
到這裡,說明我們啟動成功了。
實作要求
實作的分布式鎖必須具有以下特點:
- 互斥:無論在什麼時刻,最多隻有一個用戶端擁有鎖
- 鎖擁有逾時時間。否則加鎖的用戶端突然當機,來不及釋放鎖,會導緻所有用戶端擷取鎖失敗。
- 在不逾時的情況下,用戶端隻能釋放掉自己申請的鎖。有這樣的一個例子:用戶端a申請了鎖,設定逾時為5秒,可是其運作同步代碼超過了5秒。此時用戶端b請求鎖時,由于該鎖逾時被redis自動删除,于是用戶端b申請鎖成功。接着用戶端a的同步代碼運作結束,這樣就會把用戶端b申請的鎖給釋放掉,這樣可能會引起資料錯亂。
實作原理
如何滿足互斥與具有逾時時間?
redis是單線程模型,指令是一條一條執行的,是以不存在并發問題。
我們使用以下的指令,來往reids中存一個過期時間為5秒的鍵值:
set name qcy nx ex 5
其中的name為key,qcy為value,nx代表不存在此key則存儲,并且傳回OK,若存在則會傳回null,ex 5代表過期時間為5秒。
127.0.0.1:6379> set name qcy nx ex 5
OK
127.0.0.1:6379> set name qcy nx ex 5
(nil)
當我們在過期時間内重複存儲時,redis會提示操作失敗。間隔5秒以後,redis會提示插入成功。
釋放鎖時,隻要删除對應的key即可:
127.0.0.1:6379> set name qcy nx ex 5
OK
127.0.0.1:6379> del name
(integer) 1
那麼如何釋放掉自己申請的鎖?
key為鎖名稱,value可以為申請鎖的用戶端的唯一辨別。那麼在釋放鎖時,不僅僅比較key是否相同,還要比較value是否為申請鎖的用戶端的唯一辨別,且這兩次比較必須是原子操作。
為什麼必須是原子操作?
假設有這樣的一種場景:用戶端a申請了名稱為name,值為a的鎖,接着用戶端a想要釋放鎖,查詢出key=name,value=a的鍵值對,緊接着鎖過期,用戶端b申請到了鎖,将值變為b。可是用戶端a記憶體中的值仍然為a,于是釋放掉了用戶端b申請到的鎖。
利用以上的原理,來實作我們的分布式鎖。
代碼實作
代碼使用最簡單的配置
工程目錄結構:

pom依賴:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
其中,SpringBoot與Redis內建需要使用到spring-boot-starter-data-redis,輸出日志使用到lombok,測試時用到spring-boot-starter-test
配置如下:
spring:
redis:
host: localhost
port: 6379
logging:
level:
root: info
最核心的RedisLock類
package com.yang.redislock1;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* @author qcy
* @create 2020/08/28 14:21:35
*/
@Slf4j
@Component
public class RedisLock {
@Resource
StringRedisTemplate template;
public boolean tryLock(String key, String value, int expireTime, TimeUnit timeUnit) {
Boolean flag = template.opsForValue().setIfAbsent(key, value, expireTime, timeUnit);
if (flag == null || !flag) {
log.info("申請鎖(" + key + "," + value + ")失敗");
return false;
}
log.error("申請鎖(" + key + "," + value + ")成功");
return true;
}
public void unLock(String key, String value) {
String script = "if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = template.execute(redisScript, Arrays.asList(key, value));
if (result == null || result == 0) {
log.info("釋放鎖(" + key + "," + value + ")失敗,該鎖不存在或鎖已經過期");
} else {
log.info("釋放鎖(" + key + "," + value + ")成功");
}
}
}
其中unLock方法中的lua腳本是一個原子操作,此時這裡的KEYS[1]是key,KEYS[2]是value。
if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end
該段腳本的意思是,首先擷取鎖的value,判斷是否等于期待的value,滿足的話,則删除該鎖,并傳回1;否則直接傳回0。
為什麼不直接删除該key?
因為需要滿足實作要求中的第三點:在鎖不過期的情況,隻能釋放掉自己申請的鎖。
鎖的名稱都一樣,是以将用戶端的唯一辨別存進了其value中,那麼删除前,需要判斷是否是自己建立的鎖。
測試類:
第一種情況,兩個用戶端都正常的建立、釋放鎖。
package com.yang.redislock1;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Slf4j
@SpringBootTest
class Redislock1ApplicationTests {
@Resource
RedisLock redisLock;
public static final String LOCK_NAME = "name";
public static final String CLIENT_A = "a";
public static final String CLIENT_B = "b";
public static final int EXPIRE_TIME = 5;
@Test
public void test1() {
//用戶端a
boolean lockResultA = redisLock.tryLock(LOCK_NAME, CLIENT_A, EXPIRE_TIME, TimeUnit.SECONDS);
if (lockResultA) {
try {
//模拟用戶端a的操作耗時
Thread.sleep(2 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLock.unLock(LOCK_NAME, CLIENT_A);
}
}
//用戶端b
boolean lockResultB = redisLock.tryLock(LOCK_NAME, CLIENT_B, EXPIRE_TIME, TimeUnit.SECONDS);
if (lockResultB) {
try {
//模拟用戶端b的操作耗時
Thread.sleep(2 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLock.unLock(LOCK_NAME, CLIENT_B);
}
}
}
}
運作結果如下:
第二種情況,用戶端a申請的鎖逾時
隻需要把上述代碼中,用戶端a的操作耗時改為6秒即可。
輸出如下:
第三種情況,假設用戶端a不釋放鎖
修改測試類代碼:
//用戶端a
boolean lockResultA = redisLock.tryLock(LOCK_NAME, CLIENT_A, EXPIRE_TIME, TimeUnit.SECONDS);
if (lockResultA) {
try {
//模拟用戶端a的操作耗時
Thread.sleep(2 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
//redisLock.unLock(LOCK_NAME, CLIENT_A);
log.info("用戶端a不釋放鎖");
}
}
//用戶端b
boolean lockResultB = redisLock.tryLock(LOCK_NAME, CLIENT_B, EXPIRE_TIME, TimeUnit.SECONDS);
if (lockResultB) {
try {
//模拟用戶端b的操作耗時
Thread.sleep(2 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLock.unLock(LOCK_NAME, CLIENT_B);
}
}
}
輸出如下:
在叢集模式下的問題
以上的場景,在單機版的redis中問題不大,可是在redis叢集中,可能存在問題。
用戶端a在redis叢集中節點1上申請到了鎖,然後執行業務邏輯,可是這個時候節點1的鎖還沒同步到節點2上,節點1突然挂了。這個時候用戶端b在節點2上申請鎖,立即就成功了。這個時候就出現了一把鎖同時被多個用戶端持有的情況。