前言
在項目中我們經常有需要使用分布式鎖的場景,而Redis是實作分布式鎖最常見的一種方式,這篇文章主要是使用Go+Redis實作互斥鎖和紅鎖。
下面的代碼使用go-redis用戶端和gofakeit]庫。
互斥鎖
Redis裡有一個
設定如果不存在
的指令,我們可以通過這個指令來實作互斥鎖功能,在Redis官方文檔裡面推薦的标準實作方式是
SET resource_name my_random_value NX PX 30000
這串指令,其中:
-
表示要鎖定的資源resource_name
-
表示如果不存在則設定NX
-
表示過期時間為30000毫秒,也就是30秒PX 30000
-
這個值在所有的用戶端必須是唯一的,所有同一key的鎖競争者這個值都不能一樣。my_random_value
值必須是随機數主要是為了更安全的釋放鎖,釋放鎖的時候使用腳本告訴Redis:隻有key存在并且存儲的值和我指定的值一樣才能告訴我删除成功,避免錯誤釋放别的競争者的鎖。
由于涉及到兩個操作,是以我們需要通過Lua腳本保證操作的原子性:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
舉個不用Lua腳本的例子:用戶端A取得資源鎖,但是緊接着被一個其他操作阻塞了,當用戶端A運作完畢其他操作後要釋放鎖時,原來的鎖早已逾時并且被Redis自動釋放,并且在這期間資源鎖又被用戶端B再次擷取到。
因為判斷和删除是兩個操作,是以有可能A剛判斷完鎖就過期自動釋放了,然後B就擷取到了鎖,然後A又調用了Del,導緻把B的鎖給釋放了。
TryLock和Unlock實作
TryLock
其實就是使用
SET resource_name my_random_value NX PX 30000
加鎖,這裡使用
UUID
作為随機值,并且在加鎖成功時把随機值傳回,這個随機值會在
Unlock
時使用;
Unlock
解鎖邏輯就是執行前面說到的
lua腳本
。
func (l *Lock) TryLock(ctx context.Context) error {
success, err := l.client.SetNX(ctx, l.resource, l.randomValue, ttl).Result()
if err != nil {
return err
}
// 加鎖失敗
if !success {
return ErrLockFailed
}
// 加鎖成功
l.randomValue = randomValue
return nil
}
func (l *Lock) Unlock(ctx context.Context) error {
return l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err()
}
Lock實作
Lock
是阻塞的擷取鎖,是以在加鎖失敗的時候,需要重試。當然也可能出現其他異常情況(比如網絡問題,請求逾時等),這些情況則直接傳回
error
。
步驟如下:
- 嘗試加鎖,加鎖成功直接傳回
- 加鎖失敗則不斷循環嘗試加鎖直到成功或出現異常情況
func (l *Lock) Lock(ctx context.Context) error {
// 嘗試加鎖
err := l.TryLock(ctx)
if err == nil {
return nil
}
if !errors.Is(err, ErrLockFailed) {
return err
}
// 加鎖失敗,不斷嘗試
ticker := time.NewTicker(l.tryLockInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
// 逾時
return ErrTimeout
case <-ticker.C:
// 重新嘗試加鎖
err := l.TryLock(ctx)
if err == nil {
return nil
}
if !errors.Is(err, ErrLockFailed) {
return err
}
}
}
}
實作看門狗機制
我們前面的例子中提到的互斥鎖有一個小問題,就是如果持有鎖用戶端A被阻塞,那麼A的鎖可能會逾時被自動釋放,導緻用戶端B提前擷取到鎖。
為了減少這種情況的發生,我們可以在A持有鎖期間,不斷地延長鎖的過期時間,減少用戶端B提前擷取到鎖的情況,這就是看門狗機制。
當然,這沒辦法完全避免上述情況的發生,因為如果用戶端A擷取鎖之後,剛好與Redis的連接配接關閉了,這時候也就沒辦法延長逾時時間了。
看門狗實作
加鎖成功時啟動一個線程,不斷地延長鎖地過期時間;在Unlock時關閉看門狗線程。
看門狗流程如下:
- 加鎖成功,啟動看門狗
- 看門狗線程不斷延長鎖的過程時間
- 解鎖,關閉看門狗
func (l *Lock) startWatchDog() {
ticker := time.NewTicker(l.ttl / 3)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 延長鎖的過期時間
ctx, cancel := context.WithTimeout(context.Background(), l.ttl/3*2)
ok, err := l.client.Expire(ctx, l.resource, l.ttl).Result()
cancel()
// 異常或鎖已經不存在則不再續期
if err != nil || !ok {
return
}
case <-l.watchDog:
// 已經解鎖
return
}
}
}
TryLock:啟動看門狗
func (l *Lock) TryLock(ctx context.Context) error {
success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result()
if err != nil {
return err
}
// 加鎖失敗
if !success {
return ErrLockFailed
}
// 加鎖成功,啟動看門狗
go l.startWatchDog()
return nil
}
Unlock:關閉看門狗
func (l *Lock) Unlock(ctx context.Context) error {
err := l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err()
// 關閉看門狗
close(l.watchDog)
return err
}
紅鎖
由于上面的實作是基于單Redis執行個體,如果這個唯一的執行個體挂了,那麼所有請求都會因為拿不到鎖而失敗,為了提高容錯性,我們可以使用多個分布在不同機器上的Redis執行個體,并且隻要拿到其中大多數節點的鎖就能加鎖成功,這就是紅鎖算法。它其實也是基于上面的單執行個體算法的,隻是我們需要同時對多個Redis執行個體擷取鎖。
加鎖實作
在加鎖邏輯裡,我們主要是對每個Redis執行個體執行
SET resource_name my_random_value NX PX 30000
擷取鎖,然後把成功擷取鎖的用戶端放到一個
channel
裡(這裡因為是多線程并發擷取鎖,使用slice可能有并發問題),同時使用
sync.WaitGroup
等待所有擷取鎖操作結束。
然後判斷成功擷取到的鎖的數量是否大于一半,如果沒有得到一半以上的鎖,說明加鎖失敗,釋放已經獲得的鎖。
func (l *RedLock) TryLock(ctx context.Context) error {
randomValue := gofakeit.UUID()
var wg sync.WaitGroup
wg.Add(len(l.clients))
// 成功獲得鎖的Redis執行個體的用戶端
successClients := make(chan *redis.Client, len(l.clients))
for _, client := range l.clients {
go func(client *redis.Client) {
defer wg.Done()
success, err := client.SetNX(ctx, l.resource, randomValue, ttl).Result()
if err != nil {
return
}
// 加鎖失敗
if !success {
return
}
// 加鎖成功,啟動看門狗
go l.startWatchDog()
successClients <- client
}(client)
}
// 等待所有擷取鎖操作完成
wg.Wait()
close(successClients)
// 如果成功加鎖得用戶端少于用戶端數量的一半+1,表示加鎖失敗
if len(successClients) < len(l.clients)/2+1 {
// 就算加鎖失敗,也要把已經獲得的鎖給釋放掉
for client := range successClients {
go func(client *redis.Client) {
ctx, cancel := context.WithTimeout(context.Background(), ttl)
l.script.Run(ctx, client, []string{l.resource}, randomValue)
cancel()
}(client)
}
return ErrLockFailed
}
// 加鎖成功,啟動看門狗
l.randomValue = randomValue
l.successClients = nil
for successClient := range successClients {
l.successClients = append(l.successClients, successClient)
}
return nil
}
看門狗實作
func (l *RedLock) startWatchDog() {
l.watchDog = make(chan struct{})
ticker := time.NewTicker(resetTTLInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 延長鎖的過期時間
for _, client := range l.successClients {
go func(client *redis.Client) {
ctx, cancel := context.WithTimeout(context.Background(), ttl-resetTTLInterval)
client.Expire(ctx, l.resource, ttl)
cancel()
}(client)
}
case <-l.watchDog:
// 已經解鎖
return
}
}
}
解鎖實作
func (l *RedLock) Unlock(ctx context.Context) error {
for _, client := range l.successClients {
go func(client *redis.Client) {
l.script.Run(ctx, client, []string{l.resource}, l.randomValue)
}(client)
}
// 關閉看門狗
close(l.watchDog)
return nil
}