天天看点

什么是分布式锁~什么是setnx?什么是Redlock?什么是Redisson?

作者:做好一个程序猿

1 分布式锁的概述

什么是分布式锁~什么是setnx?什么是Redlock?什么是Redisson?

1.1 锁的种类

  • 单机版同一个JVM虚拟机内,synchronized或者Lock接口
  • 分布式不同个JVM虚拟机内,单机的线程锁机制不再起作用,资源类在不同的服务器之间共享了

1.2 一个靠谱分布式锁需要具备的条件

  • 独占性: 任何时刻只能有且仅有一个线程持有
  • 高可用: 若redis集群环境下, 不能因为某一个节点挂了而出现获取锁和释放锁失败的情况
  • 防死锁: 杜绝死锁,必须有超时控制机制或者撤销操作,有个兜底终止跳出方案
  • 不乱抢: 防止张冠李戴,不能私下unlock别人的锁,只能自己加锁自己释放。
  • 重入性: 同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁。

2 实现之一 Setnx

其实目前通常所说的Setnx命令,并非单指Redis的setnx key value这条命令。

一般代指Redis中对set命令加上nx参数进行使用,set这个命令,目前已经支持这么多参数可选:

SET key value [EX seconds|PX milliseconds] [NX|XX] [KEEPTTL]

参数说明

    • EX: key在多少秒之后过期。
    • PX: key在多少毫秒之后过期。
    • NX: 当key不存在的时候,才创建key,效果等同于setnx。
    • XX: 当key存在的时候,覆盖key。

2.1 示例代码

@GetMapping("/buy_goods")
    public String buy_Goods() {
        String key = "RedisLock";
        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
			 // setIfAbsent是java中的方法
			 // setnx是redis命令中的方法
        Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
        if(!flagLock) {
            return "抢夺锁失败";
        }

        String result = stringRedisTemplate.opsForValue().get("goods:001");
        int goodsNumber = result == null ? 0 : Integer.parseInt(result);

        if(goodsNumber > 0){
            int realNumber = goodsNumber - 1;
            stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
            stringRedisTemplate.delete(key);
            return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"
        }
        return "商品已经售罄/活动结束/调用超时,欢迎下次光临" ;
    }
           

如果在上述的代码执行过程中,出异常的话,可能无法释放锁,必须要在代码层面finally释放锁

}finally {
        stringRedisTemplate.delete(key)
    }           

再如果部署了微服务jar包的机器挂了,代码层面根本没有走到finally这块,没办法保证解锁,这个key没有被删除,需要加入一个过期时间限定key

Boolean flagLock = stringRedisTemplate.opsForValue().setIfAbsent(key, value);//1
stringRedisTemplate.expire(key,10L,TimeUnit.SECONDS);//2

//设置key+过期时间必须要合并成一行具备原子性(加锁必须确保原子性)
//如果某一个时刻,刚刚执行完//1,这个时候发生宕机,那么就造成了死锁

stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);
           

这样就已经没问题了吗?不!还是存在问题:

张冠李戴,删除了别人的锁:如果线程A拿到分布式锁,设置的过期时间小于业务代码执行的时间,当A线程分布式锁刚刚过期,这个时候B线程获取到了分布式锁,A线程执行完业务逻辑,进行删除,就可能删除的是B的分布式锁。

所以我们要先获取value判断是否是当前进程加的锁,value可以设置一个唯一的客户端ID,或者用UUID这种随机数。

finally {
  if (stringRedisTemplate.opsForValue().get(key).equals(value)) {
      stringRedisTemplate.delete(key);
  }
}           

这下总没问题了吧, 。额还是存在一个小问题。

finally块的判断+del删除操作不是原子性的。假如说我们通过stringRedisTemplate.opsForValue().get(key)拿到了value值,此时正好key过期了,另外一个线程获取到了锁,那么就会进行误删除了。

什么是分布式锁~什么是setnx?什么是Redlock?什么是Redisson?

为什么有问题还说这么多呢?

第一,搞清劣势所在,才能更好的完善。

第二点,其实上文中最后这段代码,还是有很多公司在用的。

2.2 删除锁的正确姿势-Lua脚本

