1 分布式锁的概述
1.1 锁的种类
- 单机版同一个JVM虚拟机内,synchronized或者Lock接口
- 分布式不同个JVM虚拟机内,单机的线程锁机制不再起作用,资源类在不同的服务器之间共享了
1.2 一个靠谱分布式锁需要具备的条件
- 独占性: 任何时刻只能有且仅有一个线程持有
- 高可用: 若redis集群环境下, 不能因为某一个节点挂了而出现获取锁和释放锁失败的情况
- 防死锁: 杜绝死锁,必须有超时控制机制或者撤销操作,有个兜底终止跳出方案
- 不乱抢: 防止张冠李戴,不能私下unlock别人的锁,只能自己加锁自己释放。
- 重入性: 同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁。
2 实现之一 Setnx
其实目前通常所说的Setnx命令,并非单指Redis的setnx key value这条命令。
一般代指Redis中对set命令加上nx参数进行使用,set这个命令,目前已经支持这么多参数可选:
SET key value [EX seconds|PX milliseconds] [NX|XX] [KEEPTTL]
参数说明
- EX: key在多少秒之后过期。
- PX: key在多少毫秒之后过期。
- NX: 当key不存在的时候,才创建key,效果等同于setnx。
- XX: 当key存在的时候,覆盖key。
2.1 示例代码
@GetMapping("/buy_goods")
public String buy_Goods() {
String key = "RedisLock";
String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
// setIfAbsent是java中的方法
// setnx是redis命令中的方法
Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
if(!flagLock) {
return "抢夺锁失败";
}
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
if(goodsNumber > 0){
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
stringRedisTemplate.delete(key);
return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"
}
return "商品已经售罄/活动结束/调用超时,欢迎下次光临" ;
}
如果在上述的代码执行过程中,出异常的话,可能无法释放锁,必须要在代码层面finally释放锁
}finally {
stringRedisTemplate.delete(key)
}
再如果部署了微服务jar包的机器挂了,代码层面根本没有走到finally这块,没办法保证解锁,这个key没有被删除,需要加入一个过期时间限定key
Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);//1
stringRedisTemplate.expire(key,10L,TimeUnit.SECONDS);//2
//设置key+过期时间必须要合并成一行具备原子性(加锁必须确保原子性)
//如果某一个时刻,刚刚执行完//1,这个时候发生宕机,那么就造成了死锁
stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);
这样就已经没问题了吗?不!还是存在问题:
张冠李戴,删除了别人的锁:如果线程A拿到分布式锁,设置的过期时间小于业务代码执行的时间,当A线程分布式锁刚刚过期,这个时候B线程获取到了分布式锁,A线程执行完业务逻辑,进行删除,就可能删除的是B的分布式锁。
所以我们要先获取value判断是否是当前进程加的锁,value可以设置一个唯一的客户端ID,或者用UUID这种随机数。
finally {
if (stringRedisTemplate.opsForValue().get(key).equals(value)) {
stringRedisTemplate.delete(key);
}
}
这下总没问题了吧, 。额还是存在一个小问题。
finally块的判断+del删除操作不是原子性的。假如说我们通过stringRedisTemplate.opsForValue().get(key)拿到了value值,此时正好key过期了,另外一个线程获取到了锁,那么就会进行误删除了。
为什么有问题还说这么多呢?
第一,搞清劣势所在,才能更好的完善。
第二点,其实上文中最后这段代码,还是有很多公司在用的。
2.2 删除锁的正确姿势-Lua脚本
//1.占分布式锁,去redis占坑 setIfAbsent==sexnx
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
if(lock){
try{
//加锁成功,执行业务
getDataFromDb();
}finally {
//原子删锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
"then " +
"return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
//删除成功返回1,删除不成功返回0
redisTemplate.execute(
new DefaultRedisScript<Long>(script, Long.class),
Arrays.asList("lock"),uuid);
}
return res;
}else{
//加锁失败,重试
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
通过Lua脚本能保证原子性的原因说的简单一点:市因为所有的lua脚本在Redis实例中共用同一个Lua解释器,某一个lua脚本在被执行的时候,其他lua脚本无法执行。因此对于其他lua脚本而言,一个lua脚本要么不可见,要么就已经执行完了。
为什么现在的Setnx默认是指set命令带上nx参数,而不是直接是说Setnx这个命令。
因为Redis版本在2.6.12之前,set是不支持nx参数的,如果想要完成一个锁,那么需要两条命令:
1. setnx Test uuid
2. expire Test 30
即放入Key和设置有效期,是分开的两步,理论上会出现,刚执行完,程序挂掉,无法保证原子性。
但是早在2013年,Redis就发布了2.6.12版本,并且官网(set命令页[1]),也早早就说明了“SETNX,SETEX,PSETEX可能在未来的版本中,会弃用并永久删除”。
2.3 关于可重入的考虑
所谓不可重入锁,即若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞。 同一个人拿一个锁 ,只能拿一次不能同时拿2次。
2.3.1 什么是可重入锁?它有什么作用?
可重入锁,也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获取到该锁。 说白了就是同一个线程再次进入同样代码时,可以再次拿到该锁。 它的作用是:防止在同一线程中多次获取锁而导致死锁发生。
在java的编程中synchronized 和 ReentrantLock都是可重入锁。我们可以参考ReentrantLock的代码。
2.3.2 ReentrantLock的实现
看个ReentrantLock的例子
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public class ReentrantLockDemo {
//锁
private static ReentrantLock lock = new ReentrantLock();
public void doSomething(int n){
try{
//进入递归第一件事:加锁
lock.lock();
log.info("--------lock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
log.info("--------递归{}次--------",n);
if(n<=2){
this.doSomething(++n);
}else{
return;
}
}finally {
lock.unlock();
log.info("--------unlock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
}
}
public static void main(String[] args) {
ReentrantLockDemo reentrantLockDemo=new ReentrantLockDemo();
reentrantLockDemo.doSomething(1);
log.info("执行完doSomething方法 是否还持有锁:{}",lock.isLocked());
}
}
结果打印
16:35:58.051 [main] INFO com.test.ReentrantLockDemo - --------lock()执行后,getState()的值:1 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------递归1次--------
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------lock()执行后,getState()的值:2 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------递归2次--------
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------lock()执行后,getState()的值:3 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------递归3次--------
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()执行后,getState()的值:2 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()执行后,getState()的值:1 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()执行后,getState()的值:0 lock.isLocked():false
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - 执行完doSomething方法 是否还持有锁:false
从上面栗子可以看出ReentrantLock是可重入锁,那么他是如何实现的了,我们看下源码就知道了
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//先判断,c(state)是否等于0,如果等于0,说明没有线程持有锁
if (c == 0) {
//通过cas方法把state的值0替换成1,替换成功说明加锁成功
if (compareAndSetState(0, acquires)) {
//如果加锁成功,设置持有锁的线程是当前线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//判断当前持有锁的线程是否是当前线程
//如果是当前线程,则state值加acquires,代表了当前线程加锁了多少次
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantLock的加锁流程是:
- 先判断是否有线程持有锁,没有加锁进行加锁
- 如果加锁成功,则设置持有锁的线程是当前线程
- 如果有线程持有了锁,则再去判断,是否是当前线程持有了锁
- 如果是当前线程持有锁,则加锁数量(state)+1
/**
* 释放锁
* @param releases
* @return
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//state-1 减加锁次数
//如果持有锁的线程,不是当前线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//如果c==0了说明当前线程,已经要释放锁了
free = true;
setExclusiveOwnerThread(null);//设置当前持有锁的线程为null
}
setState(c);//设置c的值
return free;
}
看ReentrantLock的解锁代码我们知道,
- 每次释放锁的时候都对state减1,
- 当c值等于0的时候,说明锁重入次数也为0了,
- 最终设置当前持有锁的线程为null,state也设置为0,锁就释放了。
2.3.3 那么redis要怎么实现可重入的操作
看ReentrantLock的源码我们知道,它是加锁成功了,记录了当前持有锁的线程,并通过一个int类型的数字,来记录了加锁次数。
我们知道ReentrantLock的实现原理了,那么redis只要下面两个问题解决,就能实现重入锁了:
- 怎么保存当前持有的线程
- 加锁次数(重入了多少次),怎么记录维护。
2.3.4 Hash散列类型
HSET hash-key sub-key value
锁的名字 某个请求线程的表示 加锁的次数
命令示例
127.0.0.1:6379> exists redis_lock #判定锁是否存在
(integer) 0
127.0.0.1:6379> hset redis_lock ordernum1:thread1 1 # 加锁
(integer) 1
127.0.0.1:6379> hincrby redis_lock ordernum1:thread1 1 # 锁的重入
(integer) 2
127.0.0.1:6379> hincrby redis_lock ordernum1:thread1 1 # 锁的重入
(integer) 3
127.0.0.1:6379> hget redis_lock ordernum1:thread1 # 获取锁的重入次数
"3"
127.0.0.1:6379> hincrby redis_lock ordernum1:thread1 -1 #释放锁
(integer) 2
127.0.0.1:6379> hincrby redis_lock ordernum1:thread1 -1 #释放锁
(integer) 1
127.0.0.1:6379> hincrby redis_lock ordernum1:thread1 -1 #释放锁
(integer) 0
127.0.0.1:6379> del redis_lock # 删除分布式锁
(integer) 1
127.0.0.1:6379>
2.3.5 Lua脚本 + springBoot 项目整合
业务代码
public String sale(){
String retMessage = "";
Lock redisLock = distributedLockFactory.getDistributedLock("redis");
redisLock.lock();
try{
//1 查询库存信息
String result = stringRedisTemplate.opsForValue().get("inventory001");
//2 判断库存是否足够
Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
//3 扣减库存,每次减少一个
if(inventoryNumber > 0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
retMessage = "成功卖出一个商品,库存剩余:"+inventoryNumber;
testReEntry();
}else{
retMessage = "商品卖完了";
}
}finally {
redisLock.unlock();
}
return retMessage ;
}
锁工厂,支持多种类型的实现,redis / zookeeper / mysql
/**
* 分布式锁工厂
* @auther hll
* @create 2023-03-15 23:35
*/
@Component
public class DistributedLockFactory {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuid;
public DistributedLockFactory(){
this.uuid = IdUtil.simpleUUID();
}
public Lock getDistributedLock(String lockType){
if(lockType == null) return null;
if(lockType.equalsIgnoreCase("REDIS")){
this.lockName = "RedisLock";
return new RedisDistributedLock(stringRedisTemplate,lockName,uuid);
}else if(lockType.equalsIgnoreCase("ZOOKEEPER")){
// todo
this.lockName = "ZookeeperLockNode";
return null;
}else if(lockType.equalsIgnoreCase("MYSQL")){
// todo
return null;
}
return null;
}
}
锁的实现
/**
* @auther hll
* @create 2023-03-15 23:41
* 自研的redis分布式锁,实现了Lock接口
*/
public class RedisDistributedLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName;//KEYS[1]
private String uuidValue;//ARGV[1]
private long expireTime;//ARGV[2]
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue = uuid+":"+Thread.currentThread().getId();
this.expireTime = 30L;
}
@Override
public void lock(){
tryLock();
}
@Override
public boolean tryLock() {
try {
tryLock(-1L,TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if(time == -1L) {
String script =
"if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
"redis.call('hincrby',KEYS[1],ARGV[1],1) " +
"redis.call('expire',KEYS[1],ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
//加锁不行,自旋重试
while(!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName), uuidValue,String.valueOf(expireTime))){
//暂停60毫秒
try {
TimeUnit.MILLISECONDS.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return true;
}
return false;
}
@Override
public void unlock() {
String script =
"if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then " +
"return nil " +
"elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then " +
"return redis.call('del',KEYS[1]) " +
"else " +
"return 0 " +
"end";
Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
if(null == flag){
throw new RuntimeException("this lock doesn't exists,o(╥﹏╥)o");
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public Condition newCondition(){
return null;
}
}
2.3.6 增加自动续期的功能
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if(time == -1L) {
String script =
"if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
"redis.call('hincrby',KEYS[1],ARGV[1],1) " +
"redis.call('expire',KEYS[1],ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end";
//加锁不行,自旋重试
while(!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName), uuidValue,String.valueOf(expireTime))){
//暂停60毫秒
try {
TimeUnit.MILLISECONDS.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//新建一个后台扫描程序,来坚持key目前的ttl,是否到我们规定的1/2 1/3来实现续期
renewExpire();
return true;
}
return false;
}
//自动续期
private void renewExpire() {
String script =
"if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 " +
"end";
new Timer().schedule(new TimerTask(){
@Override
public void run(){
if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) {
renewExpire();
}
}
},(this.expireTime * 1000)/3);
}
3 redisson
Redisson是java的redis客户端之一,提供了一些api方便操作redis。但是redisson这个客户端可有点厉害,在官网截了仅仅是一部分的图:
这个特性列表可以说是太多了,是不是还看到了一些JUC包下面的类名,redisson帮我们搞了分布式的版本,比如AtomicLong,直接用RedissonAtomicLong就行了,连类名都不用去新记,很人性化了。
锁只是它的冰山一角,并且从它的wiki页面看到,对主从,哨兵,集群等模式都支持,当然了,单节点模式肯定是支持的。
本文还是以锁为主,其他的不过多介绍。
Redisson普通的锁实现源码主要是RedissonLock这个类,还没有看过它源码的盆友,不妨去瞧一瞧。源码中加锁/释放锁操作都是用lua脚本完成的,封装的非常完善,开箱即用。
4 RedLock
redLock的中文是直译过来的,就叫红锁。
红锁并非是一个工具,而是redis官方提出的一种分布式锁的算法。就在刚刚介绍完的redisson中,就实现了redLock版本的锁。也就是说除了getLock方法,还有getRedLock方法。
大概画一下对红锁的理解:
如果你不熟悉redis高可用部署,那么没关系。redLock算法虽然是需要多个实例,但是这些实例都是独自部署的,没有主从关系。
RedLock作者指出,之所以要用独立的,是避免了redis异步复制造成的锁丢失,比如:主节点没来的及把刚刚set进来这条数据给从节点,就挂了。
有些人是不是觉得大佬们都是杠精啊,天天就想着极端情况。 其实高可用嘛,拼的就是99.999...% 中小数点后面的位数。
回到上面那张简陋的图片,红锁算法认为,只要(N/2) + 1个节点加锁成功,那么就认为获取了锁, 解锁时将所有实例解锁。 流程为:
- 顺序向五个节点请求加锁
- 根据一定的超时时间来推断是不是跳过该节点
- 三个节点加锁成功并且花费时间小于锁的有效期
- 认定加锁成功
也就是说,假设锁30秒过期,三个节点加锁花了31秒,自然是加锁失败了。
现在这个官网已经不推荐使用了,如果要使用多机案例的话官方推荐使用 MultiLock多重锁。
5 redisson看门狗机制
5.1 官网解释
Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
5.2 看门狗开启条件
我们可以看到,leaseTime != -1时,只执行tryLockInnerAsync方法,其它情况会执行下面的代码,而leaseTime 就是我们调用lock(10, TimeUnit.SECONDS);方法传入的时间参数。
由此可知:redisson如果只是用lock.lock(); 不传过期时间的话,会启动看门狗机制,传过期时间的话,就不会启动看门狗机制。
// org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// future模式,抢到锁之后开启看门狗
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId); // 开启看门狗
}
});
return ttlRemainingFuture;
}
5.3 看门狗如何开启的
以下代码就是开启看门狗的方法,我们可以看到,启动了一个TimerTask进行倒计时,默认倒计时时间为internalLockLeaseTime / 3,也就是默认的10秒钟(默认过期时间是30秒)。
// org.redisson.RedissonLock#renewExpiration
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 关键方法,使用lua脚本刷新过期时间
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration(); // 不断地调用自己,刷新过期时间
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
5.4 看门狗的性能问题
很多小伙伴都认为看门狗是非常消耗性能的,其实性能的确是会有一些消耗,但是没有很多。
前几天有个小伙伴抛出了一个疑问:假如说每个线程都启动一个TimerTask来不断刷新过期时间,岂不是服务器很快就“炸了”?
看门狗机制主要是用于业务代码执行时间忽长忽短的,如果一个业务代码,我们确定它在10秒钟之内就会执行完毕,完全可以取消这个看门狗机制,来提升一部分性能。
举例来说
String lockKey = "product_001";//商品id作为锁
RLock lock = redisson.getLock(lockKey);
try{
System.out.println("准备尝试获取锁");
lock.lock(30, TimeUnit.SECONDS);
System.out.println("进来了,开始停8秒");
Thread.sleep(8000);
//这里写逻辑
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
以上是一个redisson锁的实现逻辑。我们可以看到,锁住的key是商品id。假如说product_001商品被锁住了,在同一时间访问product_001商品的线程有且只能最多有一个,所以redisson看门狗在product_001商品上有且最多有一个。
但是商品并不只有product_001,可能有成百上千个商品,此时如果有大批量客户访问这成百上千个商品的话,那么就会生成成百上千个看门狗!这是个很恐怖的事情。
所以,看门狗能不用还是不用了。
思考:是否可以考虑用list或者zset来维护要续期的分布式锁,来降低创建的线程过多带来的性能消耗