手把手教你實作基于Redis的分布式鎖
-
概述
目前,分布式系統已經是各大公司的标配,它具有高可用、可擴充等特點。在分布式系統中,由于存在多台機器上的程序競争同一份資源的問題,是以需要分布式鎖來保證同步通路資源。
一個經典的場景就是淘寶雙11秒殺活動,全國人民的用戶端通路不同的後端伺服器,然後後端伺服器再通路資料庫,此時資料庫就是需要同步通路的資源。
在介紹基于Redis實作的分布式鎖之前;以Python語言為例,我們看看根據應用的實作架構,同步鎖可能會有以下幾種類型:
如果處理程式是單程序多線程的,在Python語言中,就可以使用 threading 子產品的 Lock 對象來限制對共享資源的同步通路,實作多線程安全。
單機多程序的情況,在Python語言中,可以使用 multiprocessing 的 Lock 對象來保證多程序安全。
多機多程序部署的情況,需要依賴一個第三方元件(存儲鎖對象)來實作一個分布式的同步鎖。
-
分布式鎖的必要條件
本文主要介紹第三種場景下基于Redis如何實作分布式鎖。現在我們來看看實作一個分布式鎖的必要條件有哪些?
原子性:加鎖和釋放鎖的操作必須滿足原子性
無死鎖:不會發生死鎖(PS:例如已獲得鎖的線程/程序在釋放鎖之前突然異常退出,導緻其他線程/程序會一直在循環等待鎖被釋放)
互斥性:同一個時刻隻能有一個線程/程序占有鎖,其他線程/程序必須等待直到鎖被釋放
可重入性:目前線程/程序獲得鎖之後,還可以繼續調用擷取鎖的操作,第二次以及之後的擷取鎖的操作不會被阻塞等待(PS:釋放鎖的操作也是一樣的,調用多次之後,隻有最後一次釋放鎖的時候才會真正地釋放鎖)--- 這個條件根據業務來決定是否需要實作
-
實作過程
根據分布式鎖的必要條件,下面将給出幾種實作方式,來觀察任意一個條件不滿足時,會出現什麼樣的問題?在實作的過程中将使用同一份測試用例。測試用例代碼如下:
test.py
'''
啟用多個線程對 redis 中的 test_key 的值進行自增操作,理想情況,test_key 的值應該等于線程的數量,比如開了 10 個線程,test_key的值最終應該是10。
def increase(redis, lock, key):
# 獲得鎖
lock_value = lock.get_lock(key)
value = redis.get(key)
# 模拟實際情況下進行的某些耗時操作
time.sleep(0.1)
value += 1
redis.set(key, value)
thread_name = threading.current_thread().name
# 列印線程名和最新的值
print thread_name, new_value
# 釋放鎖
lock.del_lock(key, lock_value)
連接配接服務端
redis = RedisCli(REDIS_CACHE_HOST_LIST, REDIS_CACHE_MASTER_NAME)
lock = RedisLock(redis)
key = 'test_key'
thread_count = 10
redis.delete(key)
for i in xrange(thread_count):
thread = threading.Thread(target=increase, args=(redis, lock, key))
thread.start()
Tips:
下面的代碼片段中隻展示需要修改的部分,其他部分和test.py保持一緻。
3.1 原子性
在這個版本中,當線程 A get(lock_key) 的值為空時,set lock_key 的值為 1,并傳回,這表示線程 A 獲得了鎖,可以繼續執行後面的操作,否則需要一直循環去擷取鎖,直到 key 的值再次為空,重新獲得鎖,執行任務完成後釋放鎖。
class RedisLock(object):
def __init__(self, rediscli):
self.rediscli = rediscli
def _get_lock_key(self, key):
lock_key = "lock_%s" % key
return lock_key
def get_lock(self, key):
lock_key = self._get_lock_key(key)
while True:
value = self.rediscli.get(lock_key)
if not value:
self.rediscli.set(lock_key, '1')
return True
time.sleep(0.01)
def del_lock(self, key, new_expire_time):
lock_key = self._get_lock_key(key)
return self.rediscli.delete(lock_key)
執行test.py測試腳本,得到的結果如下:
Thread-1 1
Thread-5 2
Thread-2 2
Thread-6 3
Thread-7 3
Thread-4 3
Thread-9 4
Thread-8 5
Thread-10 5
Thread-3 5
觀察輸出結果發現,同時有多個線程輸出的結果是一樣的。初看上面加鎖的代碼邏輯似乎沒什麼問題,但是最終的結果卻事與願違,原因是上面的代碼get(lock_key)和set(lock_key, '1')并不是原子性的執行,而是分開執行。A 線程在get(lock_key)的時候發現是空值,于是重新set(lock_key, '1'),但在get操作之後,set操作之前,B 線程恰好執行了get(lock_key),此時B 線程的get操作得到的還是空值,然後也順利獲得鎖,導緻資料被兩個或多個線程同時修改,最後出現不一緻。
3.2 無死鎖
由于3.1的版本是因為get_lock方法不是原子性操作,造成兩個或多個線程同時獲得鎖的問題,這個版本改成使用 redis 的 setnx 指令來進行鎖的查詢和設定操作,setnx 即 set if not exists,顧名思義就是當key不存在的時候才設定 value,并傳回 1,如果 key 已經存在,則不進行任何操作,傳回 0。
隻展示需要修改的部分,其他部分還是和3.1的代碼一樣
def get_lock(self, key):
lock_key = self._get_lock_key(key)
thread_name = threading.current_thread().name
while True:
value = self.rediscli.setnx(lock_key, 1)
if value:
return True
time.sleep(0.01)
print "{} waiting...".format(thread_name)
Thread-4 2
Thread-2 3
Thread-3 4
Thread-7 5
Thread-6 6
Thread-5 7
Thread-8 8
Thread-9 9
Thread-10 10
輸出結果是正确的,但是還有潛在的問題。比如假設 A 線程獲得了鎖後,由于某種異常原因導緻線程crash了,這個時候鎖将無法被釋放。稍微修改一下測試用例的 increase 函數,模拟某個線程在釋放鎖之前因為異常退出。
test-3-2.py
thread_name = threading.current_thread().name
lock_value = lock.get_lock(key)
value = redis.get(key)
if not value:
value = 0
# 模拟實際情況下進行的某些耗時操作
time.sleep(0.1)
value = int(value) + 1
redis.set(key, value)
print thread_name, value
# 模拟線程2異常退出
if thread_name == 'Thread-2':
print '{} crash...'.format(thread_name)
import sys
sys.exit(1)
lock.del_lock(key, lock_value)
執行test-3-2.py測試腳本,得到的結果如下:
Thread-2 crash...
Thread-7 waiting...
Thread-3 waiting...
Thread-5 waiting...
Thread-4 waiting...
Thread-9 waiting...
Thread-6 waiting...
Thread-10 waiting...
此時就會出現問題,當線程2 crash 之後,後續擷取鎖的線程一直擷取不了鎖,一直處于等待鎖的狀态,于是産生了死鎖。如果請求是多線程處理的,比如每來一個請求就開一個線程去處理,那麼堆積的線程會逐漸增多,最終可能會導緻系統崩潰。
當獲得鎖的線程異常退出後,無法主動釋放鎖,是以需要找到一種方式即使線程異常退出,線程占用的鎖也能夠被釋放,顯然我們需要一種被動釋放鎖的機制。從 redis 2.6.12 版本開始,set 指令就已經支援了 nx 和 expire 功能。改進代碼如下:
def get_lock(self, key, timeout=3):
lock_key = self._get_lock_key(key)
while True:
value = self.rediscli.set(lock_key, '1', nx=True, ex=timeout)
if value:
return True
time.sleep(0.01)
Thread-9 2
Thread-2 4
Thread-4 5
Thread-5 6
Thread-8 7
Thread-3 8
Thread-7 9
執行test-3-2.py測試腳本,模拟 線程2 crash,得到的結果如下:
Thread-10 3
Thread-7 4
Thread-8 6
Thread-3 7
Thread-9 8
Thread-6 9
Thread-5 10
從上面的運作結果來看,似乎已經解決了原子性和無死鎖的問題。那第三個條件互斥性是否滿足呢?正常情況下,3.2節的實作方式是滿足互斥性的,但是還有一種場景需要我們考慮:比如假設 A 線程的邏輯還沒處理完,但是鎖由于過期時間到了,導緻鎖自動被釋放掉,這時 B 線程獲得了鎖,開始處理 B 的邏輯,然後 A 程序的邏輯處理完了,B 線程還在進行中,就把 B 線程的鎖給删除了。通過修改一下測試用例,模拟一下這種場景。
thread_name = threading.current_thread().name
# 設定鎖的過期時間為2s
lock_value = lock.get_lock(key, thread_name, timeout=2)
value = redis.get(key)
if not value:
value = 0
# 模拟實際情況下進行的某些耗時操作, 且執行時間大于鎖過期的時間
time.sleep(2.5)
value = int(value) + 1
print thread_name, value
redis.set(key, value)
lock.del_lock(key, lock_value)
我們讓線程的執行時間大于鎖的過期時間,導緻鎖到期自動釋放。執行上面的測試腳本,得到的結果如下:
Thread-3 1
Thread-5 3
Thread-6 4
Thread-4 4
既然這種現象是由于鎖過期導緻誤删其他線程的鎖引發的,那我們就順着這個思路,強制線程隻能删除自己設定的鎖。如果是這樣,就需要為每個線程的鎖添加一個唯一辨別。在我們的分布式鎖實作機制中,我們每次添加鎖的時候,都是給 lock_key 設為 1,無論是 key 還是 value,都不具備唯一性,如果把 key 設為唯一的,那麼在分布式系統中需要産生 N (等于總線程數)個 key 了 ,從直覺性和維護性上來說,這都是不可取的。是以隻能将 value 設定為每個線程的唯一辨別。這個唯一辨別由線程 ID + 程序的 PID + 機器的 IP + 時間戳 + 叢集名稱組成,這樣就構成了一個線程鎖的唯一辨別。
3.3 互斥性
根據上一節最後的分析,我們設計出了基于Redis實作分布式鎖的最終版。
最終版
def __init__(self, rediscli):
self.rediscli = rediscli.master
# ip 在執行個體化的時候就擷取,避免過多通路DNS
self.ip = socket.gethostbyname(socket.gethostname())
self.pid = os.getpid()
self.cluster = "hna"
def _gen_lock_key(self, key):
lock_key = "lock_%s" % key
return lock_key
def _gen_unique_value(self):
thread_name = threading.current_thread().name
time_now = time.time()
unique_value = "{0}-{1}-{2}-{3}-{4}".format(self.ip, self.pid, thread_name, self.cluster, time_now)
return unique_value
def get_lock(self, key, timeout=3):
lock_key = self._gen_lock_key(key)
unique_value = self._gen_unique_value()
logger.info("unique value %s" % unique_value)
while True:
value = self.rediscli.set(lock_key, unique_value, nx=True, ex=timeout)
if value:
# 注意,我們傳回了唯一辨別,用于後面的delete時檢查是否是目前線程的鎖
return unique_value
# 進入阻塞狀态,避免一直消耗CPU
time.sleep(0.1)
def del_lock(self, key, value):
lock_key = self._gen_lock_key(key)
old_value = self.rediscli.get(lock_key)
# 檢查是否是目前線程持有的鎖
if old_value == value:
return self.rediscli.delete(lock_key)
Thread-5 4
Thread-3 6
Thread-9 7
Thread-6 8
Thread-8 9
Thread-7 10
修改test.py測試腳本,測試一下鎖過期。測試腳本如下:
test-3-3.py
thread_name = threading.current_thread().name
lock_value = lock.get_lock(key, timeout=1)
value = redis.get(key)
if not value:
value = 0
# 模拟實際情況下進行的某些耗時操作, 且執行時間大于鎖過期的時間
time.sleep(3)
value = int(value) + 1
print thread_name, value
redis.set(key, value)
lock.del_lock(key, lock_value)
執行test-3-3.py測試腳本,得到的結果如下:
Thread-2 1
Thread-5 1
Thread-6 2
Thread-8 2
Thread-10 2
Thread-9 3
Thread-3 3
從運作test-3-3.py測試腳本結果來看,問題沒有得到解決。這是為什麼呢?因為我們設定value的唯一性隻能確定線程不會誤删其他線程産生的鎖,不會出現一連串的誤删鎖的情況,比如 A 删了 B 的鎖,B 執行完删了 C 的鎖。使用 redis 的過期機制,隻要業務的處理時間大于鎖的過期時間,就沒有一個很好的方式來避免由于鎖過期導緻其他線程同時占有鎖的問題,是以需要熟悉業務的執行時間,來合理地設定鎖的過期時間。(PS:對于這種情況,一般的處理方式是獲得鎖的線程開啟一個守護線程,用來給快要過期的鎖"續航"。比如過去了29秒,線程A還沒執行完,這時候守護線程會執行expire指令,為這把鎖"續航"20秒。守護線程從第29秒開始執行,每20秒執行一次檢查。當線程A執行完任務,會顯式關掉守護線程。線程A的程序或者守護程序異常退出,這把鎖将自動逾時釋放,進而不會導緻死鎖。)
另外,需要注意的一點是:3.3節的實作方式中,删除鎖(del_lock)的操作不是原子性的,先是拿到鎖,再判斷鎖的值是否相等,相等的話最後再删除鎖,既然不是原子性的,就有可能存在這樣一種極端情況:在判斷的那一時刻,鎖正好過期了,被其他線程占有了鎖,那最後一步的删除,就可能會造成誤删其他線程的鎖。是以推薦使用官方提供的 Lua 腳本來確定原子性:
def del_lock(self, key, value):
if redis.call("get",key) == value then
return redis.call("del",key)
else
return 0
-
總結
以上就是我們使用 Redis 來實作一個分布式同步鎖的方式,其特點是:
加鎖和釋放鎖是原子性的
滿足互斥性,同一個時刻隻能有一個線程可以擷取鎖和釋放鎖
利用 Redis 的 ttl機制和守護程序的方式來保證不會出現死鎖
以上的方案中,我們是假設 Redis 服務端是單叢集且高可用的,忽視了以下的問題:
如果某一時刻 Redis master 節點發生了故障,叢集中的某個 slave 節點變成 master 節點,在故障遷移(failover)過程中可能出現原 master 節點上的鎖沒有及時同步到 slave 節點,導緻其他線程同時獲得鎖。對于這個問題,可以參考 Redis 官方推出的 redlock 算法,但是比較遺憾的是,該算法也沒有很好地解決鎖過期的問題。(PS:不過這種不安全也僅僅是在主從發生 failover 的情況下才會産生,而且持續時間極短,業務系統多數情況下可以容忍。)
-
參考資料
漫畫:什麼是分布式鎖?
基于 redis 的分布式鎖實作
redis分布式鎖深度剖析(逾時情況)
SET key value
Distributed locks with Redis
原文位址
https://www.cnblogs.com/wengle520/p/12484931.html