//1.占分布式锁,去redis占坑  setIfAbsent==sexnx
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);
if(lock){
   try{
       //加锁成功,执行业务
       getDataFromDb();
   }finally {
       //原子删锁
       String script = "if redis.call('get', KEYS[1]) == ARGV[1] " +
               "then " +
               "return redis.call('del', KEYS[1]) " +
               "else " +
               "   return 0 " +
               "end";
       //删除成功返回1,删除不成功返回0
       redisTemplate.execute(
               new DefaultRedisScript<Long>(script, Long.class),
               Arrays.asList("lock"),uuid);
   }
   return res;
}else{
   //加锁失败,重试
   try { 
       TimeUnit.SECONDS.sleep(1);  
    } catch (InterruptedException e) {
       e.printStackTrace();
   }
   return null;
}
           

通过Lua脚本能保证原子性的原因说的简单一点:市因为所有的lua脚本在Redis实例中共用同一个Lua解释器,某一个lua脚本在被执行的时候,其他lua脚本无法执行。因此对于其他lua脚本而言,一个lua脚本要么不可见,要么就已经执行完了。

为什么现在的Setnx默认是指set命令带上nx参数,而不是直接是说Setnx这个命令。

因为Redis版本在2.6.12之前,set是不支持nx参数的,如果想要完成一个锁,那么需要两条命令:

1. setnx Test uuid

2. expire Test 30

即放入Key和设置有效期,是分开的两步,理论上会出现,刚执行完,程序挂掉,无法保证原子性。

但是早在2013年,Redis就发布了2.6.12版本,并且官网(set命令页[1]),也早早就说明了“SETNX,SETEX,PSETEX可能在未来的版本中,会弃用并永久删除”。

2.3 关于可重入的考虑

所谓不可重入锁,即若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞。 同一个人拿一个锁 ,只能拿一次不能同时拿2次。

2.3.1 什么是可重入锁?它有什么作用?

可重入锁,也叫做递归锁,指的是在同一线程内,外层函数获得锁之后,内层递归函数仍然可以获取到该锁。 说白了就是同一个线程再次进入同样代码时,可以再次拿到该锁。 它的作用是:防止在同一线程中多次获取锁而导致死锁发生。

在java的编程中synchronized 和 ReentrantLock都是可重入锁。我们可以参考ReentrantLock的代码。

2.3.2 ReentrantLock的实现

看个ReentrantLock的例子

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.ReentrantLock;

