天天看點

Go+Redis實作分布式互斥鎖和紅鎖

前言

在項目中我們經常有需要使用分布式鎖的場景,而Redis是實作分布式鎖最常見的一種方式,這篇文章主要是使用Go+Redis實作互斥鎖和紅鎖。

下面的代碼使用go-redis用戶端和gofakeit]庫。

互斥鎖

Redis裡有一個​

​設定如果不存在​

​的指令,我們可以通過這個指令來實作互斥鎖功能,在Redis官方文檔裡面推薦的标準實作方式是​

​SET resource_name my_random_value NX PX 30000​

​這串指令,其中:

  • ​resource_name​

    ​表示要鎖定的資源
  • ​NX​

    ​表示如果不存在則設定
  • ​PX 30000​

    ​表示過期時間為30000毫秒,也就是30秒
  • ​my_random_value​

    ​這個值在所有的用戶端必須是唯一的,所有同一key的鎖競争者這個值都不能一樣。

值必須是随機數主要是為了更安全的釋放鎖,釋放鎖的時候使用腳本告訴Redis:隻有key存在并且存儲的值和我指定的值一樣才能告訴我删除成功,避免錯誤釋放别的競争者的鎖。

由于涉及到兩個操作,是以我們需要通過Lua腳本保證操作的原子性:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end      

舉個不用Lua腳本的例子:用戶端A取得資源鎖,但是緊接着被一個其他操作阻塞了,當用戶端A運作完畢其他操作後要釋放鎖時,原來的鎖早已逾時并且被Redis自動釋放,并且在這期間資源鎖又被用戶端B再次擷取到。

因為判斷和删除是兩個操作,是以有可能A剛判斷完鎖就過期自動釋放了,然後B就擷取到了鎖,然後A又調用了Del,導緻把B的鎖給釋放了。

TryLock和Unlock實作

​TryLock​

​其實就是使用​

​SET resource_name my_random_value NX PX 30000​

​加鎖,這裡使用​

​UUID​

​作為随機值,并且在加鎖成功時把随機值傳回,這個随機值會在​

​Unlock​

​時使用;

​Unlock​

​解鎖邏輯就是執行前面說到的​

​lua腳本​

​。

func (l *Lock) TryLock(ctx context.Context) error {
   success, err := l.client.SetNX(ctx, l.resource, l.randomValue, ttl).Result()
   if err != nil {
      return err
   }
   // 加鎖失敗
   if !success {
      return ErrLockFailed
   }
   // 加鎖成功
   l.randomValue = randomValue
   return nil
}

func (l *Lock) Unlock(ctx context.Context) error {
   return l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err()
}      

Lock實作

​Lock​

​是阻塞的擷取鎖,是以在加鎖失敗的時候,需要重試。當然也可能出現其他異常情況(比如網絡問題,請求逾時等),這些情況則直接傳回​

​error​

​。

步驟如下:

  • 嘗試加鎖,加鎖成功直接傳回
  • 加鎖失敗則不斷循環嘗試加鎖直到成功或出現異常情況
func (l *Lock) Lock(ctx context.Context) error {
  // 嘗試加鎖
  err := l.TryLock(ctx)
  if err == nil {
    return nil
  }
  if !errors.Is(err, ErrLockFailed) {
    return err
  }
  // 加鎖失敗,不斷嘗試
  ticker := time.NewTicker(l.tryLockInterval)
  defer ticker.Stop()
  for {
    select {
    case <-ctx.Done():
      // 逾時
      return ErrTimeout
    case <-ticker.C:
      // 重新嘗試加鎖
      err := l.TryLock(ctx)
      if err == nil {
        return nil
      }
      if !errors.Is(err, ErrLockFailed) {
        return err
      }
    }
  }
}      

實作看門狗機制

我們前面的例子中提到的互斥鎖有一個小問題,就是如果持有鎖用戶端A被阻塞,那麼A的鎖可能會逾時被自動釋放,導緻用戶端B提前擷取到鎖。

為了減少這種情況的發生,我們可以在A持有鎖期間,不斷地延長鎖的過期時間,減少用戶端B提前擷取到鎖的情況,這就是看門狗機制。

當然,這沒辦法完全避免上述情況的發生,因為如果用戶端A擷取鎖之後,剛好與Redis的連接配接關閉了,這時候也就沒辦法延長逾時時間了。

看門狗實作

加鎖成功時啟動一個線程,不斷地延長鎖地過期時間;在Unlock時關閉看門狗線程。

看門狗流程如下:

  • 加鎖成功,啟動看門狗
  • 看門狗線程不斷延長鎖的過程時間
  • 解鎖,關閉看門狗
