.net分布式鎖,包括redis分布式鎖和zookeeper分布式鎖的.net實作。
分布式鎖在解決分布式環境下的業務一緻性是非常有用的。
分布式鎖
經常用于在解決分布式環境下的業務一緻性和協調分布式環境。
實際業務場景中,比如說解決并發一瞬間的重複下單,重複确認收貨,重複發現金券等。
使用分布式鎖的場景一般不能太多。
開源位址:http://git.oschina.net/chejiangyi/XXF.BaseService.DistributedLock
開源相關群: .net 開源基礎服務 238543768
這裡整理了C#.net關于redis分布式鎖和zookeeper分布式鎖的實作,僅用于研究。(可能有bug)
采用ServiceStack.Redis實作Redis分布式鎖
/*
* Redis分布式鎖
* 采用ServiceStack.Redis實作的Redis分布式鎖
* 詳情可閱讀其開源代碼
* 備注:不同版本的 ServiceStack.Redis 實作reidslock機制不同 xxf裡面預設使用2.2版本
*/ public class RedisDistributedLock : BaseRedisDistributedLock
{
private ServiceStack.Redis.RedisLock _lock;
private RedisClient _client;
public RedisDistributedLock(string redisserver, string key)
: base(redisserver, key)
{
}
public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
{
if (lockresult == LockResult.Success)
throw new DistributedLockException("檢測到目前鎖已擷取");
_client = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient();
/* * 閱讀源碼發現當其擷取鎖後,redis連接配接資源會一直占用,知道擷取鎖的資源釋放後,連接配接才會跳出,可能會導緻連接配接池資源的浪費。 */
try {
this._lock = new ServiceStack.Redis.RedisLock(_client, key, getlockTimeOut);
lockresult = LockResult.Success;
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖系統級别嚴重異常,redisserver:{0}", redisserver.NullToEmpty()), exp);
lockresult = LockResult.LockSystemExceptionFailure;
}
return lockresult;
}
public override void Dispose()
{
try {
if (this._lock != null)
this._lock.Dispose();
if (_client != null)
this._client.Dispose();
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖釋放嚴重異常,redisserver:{0}", redisserver.NullToEmpty()), exp);
}
}
}
來自網絡的java實作Redis分布式鎖(C#版)
/*
* Redis分布式鎖
* 采用網絡上java實作的Redis分布式鎖
* 參考 http://www.blogjava.net/hello-yun/archive/2014/01/15/408988.html
* 詳情可閱讀其開源代碼
*/ public class RedisDistributedLockFromJava : BaseRedisDistributedLock
{
public RedisDistributedLockFromJava(string redisserver, string key)
: base(redisserver, key)
{
}
public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
{
if (lockresult == LockResult.Success)
throw new DistributedLockException("檢測到目前鎖已擷取");
try {
// 1. 通過SETNX試圖擷取一個lock
string @lock = key;
long taskexpiredMilliseconds = (taskrunTimeOut != null ? (long)taskrunTimeOut.Value.TotalMilliseconds : (long)DistributedLockConfig.MaxLockTaskRunTime);
long getlockexpiredMilliseconds = (getlockTimeOut != null ? (long)getlockTimeOut.Value.TotalMilliseconds : 0);
long hassleepMilliseconds = 0;
while (true)
{
using (var redisclient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
long value = CurrentUnixTimeMillis() + taskexpiredMilliseconds + 1;
/*Java以前版本都是用SetNX,但是這種是無法設定逾時時間的,不是很了解為什麼,
* 可能是因為原來的redis指令比較少導緻的?現在用Add不知道效果如何.
因對redis細節不了解,但個人懷疑若異常未釋放鎖經常發生,可能會導緻記憶體逐漸溢出*/
bool acquired = redisclient.Add<long>(@lock, value, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime));
//SETNX成功,則成功擷取一個鎖
if (acquired == true)
{
lockresult = LockResult.Success;
}
//SETNX失敗,說明鎖仍然被其他對象保持,檢查其是否已經逾時
else
{
var oldValueBytes = redisclient.Get(@lock);
//逾時
if (oldValueBytes != null && BitConverter.ToInt64(oldValueBytes, 0) < CurrentUnixTimeMillis())
{
/*此處雖然重設并擷取鎖,但是逾時時間可能被覆寫,故重設逾時時間;若有程序一直在嘗試擷取鎖,那麼鎖存活時間應該被延遲*/
var getValueBytes = redisclient.GetSet(@lock, BitConverter.GetBytes(value));
var o1 = redisclient.ExpireEntryIn(@lock, TimeSpan.FromMilliseconds(taskexpiredMilliseconds + DistributedLockConfig.TaskLockDelayCleepUpTime));//這裡如果程式異常終止,依然會有部分鎖未釋放的情況。 // 擷取鎖成功 if (getValueBytes == oldValueBytes)
{
lockresult = LockResult.Success;
}
// 已被其他程序捷足先登了
else
{
lockresult = LockResult.GetLockTimeOutFailure;
}
}
//未逾時,則直接傳回失敗
else
{
lockresult = LockResult.GetLockTimeOutFailure;
}
}
}
//成功拿到鎖
if (lockresult == LockResult.Success)
break;
//擷取鎖逾時
if (hassleepMilliseconds >= getlockexpiredMilliseconds)
{
lockresult = LockResult.GetLockTimeOutFailure;
break;
}
//繼續等待
System.Threading.Thread.Sleep(DistributedLockConfig.GetLockFailSleepTime);
hassleepMilliseconds += DistributedLockConfig.GetLockFailSleepTime;
}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖系統級别嚴重異常,redisserver:{0}", redisserver.NullToEmpty()), exp);
lockresult = LockResult.LockSystemExceptionFailure;
}
return lockresult;
}
private long CurrentUnixTimeMillis()
{
return (long)(System.DateTime.UtcNow - new System.DateTime(1970, 1, 1, 0, 0, 0, System.DateTimeKind.Utc)).TotalMilliseconds;
}
public override void Dispose()
{
if (lockresult == LockResult.Success || lockresult == LockResult.LockSystemExceptionFailure)
{
try {
long current = CurrentUnixTimeMillis();
using (var redisclient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
var v = redisclient.Get(key);
if (v != null)
{
// 避免删除非自己擷取得到的鎖
if (current < BitConverter.ToInt64(v, 0))
{
redisclient.Del(key);
}
}
}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖釋放嚴重異常,redisserver:{0}", redisserver.NullToEmpty()), exp);
}
}
}
}
ServiceStack.Redis内部實作版本(較舊)
/*
* Redis分布式鎖
* 采用ServiceStack.Redis實作的Redis分布式鎖
* 詳情可閱讀其開源代碼
* 備注:不同版本的 ServiceStack.Redis 實作reidslock機制不同
* 拷貝自網絡開源代碼 較舊的實作版本
*/ public class RedisDistributedLockFromServiceStack : BaseRedisDistributedLock
{
public RedisDistributedLockFromServiceStack(string redisserver, string key)
: base(redisserver, key)
{
}
public override LockResult TryGetDistributedLock(TimeSpan? getlockTimeOut, TimeSpan? taskrunTimeOut)
{
if (lockresult == LockResult.Success)
throw new DistributedLockException("檢測到目前鎖已擷取");
try
{
using (var redisClient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
ExecExtensions.RetryUntilTrue(
() =>
{
//This pattern is taken from the redis command for SETNX http://redis.io/commands/setnx
//Calculate a unix time for when the lock should expire
TimeSpan realSpan = taskrunTimeOut ?? TimeSpan.FromMilliseconds(DistributedLockConfig.MaxLockTaskRunTime); //new TimeSpan(365, 0, 0, 0); //if nothing is passed in the timeout hold for a year DateTime expireTime = DateTime.UtcNow.Add(realSpan);
string lockString = (expireTime.ToUnixTimeMs() + 1).ToString();
//Try to set the lock, if it does not exist this will succeed and the lock is obtained
var nx = redisClient.SetEntryIfNotExists(key, lockString);
if (nx)
{
lockresult = LockResult.Success;
return true;
}
//If we've gotten here then a key for the lock is present. This could be because the lock is
//correctly acquired or it could be because a client that had acquired the lock crashed (or didn't release it properly).
//Therefore we need to get the value of the lock to see when it should expire
redisClient.Watch(key);
string lockExpireString = redisClient.Get<string>(key);
long lockExpireTime;
if (!long.TryParse(lockExpireString, out lockExpireTime))
{
redisClient.UnWatch(); // since the client is scoped externally
lockresult = LockResult.GetLockTimeOutFailure;
return false;
}
//If the expire time is greater than the current time then we can't let the lock go yet
if (lockExpireTime > DateTime.UtcNow.ToUnixTimeMs())
{
redisClient.UnWatch(); // since the client is scoped externally
lockresult = LockResult.GetLockTimeOutFailure;
return false;
}
//If the expire time is less than the current time then it wasn't released properly and we can attempt to //acquire the lock. The above call to Watch(_lockKey) enrolled the key in monitoring, so if it changes //before we call Commit() below, the Commit will fail and return false, which means that another thread //was able to acquire the lock before we finished processing. using (var trans = redisClient.CreateTransaction()) // we started the "Watch" above; this tx will succeed if the value has not moved {
trans.QueueCommand(r => r.Set(key, lockString));
//return trans.Commit(); //returns false if Transaction failed var t = trans.Commit();
if (t == false)
lockresult = LockResult.GetLockTimeOutFailure;
else
lockresult = LockResult.Success;
return t;
}
},
getlockTimeOut
);
}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖系統級别嚴重異常,redisserver:{0}", redisserver.NullToEmpty()), exp);
lockresult = LockResult.LockSystemExceptionFailure;
}
return lockresult;
}
public override void Dispose()
{
try {
using (var redisClient = DistributedLockConfig.GetRedisPoolClient(redisserver).GetClient())
{
redisClient.Remove(key);
}
}
catch (Exception exp)
{
XXF.Log.ErrorLog.Write(string.Format("redis分布式嘗試鎖釋放嚴重異常,redisserver:{0}", redisserver.NullToEmpty()), exp);
}
}
}
Zookeeper 版本實作分布式鎖
/* * 來源java網絡源碼的zookeeper分布式鎖實作(目前僅翻譯并簡單測試ok,未來內建入sdk)
* 備注: 共享鎖在同一個程序中很容易實作,但是在跨程序或者在不同 Server 之間就不好實作了。Zookeeper 卻很容易實作這個功能,實作方式也是需要獲得鎖的 Server 建立一個 EPHEMERAL_SEQUENTIAL 目錄節點,
然後調用 getChildren方法擷取目前的目錄節點清單中最小的目錄節點是不是就是自己建立的目錄節點,如果正是自己建立的,那麼它就獲得了這個鎖,
如果不是那麼它就調用 exists(String path, boolean watch) 方法并監控 Zookeeper 上目錄節點清單的變化,一直到自己建立的節點是清單中最小編号的目錄節點,
進而獲得鎖,釋放鎖很簡單,隻要删除前面它自己所建立的目錄節點就行了。
*/ public class ZooKeeprDistributedLockFromJava : IWatcher
{
private ZooKeeper zk;
private string root = "/locks"; //根
private string lockName; //競争資源的标志
private string waitNode; //等待前一個鎖
private string myZnode; //目前鎖
//private CountDownLatch latch; //計數器
private AutoResetEvent autoevent;
private TimeSpan sessionTimeout = TimeSpan.FromMilliseconds(30000);
private IList<Exception> exception = new List<Exception>();
/// <summary>
/// 建立分布式鎖,使用前請确認config配置的zookeeper服務可用 </summary>
/// <param name="config"> 127.0.0.1:2181 </param>
/// <param name="lockName"> 競争資源标志,lockName中不能包含單詞lock </param>
public ZooKeeprDistributedLockFromJava(string config, string lockName)
{
this.lockName = lockName;
// 建立一個與伺服器的連接配接
try
{
zk = new ZooKeeper(config, sessionTimeout, this);
var stat = zk.Exists(root, false);
if (stat == null)
{
// 建立根節點
zk.Create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.Persistent);
}
}
catch (KeeperException e)
{
throw e;
}
}
/// <summary>
/// zookeeper節點的螢幕
/// </summary>
public virtual void Process(WatchedEvent @event)
{
if (this.autoevent != null)
{
this.autoevent.Set();
}
}
public virtual bool tryLock()
{
try {
string splitStr = "_lock_";
if (lockName.Contains(splitStr))
{
//throw new LockException("lockName can not contains \\u000B");
}
//建立臨時子節點
myZnode = zk.Create(root + "/" + lockName + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EphemeralSequential);
Console.WriteLine(myZnode + " is created ");
//取出所有子節點
IList<string> subNodes = zk.GetChildren(root, false);
//取出所有lockName的鎖
IList<string> lockObjNodes = new List<string>();
foreach (string node in subNodes)
{
if (node.StartsWith(lockName))
{
lockObjNodes.Add(node);
}
}
Array alockObjNodes = lockObjNodes.ToArray();
Array.Sort(alockObjNodes);
Console.WriteLine(myZnode + "==" + lockObjNodes[0]);
if (myZnode.Equals(root + "/" + lockObjNodes[0]))
{
//如果是最小的節點,則表示取得鎖
return true;
}
//如果不是最小的節點,找到比自己小1的節點
string subMyZnode = myZnode.Substring(myZnode.LastIndexOf("/", StringComparison.Ordinal) + 1);
waitNode = lockObjNodes[Array.BinarySearch(alockObjNodes, subMyZnode) - 1];
}
catch (KeeperException e)
{
throw e;
}
return false;
}
public virtual bool tryLock(TimeSpan time)
{
try {
if (this.tryLock())
{
return true;
}
return waitForLock(waitNode, time);
}
catch (KeeperException e)
{
throw e;
}
return false;
}
private bool waitForLock(string lower, TimeSpan waitTime)
{
var stat = zk.Exists(root + "/" + lower, true);
//判斷比自己小一個數的節點是否存在,如果不存在則無需等待鎖,同時注冊監聽
if (stat != null)
{
Console.WriteLine("Thread " + System.Threading.Thread.CurrentThread.Name + " waiting for " + root + "/" + lower);
autoevent = new AutoResetEvent(false);
bool r = autoevent.WaitOne(waitTime);
autoevent.Dispose();
autoevent = null;
return r;
}
else return true;
}
public virtual void unlock()
{
try {
Console.WriteLine("unlock " + myZnode);
zk.Delete(myZnode, -1);
myZnode = null;
zk.Dispose();
}
catch (KeeperException e)
{
throw e;
}
}
}
以上代碼僅做參考,未壓測。
代碼粘貼有些問題,詳細請下載下傳開源包運作研究。
開源是一種态度,分享是一種精神,學習仍需堅持,進步仍需努力,.net生态圈因你我更加美好。