Docker运行Redis
拉取最近版本的Redis镜像:
docker pull redis
启动容器:
docker run -d --name redis -p 6379:6379 redis:latest
进入容器内部,测试存储:
# docker exec -it redis redis-cli
127.0.0.1:6379> set name qcy
OK
127.0.0.1:6379> get name
"qcy"
到这里,说明我们启动成功了。
实现要求
实现的分布式锁必须具有以下特点:
- 互斥:无论在什么时刻,最多只有一个客户端拥有锁
- 锁拥有超时时间。否则加锁的客户端突然宕机,来不及释放锁,会导致所有客户端获取锁失败。
- 在不超时的情况下,客户端只能释放掉自己申请的锁。有这样的一个例子:客户端a申请了锁,设置超时为5秒,可是其运行同步代码超过了5秒。此时客户端b请求锁时,由于该锁超时被redis自动删除,于是客户端b申请锁成功。接着客户端a的同步代码运行结束,这样就会把客户端b申请的锁给释放掉,这样可能会引起数据错乱。
实现原理
如何满足互斥与具有超时时间?
redis是单线程模型,指令是一条一条执行的,因此不存在并发问题。
我们使用以下的指令,来往reids中存一个过期时间为5秒的键值:
set name qcy nx ex 5
其中的name为key,qcy为value,nx代表不存在此key则存储,并且返回OK,若存在则会返回null,ex 5代表过期时间为5秒。
127.0.0.1:6379> set name qcy nx ex 5
OK
127.0.0.1:6379> set name qcy nx ex 5
(nil)
当我们在过期时间内重复存储时,redis会提示操作失败。间隔5秒以后,redis会提示插入成功。
释放锁时,只要删除对应的key即可:
127.0.0.1:6379> set name qcy nx ex 5
OK
127.0.0.1:6379> del name
(integer) 1
那么如何释放掉自己申请的锁?
key为锁名称,value可以为申请锁的客户端的唯一标识。那么在释放锁时,不仅仅比较key是否相同,还要比较value是否为申请锁的客户端的唯一标识,且这两次比较必须是原子操作。
为什么必须是原子操作?
假设有这样的一种场景:客户端a申请了名称为name,值为a的锁,接着客户端a想要释放锁,查询出key=name,value=a的键值对,紧接着锁过期,客户端b申请到了锁,将值变为b。可是客户端a内存中的值仍然为a,于是释放掉了客户端b申请到的锁。
利用以上的原理,来实现我们的分布式锁。
代码实现
代码使用最简单的配置
工程目录结构:

pom依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
其中,SpringBoot与Redis集成需要使用到spring-boot-starter-data-redis,输出日志使用到lombok,测试时用到spring-boot-starter-test
配置如下:
spring:
redis:
host: localhost
port: 6379
logging:
level:
root: info
最核心的RedisLock类
package com.yang.redislock1;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* @author qcy
* @create 2020/08/28 14:21:35
*/
@Slf4j
@Component
public class RedisLock {
@Resource
StringRedisTemplate template;
public boolean tryLock(String key, String value, int expireTime, TimeUnit timeUnit) {
Boolean flag = template.opsForValue().setIfAbsent(key, value, expireTime, timeUnit);
if (flag == null || !flag) {
log.info("申请锁(" + key + "," + value + ")失败");
return false;
}
log.error("申请锁(" + key + "," + value + ")成功");
return true;
}
public void unLock(String key, String value) {
String script = "if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = template.execute(redisScript, Arrays.asList(key, value));
if (result == null || result == 0) {
log.info("释放锁(" + key + "," + value + ")失败,该锁不存在或锁已经过期");
} else {
log.info("释放锁(" + key + "," + value + ")成功");
}
}
}
其中unLock方法中的lua脚本是一个原子操作,此时这里的KEYS[1]是key,KEYS[2]是value。
if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end
该段脚本的意思是,首先获取锁的value,判断是否等于期待的value,满足的话,则删除该锁,并返回1;否则直接返回0。
为什么不直接删除该key?
因为需要满足实现要求中的第三点:在锁不过期的情况,只能释放掉自己申请的锁。
锁的名称都一样,因此将客户端的唯一标识存进了其value中,那么删除前,需要判断是否是自己创建的锁。
测试类:
第一种情况,两个客户端都正常的创建、释放锁。
package com.yang.redislock1;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Slf4j
@SpringBootTest
class Redislock1ApplicationTests {
@Resource
RedisLock redisLock;
public static final String LOCK_NAME = "name";
public static final String CLIENT_A = "a";
public static final String CLIENT_B = "b";
public static final int EXPIRE_TIME = 5;
@Test
public void test1() {
//客户端a
boolean lockResultA = redisLock.tryLock(LOCK_NAME, CLIENT_A, EXPIRE_TIME, TimeUnit.SECONDS);
if (lockResultA) {
try {
//模拟客户端a的操作耗时
Thread.sleep(2 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLock.unLock(LOCK_NAME, CLIENT_A);
}
}
//客户端b
boolean lockResultB = redisLock.tryLock(LOCK_NAME, CLIENT_B, EXPIRE_TIME, TimeUnit.SECONDS);
if (lockResultB) {
try {
//模拟客户端b的操作耗时
Thread.sleep(2 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLock.unLock(LOCK_NAME, CLIENT_B);
}
}
}
}
运行结果如下:
第二种情况,客户端a申请的锁超时
只需要把上述代码中,客户端a的操作耗时改为6秒即可。
输出如下:
第三种情况,假设客户端a不释放锁
修改测试类代码:
//客户端a
boolean lockResultA = redisLock.tryLock(LOCK_NAME, CLIENT_A, EXPIRE_TIME, TimeUnit.SECONDS);
if (lockResultA) {
try {
//模拟客户端a的操作耗时
Thread.sleep(2 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
//redisLock.unLock(LOCK_NAME, CLIENT_A);
log.info("客户端a不释放锁");
}
}
//客户端b
boolean lockResultB = redisLock.tryLock(LOCK_NAME, CLIENT_B, EXPIRE_TIME, TimeUnit.SECONDS);
if (lockResultB) {
try {
//模拟客户端b的操作耗时
Thread.sleep(2 * 1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLock.unLock(LOCK_NAME, CLIENT_B);
}
}
}
输出如下:
在集群模式下的问题
以上的场景,在单机版的redis中问题不大,可是在redis集群中,可能存在问题。
客户端a在redis集群中节点1上申请到了锁,然后执行业务逻辑,可是这个时候节点1的锁还没同步到节点2上,节点1突然挂了。这个时候客户端b在节点2上申请锁,立即就成功了。这个时候就出现了一把锁同时被多个客户端持有的情况。