func (l *Lock) startWatchDog() {
  ticker := time.NewTicker(l.ttl / 3)
  defer ticker.Stop()
  for {
    select {
    case <-ticker.C:
      // 延長鎖的過期時間
      ctx, cancel := context.WithTimeout(context.Background(), l.ttl/3*2)
      ok, err := l.client.Expire(ctx, l.resource, l.ttl).Result()
      cancel()
      // 異常或鎖已經不存在則不再續期
      if err != nil || !ok {
        return
      }
    case <-l.watchDog:
      // 已經解鎖
      return
    }
  }
}      

TryLock:啟動看門狗

func (l *Lock) TryLock(ctx context.Context) error {
  success, err := l.client.SetNX(ctx, l.resource, l.randomValue, l.ttl).Result()
  if err != nil {
    return err
  }
  // 加鎖失敗
  if !success {
    return ErrLockFailed
  }
  // 加鎖成功,啟動看門狗
  go l.startWatchDog()
  return nil
}      

Unlock:關閉看門狗

func (l *Lock) Unlock(ctx context.Context) error {
  err := l.script.Run(ctx, l.client, []string{l.resource}, l.randomValue).Err()
  // 關閉看門狗
  close(l.watchDog)
  return err
}      

紅鎖

由于上面的實作是基于單Redis執行個體,如果這個唯一的執行個體挂了,那麼所有請求都會因為拿不到鎖而失敗,為了提高容錯性,我們可以使用多個分布在不同機器上的Redis執行個體,并且隻要拿到其中大多數節點的鎖就能加鎖成功,這就是紅鎖算法。它其實也是基于上面的單執行個體算法的,隻是我們需要同時對多個Redis執行個體擷取鎖。

加鎖實作

在加鎖邏輯裡,我們主要是對每個Redis執行個體執行​

​SET resource_name my_random_value NX PX 30000​

​擷取鎖,然後把成功擷取鎖的用戶端放到一個​

​channel​

​裡(這裡因為是多線程并發擷取鎖,使用slice可能有并發問題),同時使用​

​sync.WaitGroup​

​等待所有擷取鎖操作結束。

然後判斷成功擷取到的鎖的數量是否大于一半,如果沒有得到一半以上的鎖,說明加鎖失敗,釋放已經獲得的鎖。

func (l *RedLock) TryLock(ctx context.Context) error {
  randomValue := gofakeit.UUID()
  var wg sync.WaitGroup
  wg.Add(len(l.clients))
  // 成功獲得鎖的Redis執行個體的用戶端
  successClients := make(chan *redis.Client, len(l.clients))
  for _, client := range l.clients {
    go func(client *redis.Client) {
      defer wg.Done()
      success, err := client.SetNX(ctx, l.resource, randomValue, ttl).Result()
      if err != nil {
        return
      }
      // 加鎖失敗
      if !success {
        return
      }
      // 加鎖成功,啟動看門狗
      go l.startWatchDog()
      successClients <- client
    }(client)
  }
  // 等待所有擷取鎖操作完成
  wg.Wait()
  close(successClients)
  // 如果成功加鎖得用戶端少于用戶端數量的一半+1,表示加鎖失敗
  if len(successClients) < len(l.clients)/2+1 {
    // 就算加鎖失敗,也要把已經獲得的鎖給釋放掉
    for client := range successClients {
      go func(client *redis.Client) {
        ctx, cancel := context.WithTimeout(context.Background(), ttl)
        l.script.Run(ctx, client, []string{l.resource}, randomValue)
        cancel()
      }(client)
    }
    return ErrLockFailed
  }

  // 加鎖成功,啟動看門狗
  l.randomValue = randomValue
  l.successClients = nil
  for successClient := range successClients {
    l.successClients = append(l.successClients, successClient)
  }

  return nil
}      

看門狗實作

func (l *RedLock) startWatchDog() {
  l.watchDog = make(chan struct{})
  ticker := time.NewTicker(resetTTLInterval)
  defer ticker.Stop()
  for {
    select {
    case <-ticker.C:
      // 延長鎖的過期時間
      for _, client := range l.successClients {
        go func(client *redis.Client) {
          ctx, cancel := context.WithTimeout(context.Background(), ttl-resetTTLInterval)
          client.Expire(ctx, l.resource, ttl)
          cancel()
        }(client)
      }
    case <-l.watchDog:
      // 已經解鎖
      return
    }
  }
}      

解鎖實作

func (l *RedLock) Unlock(ctx context.Context) error {
   for _, client := range l.successClients {
      go func(client *redis.Client) {
         l.script.Run(ctx, client, []string{l.resource}, l.randomValue)
      }(client)
   }
   // 關閉看門狗
   close(l.watchDog)
   return nil
}      

繼續閱讀