天天看點

Redisson可重入與鎖續期源碼分析

一、前言

在跨程序的前提下通路某個共享資源時,需要使用到分布式鎖來保證同一時間隻有一個程序能夠操作共享資源。

這個時候,鎖對象需要從單個JVM記憶體中遷移到某個多程序共用的中間件上,例如MySQL、Redis或ZK上。

我們常常選擇Redis來實作分布式鎖,這裡面有很多的坑,詳情可以參考我的這篇文章​​我用了上萬字,走了一遍Redis實作分布式鎖的坎坷之路,從單機到主從再到多執行個體,原來會發生這麼多的問題​​

Redisson是一個可以在java項目中使用的Redis用戶端,其屏蔽了原子性、可重入、鎖續期的諸多細節,内部實作各種各樣的鎖。

例如可重入鎖、公平鎖、MultiLock與Red Lock與讀寫鎖等,今天主要分析可重入鎖與鎖續期的源碼。

二、準備工作

引入redisson的依賴包:

<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.16.6</version>
        </dependency>      

使用單節點配置:

redisson:
  singleServerConfig:
    address: 127:0:0:1:6379      

使用以下指令來啟動一個redis容器:

docker run --name redis -p 6379:6379 -d redis      

測試代碼:

@Resource
    RedissonClient redissonClient;

    public void lock() {
        RLock lock = redissonClient.getLock("SunAlwaysOnline");
        lock.lock();
        try {
            //模拟業務耗時
            Thread.sleep(60000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }      

三、可重入鎖分析

getLock方法,構造了一個RedissonLock對象

public RLock getLock(String name) {
        return new RedissonLock(commandExecutor, name);
    }      

RedissonLock構造方法:

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        //指令執行器
        this.commandExecutor = commandExecutor;
        //初次生成鎖時,指定的過期時間,預設是30秒,用于避免程式當機而導緻鎖無法釋放
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        //用于釋出訂閱
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }      

進入到lock方法中:

public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }      

調用的是本類中的lock方法,leaseTime=-1,代表沒有指定過期時間。

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //擷取線程id
        long threadId = Thread.currentThread().getId();
        //如果能擷取到鎖,則剩餘存活時間ttl為空
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        if (ttl == null) {
            return;
        }

        //如果擷取不到鎖,則訂閱該線程釋放鎖的消息
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            //可中斷式同步訂閱
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            //由于傳入的是false,是以會走該方法
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {
                //第一次擷取鎖失敗後,後續進行自旋
                ttl = tryAcquire(-1, leaseTime, unit, threadId); 
                if (ttl == null) {
                    //擷取到則結束死循環
                    break;
                }

                //阻塞等待釋放鎖的消息
                if (ttl >= 0) {
                    try {
                        //在ttl時間内阻塞擷取鎖,内部是靠Semaphore來實作阻塞的
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    //ttl小于0,繼續阻塞等待
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            //此時已經擷取到鎖,或者出現中斷異常,則取消訂閱
            unsubscribe(future, threadId);
        }
    }      

進入核心的tryAcquire方法:

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //get會進行同步等待,類似Future.get
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }      

tryAcquireAsync方法:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1) {
            //指定鎖的過期時間
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            //由于leaseTime=-1,是以走該方法去異步擷取鎖
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        //ttlRemainingFuture有結果後,會執行該方法,内部借用semaphore來實作
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            //ttlRemaining為null,代表擷取到鎖
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    //進行鎖的續期
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }      

tryLockInnerAsync方法内部:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }      

這裡使用具有原子性的lua腳本并通過Netty進行網絡傳輸,詳細看下這段腳本

(這裡插一句,為什麼lua腳本具有原子性?因為在執行該腳本時,會排斥其他lua腳本和指令,不過lua腳本無法保證事務性。)

方法參數

  • waitTime,值為-1,代表未指定過期時間
  • leaseTime,值為30秒
  • unit,時間機關,毫秒
  • threadId,線程ID
  • command,redis指令類型,此處是eval

腳本參數

  • KEYS[1],getRawName(),鎖的key,這裡就是SunAlwaysOnline
  • ARGV[1],unit.toMillis(leaseTime),鎖的過期時間,這裡是30*1000毫秒
  • ARGV[2],getLockName(threadId),UUID+線程id

腳本含義

//如果key不存在
if (redis.call('exists', KEYS[1]) == 0) then 
    //建立一個hash結構,該key的字段值被初始化為1
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    //設定過期時間
    redis.call('pexpire', KEYS[1], ARGV[1]);
    //傳回null
    return nil; 
end; 
//如果key存在,且目前線程持有該鎖
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    //字段值加1,對同一線程的加鎖統計,以此來實作可重入
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    //重新整理過期時間
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    //傳回null
    return nil; 
end; 
//鎖被其他線程占用,傳回剩餘過期時間
return redis.call('pttl', KEYS[1]);      

加鎖後,可以使用hgetall SunAlwaysOnline檢視申請到的鎖

127.0.0.1:6379> hgetall SunAlwaysOnline
1) "2f160d0b-3112-4c78-a1b3-2d7f123ce216:78"
2) "1"      

在以SunAlwaysOnline為key哈希結構中,2f160d0b-3112-4c78-a1b3-2d7f123ce216:78是其中的一個字段,其值為1.