@Slf4j
public class ReentrantLockDemo {
    //锁
    private static ReentrantLock lock =  new ReentrantLock();
    public void doSomething(int n){
        try{
            //进入递归第一件事:加锁
            lock.lock();
            log.info("--------lock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
            log.info("--------递归{}次--------",n);
            if(n<=2){
                this.doSomething(++n);
            }else{
                return;
            }
        }finally {
            lock.unlock();
            log.info("--------unlock()执行后,getState()的值:{} lock.isLocked():{}",lock.getHoldCount(),lock.isLocked());
        }
    }

    public static void main(String[] args) {
        ReentrantLockDemo reentrantLockDemo=new ReentrantLockDemo();
        reentrantLockDemo.doSomething(1);
        log.info("执行完doSomething方法 是否还持有锁:{}",lock.isLocked());
    }

}
           

结果打印

16:35:58.051 [main] INFO com.test.ReentrantLockDemo - --------lock()执行后,getState()的值:1 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------递归1次--------
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------lock()执行后,getState()的值:2 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------递归2次--------
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------lock()执行后,getState()的值:3 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------递归3次--------
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()执行后,getState()的值:2 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()执行后,getState()的值:1 lock.isLocked():true
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - --------unlock()执行后,getState()的值:0 lock.isLocked():false
16:35:58.055 [main] INFO com.test.ReentrantLockDemo - 执行完doSomething方法 是否还持有锁:false           

从上面栗子可以看出ReentrantLock是可重入锁,那么他是如何实现的了,我们看下源码就知道了

final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //先判断,c(state)是否等于0,如果等于0,说明没有线程持有锁
        if (c == 0) {
            //通过cas方法把state的值0替换成1,替换成功说明加锁成功
            if (compareAndSetState(0, acquires)) {
                //如果加锁成功,设置持有锁的线程是当前线程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {//判断当前持有锁的线程是否是当前线程
            //如果是当前线程,则state值加acquires,代表了当前线程加锁了多少次
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
           

ReentrantLock的加锁流程是:

  1. 先判断是否有线程持有锁,没有加锁进行加锁
  2. 如果加锁成功,则设置持有锁的线程是当前线程
  3. 如果有线程持有了锁,则再去判断,是否是当前线程持有了锁
  4. 如果是当前线程持有锁,则加锁数量(state)+1
/**
 * 释放锁
 * @param releases
 * @return
 */
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;//state-1 减加锁次数
    //如果持有锁的线程,不是当前线程,抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();

    boolean free = false;
    if (c == 0) {//如果c==0了说明当前线程,已经要释放锁了
        free = true;
        setExclusiveOwnerThread(null);//设置当前持有锁的线程为null
    }
    setState(c);//设置c的值
    return free;
}
           

看ReentrantLock的解锁代码我们知道,

  • 每次释放锁的时候都对state减1,
  • 当c值等于0的时候,说明锁重入次数也为0了,
  • 最终设置当前持有锁的线程为null,state也设置为0,锁就释放了。

2.3.3 那么redis要怎么实现可重入的操作

看ReentrantLock的源码我们知道,它是加锁成功了,记录了当前持有锁的线程,并通过一个int类型的数字,来记录了加锁次数。

我们知道ReentrantLock的实现原理了,那么redis只要下面两个问题解决,就能实现重入锁了:

  1. 怎么保存当前持有的线程
  2. 加锁次数(重入了多少次),怎么记录维护。

2.3.4 Hash散列类型

HSET hash-key sub-key value

锁的名字 某个请求线程的表示 加锁的次数

命令示例

127.0.0.1:6379> exists redis_lock #判定锁是否存在
(integer) 0
127.0.0.1:6379> hset redis_lock ordernum1:thread1 1 # 加锁
(integer) 1
127.0.0.1:6379> hincrby redis_lock ordernum1:thread1 1 # 锁的重入
(integer) 2
127.0.0.1:6379> hincrby redis_lock ordernum1:thread1 1 # 锁的重入
(integer) 3
127.0.0.1:6379> hget redis_lock ordernum1:thread1 # 获取锁的重入次数
"3"
127.0.0.1:6379> hincrby redis_lock ordernum1:thread1 -1 #释放锁
(integer) 2
127.0.0.1:6379> hincrby redis_lock ordernum1:thread1 -1 #释放锁
(integer) 1
127.0.0.1:6379> hincrby redis_lock ordernum1:thread1 -1 #释放锁
(integer) 0
127.0.0.1:6379> del redis_lock # 删除分布式锁
(integer) 1
127.0.0.1:6379>            

2.3.5 Lua脚本 + springBoot 项目整合

业务代码

public String sale(){
    String retMessage = "";
    Lock redisLock = distributedLockFactory.getDistributedLock("redis");
    redisLock.lock();
    try{
        //1 查询库存信息
        String result = stringRedisTemplate.opsForValue().get("inventory001");
        //2 判断库存是否足够
        Integer inventoryNumber = result == null ? 0 : Integer.parseInt(result);
        //3 扣减库存,每次减少一个
        if(inventoryNumber > 0){
            stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventoryNumber));
            retMessage = "成功卖出一个商品,库存剩余:"+inventoryNumber;
            testReEntry();
        }else{
            retMessage = "商品卖完了";
        }
    }finally {
        redisLock.unlock();
    }
    return retMessage ;
}           

锁工厂,支持多种类型的实现,redis / zookeeper / mysql

/**
 * 分布式锁工厂
 * @auther hll
 * @create 2023-03-15 23:35
 */
@Component
public class DistributedLockFactory {
    
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private String lockName;
    
    private String uuid;

    public DistributedLockFactory(){
        this.uuid = IdUtil.simpleUUID();
    }

    public Lock getDistributedLock(String lockType){
        if(lockType == null) return null;

        if(lockType.equalsIgnoreCase("REDIS")){
            this.lockName = "RedisLock";
            return new RedisDistributedLock(stringRedisTemplate,lockName,uuid);
        }else if(lockType.equalsIgnoreCase("ZOOKEEPER")){
            // todo 
            this.lockName = "ZookeeperLockNode";
            return null;
        }else if(lockType.equalsIgnoreCase("MYSQL")){
            // todo 
            return null;
        }

        return null;
    }
}           

锁的实现

/**
 * @auther hll
 * @create 2023-03-15 23:41
 * 自研的redis分布式锁,实现了Lock接口
 */
