分布式環境下如何保證一個資料在并發的情況下保證同一時間在一台機器隻有一個線程在執行?
實作一個分布式鎖需要解決的問題有以下幾個主要點:
- 資源被唯一執行
- 良好的可重入機制:發生死鎖情況下可被其他程序可重入
- 良好的性能問題:操作簡單,額外的請求不要過多
- 容災機制:個别的鎖機器崩潰也能穩定運作
在 ZooKeeper 中,節點類型可以分為持久節點(PERSISTENT )、臨時節點(EPHEMERAL),以及時序節點(SEQUENTIAL ),具體在節點建立過程中,一般是組合使用,可以生成以下 4 種節點類型。
- 持久節點(PERSISTENT)
- 持久順序節點(PERSISTENT_SEQUENTIAL)
- 臨時節點(EPHEMERAL)
- 臨時順序節點(EPHEMERAL_SEQUENTIAL)
這裡我們就基于臨時有序節點去實作分布式鎖:
1、首先建立一個根節點用于存放分布式鎖需要建立的臨時節點
2、在建立臨時有序節點的時候,zk會根據client的建立請求會按順序依次生成0000001、0000002、0000003這種類似序号的節點
3、每個client判斷目前自己建立的節點是否為序号最小的節點,是的話就認為拿到了鎖,可以執行業務代碼,否則就去監聽最小節點的删除時間
4、當處理完後需要删除節點,這樣就能被其他線程監聽到,開始競争鎖

當然,聰明的同學一看就能看出這個步驟有很多的問題,首先就是最小節點的删除時間會觸發大量client的啟動執行,需要發送大量的通知,而且如果在去建立監聽的時候出現了最小節點已經被删除的情況,這時候永遠都收不到最小節點的删除事件,而且如果按不同的資源建立了太多的目錄的話,是否會對整個zk的性能産生影響,如果對不同資源采用同一個目錄話,那麼一個目錄下的節點又過多,取子節點的性能又有問題...
是以該分布式鎖的實作步驟可以優化一下
1、首先不必所有client都去監聽最小節點的删除事件,隻需要監聽比自己稍小點的節點的删除事件即可
2、每此執行完處理後可以嘗試性删除目錄,避免随着時間的增長建立了過多的目錄
3、另外zookeeper提供的API中設定監聽器的操作與讀操作是原子執行的,也就是說在讀子節點清單時同時設定監聽器,保證不會丢失事件,是以可以在建立監聽的時候一旦發現監聽的節點為空就認為節點已删除,可以拿到鎖
下面是代碼:裡面包含了測試main方法
https://github.com/fengym201507411/lockpackage com.fym;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.*;
/**
* Created by fengyiming on 2018/12/24.
* 基于zookeeper的代碼
*/
public class LockServiceImpl {
private static volatile ZooKeeper zooKeeper;
/**
* zk連接配接逾時時間/s
*/
private final static int SESSION_TIMEOUT = 10000;
/**
* 分布式鎖建立key的根路徑
*/
private final static String PRE_ROOT_PATH = "/zkLockRoot";
/**
* /字元
*/
private final static String PATH = "/";
static {
try {
// 連接配接zookeeper
zooKeeper = new ZooKeeper("127.0.0.1:2181", SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
Stat stat = zooKeeper.exists(PRE_ROOT_PATH, false);
if (stat == null) {
System.out.println("root path is null,create......");
zooKeeper.create(PRE_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
System.out.println("root path ok");
} catch (Exception e) {
System.out.println("加載zk資訊異常" + e.getMessage());
}
}
public static void lock(String threadName, String key, long waitSeconds) {
try {
key = key + LocalDate.now().toString();
System.out.println(threadName + "begin lock");
String path = new StringBuffer(PRE_ROOT_PATH).append(PATH).append(key).toString();
Stat stat = zooKeeper.exists(path, false);
if (stat == null) {
System.out.println(threadName + "key path is null,create......");
try {
zooKeeper.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException k) {
System.out.println(threadName + "目錄" + path + "已存在,無需重複建立");
} catch (Exception e) {
System.out.println(threadName + "create key path error,error message:" + e.getMessage());
}
}
LocalDateTime begin = LocalDateTime.now();
String lockNodePre = new StringBuffer(PRE_ROOT_PATH).append(PATH).append(key).append(PATH).toString();
String lockNode = zooKeeper.create(lockNodePre, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode
.EPHEMERAL_SEQUENTIAL);
String lockNodeName = lockNode.substring(lockNodePre.length());
System.out.println(threadName + "建立有序臨時節點:" + lockNode + ",節點名稱:" + lockNodeName);
// 取所有子節點
List<String> subNodes = zooKeeper.getChildren(path, false);
System.out.println(threadName + "目前競争資源下的節點數:" + subNodes.size());
//排序
subNodes.sort(String::compareTo);
System.out.println(threadName + "first:" + subNodes.get(0) + ",last: " + subNodes.get(subNodes.size() - 1));
int index = subNodes.indexOf(lockNodeName);
String minNodeName = subNodes.get(0);
if (!lockNodeName.equals(minNodeName)) {
String min1NodeName = subNodes.get(index - 1);
String min1NodePath = new StringBuffer(PRE_ROOT_PATH).append(PATH).append(key).append(PATH)
.append(min1NodeName).toString();
CountDownLatch countDownLatch = new CountDownLatch(1);
System.out.println(threadName + "目前節點" + lockNodeName + "準備監聽節點:" + min1NodeName);
Stat min1Stat = zooKeeper.exists(min1NodePath, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(threadName + "節點" + event.getPath() + ",事件 : " + event.getType());
if (event.getType() == Event.EventType.NodeDeleted) {
System.out.println(threadName + "節點删除");
countDownLatch.countDown();
}
}
});
if (min1Stat == null) {
System.out.println(threadName + "節點不存在,無需等待,目前節點:" + lockNodeName + ",前一節點:" + min1NodeName);
} else {
System.out.println(threadName + "------wait-------");
countDownLatch.await();
}
}
//逾時
if(LocalDateTime.now().compareTo(begin.plusSeconds(waitSeconds)) > 0){
throw new Exception("waite time out");
}
System.out.println(threadName + "拿到了lock" + lockNodeName + ",do -----");
zooKeeper.delete(lockNode, -1);
System.out.println(threadName + "執行完畢,解鎖" + lockNodeName + "------");
String lastNodeName = subNodes.get(subNodes.size() - 1);
if (lockNodeName.equals(lastNodeName)) {
try {
zooKeeper.delete(path, -1);
System.out.println(threadName + "嘗試删除該key目錄成功,path" + path);
} catch (KeeperException k) {
System.out.println(threadName + "嘗試删除該key目錄,失敗:" + k.getMessage());
} catch (Exception e) {
System.out.println(threadName + "嘗試删除該key目錄,失敗:" + e.getMessage());
}
}
} catch (Exception e) {
System.out.println(threadName + "lock error" + e.getMessage());
}
}
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 1000; i++) {
String name = new StringBuffer("ThreadName[").append(i).append("]").toString();
executor.execute(() -> {
LockServiceImpl.lock(name, "firstLock",10);
});
}
}
}