2f160d0b-3112-4c78-a1b3-2d7f123ce216是UUID,78為線程id。使用UUID+threadId的格式,可以來區分不同機器上出現相同線程id的情況。

繼續看解鎖的邏輯:

public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
    }      

調用的是RedissonBaseLock類中的unlockAsync方法:

public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<>();
        //異步解鎖
        RFuture<Boolean> future = unlockInnerAsync(threadId);
        //解鎖完成後的回調
        future.onComplete((opStatus, e) -> {
            //取消對鎖的續期
            cancelExpirationRenewal(threadId);

            if (e != null) {
                result.tryFailure(e);
                return;
            }
            
            if (opStatus == null) {
                //該線程嘗試去釋放别的線程加的鎖,是以抛出異常
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }

            result.trySuccess(null);
        });

        return result;
    }      

進入到核心的unlockInnerAsync方法中:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }      

依然是一段lua腳本,在這裡解釋下:

腳本參數

  • KEYS[1],getRawName(),鎖的key,即SunAlwaysOnline
  • KEYS[2],getChannelName(),channel名稱,内容為redisson_lock__channel:{SunAlwaysOnline}
  • ARGV[1],LockPubSub.UNLOCK_MESSAGE,解鎖的消息,值為0
  • ARGV[2],internalLockLeaseTime,預設30秒
  • ARGV[3],getLockName(threadId),UUID+線程id

腳本含義

//如果解鎖線程和加鎖線程不是一個線程時,直接傳回null
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
//使用hincrby使得字段值減1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
//如果剩餘解鎖次數大于0
if (counter > 0) then
    //重新整理過期時間,傳回0
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    //剩餘次數為0,可以直接釋放鎖
    redis.call('del', KEYS[1]);
    //往指定channel中釋出鎖被釋放的消息,并傳回1
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
return nil      

Redisson通過建立一個hash結構,其中有一個字段為UUID+線程ID,字段值表示鎖的重入次數,以此來實作可重入鎖。

如果沒有指定鎖的過期時間,那麼鎖續期是怎麼做的呢?

四、鎖續期分析

上文我們在分析可重入鎖加鎖時,其tryAcquireAsync方法中就表明了鎖續期的入口,即scheduleExpirationRenewal方法。

在未指定鎖的過期時間時,才會進行對鎖的續期。

protected void scheduleExpirationRenewal(long threadId) {
        //續期對象,記錄線程的重入次數,同時還會把續期任務存入timeout中
        ExpirationEntry entry = new ExpirationEntry();
        //getEntryName為UUID+key,存入緩存,下次續期直接使用
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            //進行線程的可重入
            oldEntry.addThreadId(threadId);
        } else {
            //第一次加鎖
            entry.addThreadId(threadId);
            try {
                //開啟續期
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    //如果目前線程被中斷,取消目前線程對鎖的續期。
                    //在續期對象中,如果沒有可用于續期的線程id,則取消整個續期任務
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }      

這裡面其實隻調用了renewExpiration方法:

private void renewExpiration() {
        //擷取續期對象
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        //啟動一個延時任務,内部依靠的是Netty的時間輪算法
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                //擷取不到,說明已經結束續期
                if (ent == null) {
                    return;
                }
                //擷取第一個可用的線程id
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                //異步使用lua腳本進行續期
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                //執行完成後,觸發該回調
                future.onComplete((res, e) -> {
                    if (e != null) {
                        //續期出現異常後,則取消之後的所有續期,并終止該任務
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    
                    if (res) {
                        //續期成功,遞歸調用本方法進行下次續期
                        renewExpiration();
                    } else {
                        //說明鎖已經被主動釋放,取消後續的所有續期
                        cancelExpirationRenewal(null);
                    }
                });
            }
        //internalLockLeaseTime 預設是30秒,是以這裡每10秒進行一次續期
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        //在續期對象中儲存該任務
        ee.setTimeout(task);
    }      

在續期成功的情況下,會不斷進行遞歸調用,進而開啟下一輪任務。

當然不可能出現無限遞歸的情況,每次遞歸前都會先從緩存中擷取續期對象。

如果續期對象不存在,或内部不存在任何可用的線程id,以及續期失敗後,都會直接結束續期任務。

接着看renewExpirationAsync方法:

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }      

如果key存在且字段值是指定線程的情況下,重新整理鎖的過期時間,傳回true。否則,傳回false。

當然,這裡的鎖續期也叫做“Watch Dog”,即看門狗。下次面試官問到看門狗,别傻乎乎地不知道了。

那麼看門狗這種奇怪的名字,到底是怎麼來的呢?

源于internalLockLeaseTime字段,它是這樣指派的:

this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();      

五、後語

Redisson是使用hash結構來實作分布式鎖的,将UUID+線程ID作為其中一個字段,重入次數作為value。

在某個線程擷取鎖失敗後,會訂閱鎖的釋放消息,接着進行自旋的阻塞式擷取鎖。

内部封裝了lua腳本,來實作指令的原子性,可以避免以下問題:

  • 建立鎖與設定過期時間的原子性,防止建立完鎖,用戶端當機,導緻鎖永遠無法釋放
  • 檢驗鎖與删除鎖的原子性,防止檢驗通過後,直接删除其他用戶端剛申請的鎖

Redisson的看門狗機制,有效地解決了通過經驗指定過期時間導緻鎖提前被釋放的難題。

繼續閱讀