public class RedisDistributedLock implements Lock {
    
    private StringRedisTemplate stringRedisTemplate;
    
    private String lockName;//KEYS[1]
    
    private String uuidValue;//ARGV[1]
    
    private long  expireTime;//ARGV[2]


    public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.lockName = lockName;
        this.uuidValue = uuid+":"+Thread.currentThread().getId();
        this.expireTime = 30L;
    }

    @Override
    public void lock(){
        tryLock();
    }
    
    @Override
    public boolean tryLock() {
        try {
            tryLock(-1L,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if(time == -1L) {
            String script =
                    "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then    " +
                            "redis.call('hincrby',KEYS[1],ARGV[1],1)    " +
                            "redis.call('expire',KEYS[1],ARGV[2])    " +
                            "return 1  " +
                    "else   " +
                            "return 0 " +
                    "end";
        	//加锁不行,自旋重试
            while(!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName), uuidValue,String.valueOf(expireTime))){
                //暂停60毫秒
                try { 
                    TimeUnit.MILLISECONDS.sleep(60); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                }
            }
            return true;
        }
        return false;
    }

    @Override
    public void unlock() {
        String script =
                "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 0 then    " +
                        "return nil  " +
                "elseif redis.call('HINCRBY',KEYS[1],ARGV[1],-1) == 0 then    " +
                        "return redis.call('del',KEYS[1])  " +
                "else    " +
                        "return 0 " +
                "end";
        Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime));
        if(null == flag){
            throw new RuntimeException("this lock doesn't exists,o(╥﹏╥)o");
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }
    @Override
    public Condition newCondition(){
        return null;
    }
}           

2.3.6 增加自动续期的功能

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    if(time == -1L) {
        String script =
                "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then    " +
                        "redis.call('hincrby',KEYS[1],ARGV[1],1)    " +
                        "redis.call('expire',KEYS[1],ARGV[2])    " +
                        "return 1  " +
                "else   " +
                        "return 0 " +
                "end";
        //加锁不行,自旋重试
        while(!stringRedisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Arrays.asList(lockName), uuidValue,String.valueOf(expireTime))){
            //暂停60毫秒
            try { 
                TimeUnit.MILLISECONDS.sleep(60); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            }
        }
        //新建一个后台扫描程序,来坚持key目前的ttl,是否到我们规定的1/2 1/3来实现续期
        renewExpire();
        return true;
    }
    return false;
}

//自动续期
private void renewExpire() {
    String script =
            "if redis.call('HEXISTS',KEYS[1],ARGV[1]) == 1 then     " +
                    "return redis.call('expire',KEYS[1],ARGV[2]) " +
            "else     " +
                    "return 0 " +
            "end";

    new Timer().schedule(new TimerTask(){
        @Override
        public void run(){
            if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expireTime))) {
                renewExpire();
            }
        }
    },(this.expireTime * 1000)/3);
}           

3 redisson

Redisson是java的redis客户端之一,提供了一些api方便操作redis。但是redisson这个客户端可有点厉害,在官网截了仅仅是一部分的图:

什么是分布式锁~什么是setnx?什么是Redlock?什么是Redisson?

这个特性列表可以说是太多了,是不是还看到了一些JUC包下面的类名,redisson帮我们搞了分布式的版本,比如AtomicLong,直接用RedissonAtomicLong就行了,连类名都不用去新记,很人性化了。

锁只是它的冰山一角,并且从它的wiki页面看到,对主从,哨兵,集群等模式都支持,当然了,单节点模式肯定是支持的。

本文还是以锁为主,其他的不过多介绍。

Redisson普通的锁实现源码主要是RedissonLock这个类,还没有看过它源码的盆友,不妨去瞧一瞧。源码中加锁/释放锁操作都是用lua脚本完成的,封装的非常完善,开箱即用。

4 RedLock

redLock的中文是直译过来的,就叫红锁。

红锁并非是一个工具,而是redis官方提出的一种分布式锁的算法。就在刚刚介绍完的redisson中,就实现了redLock版本的锁。也就是说除了getLock方法,还有getRedLock方法。

大概画一下对红锁的理解:

什么是分布式锁~什么是setnx?什么是Redlock?什么是Redisson?

如果你不熟悉redis高可用部署,那么没关系。redLock算法虽然是需要多个实例,但是这些实例都是独自部署的,没有主从关系。

