普通實作
說道Redis分布式鎖大部分人都會想到:
setnx+lua
,或者知道
setkey value px milliseconds nx
。後一種方式的核心實作指令如下:
-
- 擷取鎖(unique_value可以是UUID等)
-
SET resource_name unique_value NX PX 30000
-
- 釋放鎖(lua腳本中,一定要比較value,防止誤解鎖)
-
if redis.call("get",KEYS[1]) == ARGV[1] then
-
return redis.call("del",KEYS[1])
-
else
-
return 0
-
end
這種實作方式有3大要點(也是面試機率非常高的地方):
- set指令要用
;setkey value px milliseconds nx
- value要具有唯一性;
- 釋放鎖時要驗證value值,不能誤解鎖;
事實上這類瑣最大的缺點就是它加鎖時隻作用在一個Redis節點上,即使Redis通過sentinel保證高可用,如果這個master節點由于某些原因發生了主從切換,那麼就會出現鎖丢失的情況:
- 在Redis的master節點上拿到了鎖;
- 但是這個加鎖的key還沒有同步到slave節點;
- master故障,發生故障轉移,slave節點更新為master節點;
- 導緻鎖丢失。
正因為如此,Redis作者antirez基于分布式環境下提出了一種更進階的分布式鎖的實作方式:Redlock。筆者認為,Redlock也是Redis所有分布式鎖實作方式中唯一能讓面試官高潮的方式。
Redlock實作
antirez提出的redlock算法大概是這樣的:
在Redis的分布式環境中,我們假設有N個Redis master。這些節點完全互相獨立,不存在主從複制或者其他叢集協調機制。我們確定将在N個執行個體上使用與在Redis單執行個體下相同方法擷取和釋放鎖。現在我們假設有5個Redis master節點,同時我們需要在5台伺服器上面運作這些Redis執行個體,這樣保證他們不會同時都宕掉。
為了取到鎖,用戶端應該執行以下操作:
- 擷取目前Unix時間,以毫秒為機關。
- 依次嘗試從5個執行個體,使用相同的key和具有唯一性的value(例如UUID)擷取鎖。當向Redis請求擷取鎖時,用戶端應該設定一個網絡連接配接和響應逾時時間,這個逾時時間應該小于鎖的失效時間。例如你的鎖自動失效時間為10秒,則逾時時間應該在5-50毫秒之間。這樣可以避免伺服器端Redis已經挂掉的情況下,用戶端還在死死地等待響應結果。如果伺服器端沒有在規定時間内響應,用戶端應該盡快嘗試去另外一個Redis執行個體請求擷取鎖。
- 用戶端使用目前時間減去開始擷取鎖時間(步驟1記錄的時間)就得到擷取鎖使用的時間。當且僅當從大多數(N/2+1,這裡是3個節點)的Redis節點都取到鎖,并且使用的時間小于鎖失效時間時,鎖才算擷取成功。
- 如果取到了鎖,key的真正有效時間等于有效時間減去擷取鎖所使用的時間(步驟3計算的結果)。
- 如果因為某些原因,擷取鎖失敗(沒有在至少N/2+1個Redis執行個體取到鎖或者取鎖時間已經超過了有效時間),用戶端應該在所有的Redis執行個體上進行解鎖(即便某些Redis執行個體根本就沒有加鎖成功,防止某些節點擷取到鎖但是用戶端沒有得到響應而導緻接下來的一段時間不能被重新擷取鎖)。
Redlock源碼
redisson已經有對redlock算法封裝,接下來對其用法進行簡單介紹,并對核心源碼進行分析(假設5個redis執行個體)。
POM依賴
-
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
-
<dependency>
-
<groupId>org.redisson</groupId>
-
<artifactId>redisson</artifactId>
-
<version>3.3.2</version>
-
</dependency>
用法
首先,我們來看一下redission封裝的redlock算法實作的分布式鎖用法,非常簡單,跟重入鎖(ReentrantLock)有點類似:
-
Config config = new Config();
-
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
-
.setMasterName("masterName")
-
.setPassword("password").setDatabase(0);
-
RedissonClient redissonClient = Redisson.create(config);
-
// 還可以getFairLock(), getReadWriteLock()
-
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
-
boolean isLock;
-
try {
-
isLock = redLock.tryLock();
-
// 500ms拿不到鎖, 就認為擷取鎖失敗。10000ms即10s是鎖失效時間。
-
isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
-
if (isLock) {
-
//TODO if get lock success, do something;
-
}
-
} catch (Exception e) {
-
} finally {
-
// 無論如何, 最後都要解鎖
-
redLock.unlock();
-
}
唯一ID
實作分布式鎖的一個非常重要的點就是set的value要具有唯一性,redisson的value是怎樣保證value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock("REDLOCK_KEY"),源碼在Redisson.java和RedissonLock.java中:
-
protected final UUID id = UUID.randomUUID();
-
String getLockName(long threadId) {
-
return id + ":" + threadId;
-
}
擷取鎖
擷取鎖的代碼為redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),兩者的最終核心源碼都是下面這段代碼,隻不過前者擷取鎖的預設租約時間(leaseTime)是LOCKEXPIRATIONINTERVAL_SECONDS,即30s:
-
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
-
internalLockLeaseTime = unit.toMillis(leaseTime);
-
// 擷取鎖時向5個redis執行個體發送的指令
-
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
-
// 首先分布式鎖的KEY不能存在,如果确實不存在,那麼執行hset指令(hset REDLOCK_KEY uuid+threadId 1),并通過pexpire設定失效時間(也是鎖的租約時間)
-
"if (redis.call('exists', KEYS[1]) == 0) then " +
-
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
-
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
-
"return nil; " +
-
"end; " +
-
// 如果分布式鎖的KEY已經存在,并且value也比對,表示是目前線程持有的鎖,那麼重入次數加1,并且設定失效時間
-
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
-
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
-
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
-
"return nil; " +
-
"end; " +
-
// 擷取分布式鎖的KEY的失效時間毫秒數
-
"return redis.call('pttl', KEYS[1]);",
-
// 這三個參數分别對應KEYS[1],ARGV[1]和ARGV[2]
-
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
-
}
擷取鎖的指令中,
- KEYS[1] 就是Collections.singletonList(getName()),表示分布式鎖的key,即REDLOCK_KEY;
- ARGV[1] 就是internalLockLeaseTime,即鎖的租約時間,預設30s;
- ARGV[2] 就是getLockName(threadId),是擷取鎖時set的唯一值,即UUID+threadId:
釋放鎖
釋放鎖的代碼為redLock.unlock(),核心源碼如下:
-
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
-
// 向5個redis執行個體都執行如下指令
-
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
-
// 如果分布式鎖KEY不存在,那麼向channel釋出一條消息
-
"if (redis.call('exists', KEYS[1]) == 0) then " +
-
"redis.call('publish', KEYS[2], ARGV[1]); " +
-
"return 1; " +
-
"end;" +
-
// 如果分布式鎖存在,但是value不比對,表示鎖已經被占用,那麼直接傳回
-
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
-
"return nil;" +
-
"end; " +
-
// 如果就是目前線程占有分布式鎖,那麼将重入次數減1
-
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
-
// 重入次數減1後的值如果大于0,表示分布式鎖有重入過,那麼隻設定失效時間,還不能删除
-
"if (counter > 0) then " +
-
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
-
"return 0; " +
-
"else " +
-
// 重入次數減1後的值如果為0,表示分布式鎖隻擷取過1次,那麼删除這個KEY,并釋出解鎖消息
-
"redis.call('del', KEYS[1]); " +
-
"redis.call('publish', KEYS[2], ARGV[1]); " +
-
"return 1; "+
-
"end; " +
-
"return nil;",
-
// 這5個參數分别對應KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
-
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
-
}
參考:https://redis.io/topics/distlock
原文釋出時間為: 2018-12-02
本文作者:阿飛的部落格
本文來自雲栖社群合作夥伴“
Java技術驿站”,了解相關資訊可以關注“
”。