RedLock作者指出,之所以要用独立的,是避免了redis异步复制造成的锁丢失,比如:主节点没来的及把刚刚set进来这条数据给从节点,就挂了。

有些人是不是觉得大佬们都是杠精啊,天天就想着极端情况。 其实高可用嘛,拼的就是99.999...% 中小数点后面的位数。

回到上面那张简陋的图片,红锁算法认为,只要(N/2) + 1个节点加锁成功,那么就认为获取了锁, 解锁时将所有实例解锁。 流程为:

  1. 顺序向五个节点请求加锁
  2. 根据一定的超时时间来推断是不是跳过该节点
  3. 三个节点加锁成功并且花费时间小于锁的有效期
  4. 认定加锁成功

也就是说,假设锁30秒过期,三个节点加锁花了31秒,自然是加锁失败了。

现在这个官网已经不推荐使用了,如果要使用多机案例的话官方推荐使用 MultiLock多重锁。

什么是分布式锁~什么是setnx?什么是Redlock?什么是Redisson?

5 redisson看门狗机制

5.1 官网解释

Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

5.2 看门狗开启条件

我们可以看到,leaseTime != -1时,只执行tryLockInnerAsync方法,其它情况会执行下面的代码,而leaseTime 就是我们调用lock(10, TimeUnit.SECONDS);方法传入的时间参数。

由此可知:redisson如果只是用lock.lock(); 不传过期时间的话,会启动看门狗机制,传过期时间的话,就不会启动看门狗机制。

// org.redisson.RedissonLock#tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // future模式,抢到锁之后开启看门狗
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId); // 开启看门狗
        }
    });
    return ttlRemainingFuture;
}
           

5.3 看门狗如何开启的

以下代码就是开启看门狗的方法,我们可以看到,启动了一个TimerTask进行倒计时,默认倒计时时间为internalLockLeaseTime / 3,也就是默认的10秒钟(默认过期时间是30秒)。

// org.redisson.RedissonLock#renewExpiration
private void renewExpiration() {
     ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
     if (ee == null) {
         return;
     }
     
     Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
         @Override
         public void run(Timeout timeout) throws Exception {
             ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
             if (ent == null) {
                 return;
             }
             Long threadId = ent.getFirstThreadId();
             if (threadId == null) {
                 return;
             }
             // 关键方法,使用lua脚本刷新过期时间
             RFuture<Boolean> future = renewExpirationAsync(threadId);
             future.onComplete((res, e) -> {
                 if (e != null) {
                     log.error("Can't update lock " + getName() + " expiration", e);
                     return;
                 }
                 
                 if (res) {
                     // reschedule itself
                     renewExpiration(); // 不断地调用自己,刷新过期时间
                 }
             });
         }
     }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
     
     ee.setTimeout(task);
 }
           

5.4 看门狗的性能问题

很多小伙伴都认为看门狗是非常消耗性能的,其实性能的确是会有一些消耗,但是没有很多。

前几天有个小伙伴抛出了一个疑问:假如说每个线程都启动一个TimerTask来不断刷新过期时间,岂不是服务器很快就“炸了”?

看门狗机制主要是用于业务代码执行时间忽长忽短的,如果一个业务代码,我们确定它在10秒钟之内就会执行完毕,完全可以取消这个看门狗机制,来提升一部分性能。

举例来说

String lockKey = "product_001";//商品id作为锁
RLock lock = redisson.getLock(lockKey);
try{
    System.out.println("准备尝试获取锁");
    lock.lock(30, TimeUnit.SECONDS);

    System.out.println("进来了,开始停8秒");

    Thread.sleep(8000);
    //这里写逻辑
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    lock.unlock();
}
           

以上是一个redisson锁的实现逻辑。我们可以看到,锁住的key是商品id。假如说product_001商品被锁住了,在同一时间访问product_001商品的线程有且只能最多有一个,所以redisson看门狗在product_001商品上有且最多有一个。

但是商品并不只有product_001,可能有成百上千个商品,此时如果有大批量客户访问这成百上千个商品的话,那么就会生成成百上千个看门狗!这是个很恐怖的事情。

所以,看门狗能不用还是不用了。

思考:是否可以考虑用list或者zset来维护要续期的分布式锁,来降低创建的线程过多带来的性能消耗

继续阅读