前言
上文
【從入門到放棄-ZooKeeper】ZooKeeper實戰-分布式鎖中,我們通過利用ZooKeeper的臨時節點特性,實作了一個分布式鎖。
但是是通過輪詢的方式去判斷不斷嘗試擷取鎖,空轉對于CPU還是有一定消耗的,同時,對于多個線程競争鎖激烈的時候,很容易出現羊群效應。
為了解決上面兩個問題。本文來看一下如何實作一個更新版的分布式鎖。
設計
我們依然實作java.util.concurrent.locks.Lock接口。
和上一文中實作方式不同的是,我們使用ZooKeeper的EPHEMERAL_SEQUENTIAL臨時順序節點。
當首次擷取鎖時,會建立一個臨時節點,如果這個臨時節點末尾數字是目前父節點下同名節點中最小的,則擷取鎖成功。
否則,則監聽上一個數字較大的節點,直到上一個節點被釋放,則再次嘗試擷取鎖成功。這樣可以避免多個線程同時擷取一把鎖造成的競争。
同時使用了ZooKeeper提供的watch功能,避免了輪詢帶來的CPU空轉。
擷取鎖後使用一個volatile int類型的state進行計數,來實作鎖的可重入機制。
DistributedFairLock
public class DistributedFairLock implements Lock {
private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class);
//ZooKeeper用戶端,進行ZooKeeper操作
private ZooKeeper zooKeeper;
//根節點名稱
private String dir;
//加鎖節點
private String node;
//ZooKeeper鑒權資訊
private List<ACL> acls;
//要加鎖節點
private String fullPath;
//加鎖辨別,為0時表示未擷取到鎖,每擷取一次鎖則加一,釋放鎖時減一。減到0時斷開連接配接,删除臨時節點。
private volatile int state;
//目前鎖建立的節點id
private String id;
//通過CountDownLatch阻塞,直到監聽上一節點被取消,再進行後續操作
private CountDownLatch countDownLatch;
/**
* Constructor.
*
* @param zooKeeper the zoo keeper
* @param dir the dir
* @param node the node
* @param acls the acls
*/
public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
this.zooKeeper = zooKeeper;
this.dir = dir;
this.node = node;
this.acls = acls;
this.fullPath = dir.concat("/").concat(this.node);
init();
}
private void init() {
try {
Stat stat = zooKeeper.exists(dir, false);
if (stat == null) {
zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
}
} catch (Exception e) {
logger.error("[DistributedFairLock#init] error : " + e.toString(), e);
}
}
}
lock
public void lock() {
try {
//加鎖
synchronized (this) {
//如果目前未持有鎖
if (state <= 0) {
//建立節點
if (id == null) {
id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
}
//擷取目前路徑下所有的節點
List<String> nodes = zooKeeper.getChildren(dir, false);
SortedSet<String> sortedSet = new TreeSet<>();
for (String node : nodes) {
sortedSet.add(dir.concat("/").concat(node));
}
//擷取所有id小于目前節點順序的節點
SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
if (!lessSet.isEmpty()) {
//監聽上一個節點,就是通過這裡避免多鎖競争和CPU空轉,實作公平鎖的
Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
if (stat != null) {
countDownLatch = new CountDownLatch(1);
countDownLatch.await();
}
}
}
state++;
}
} catch (InterruptedException e) {
logger.error("[DistributedFairLock#lock] error : " + e.toString(), e);
Thread.currentThread().interrupt();
} catch (KeeperException ke) {
logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
Thread.currentThread().interrupt();
}
}
}
tryLock
public boolean tryLock() {
try {
synchronized (this) {
if (state <= 0) {
if (id == null) {
id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
}
List<String> nodes = zooKeeper.getChildren(dir, false);
SortedSet<String> sortedSet = new TreeSet<>();
for (String node : nodes) {
sortedSet.add(dir.concat("/").concat(node));
}
SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
if (!lessSet.isEmpty()) {
return false;
}
}
state++;
}
} catch (InterruptedException e) {
logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
return false;
} catch (KeeperException ke) {
logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
return false;
}
}
return true;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
try {
synchronized (this) {
if (state <= 0) {
if (id == null) {
id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
}
List<String> nodes = zooKeeper.getChildren(dir, false);
SortedSet<String> sortedSet = new TreeSet<>();
for (String node : nodes) {
sortedSet.add(dir.concat("/").concat(node));
}
SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);
if (!lessSet.isEmpty()) {
Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
if (stat != null) {
countDownLatch = new CountDownLatch(1);
countDownLatch.await(time, unit);
}
}
}
state++;
}
} catch (InterruptedException e) {
logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
return false;
} catch (KeeperException ke) {
logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
return false;
}
}
return true;
}
unlock
public void unlock() {
synchronized (this) {
if (state > 0) {
state--;
}
//當不再持有鎖時,删除建立的臨時節點
if (state == 0 && zooKeeper != null) {
try {
zooKeeper.delete(id, -1);
id = null;
} catch (Exception e) {
logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e);
}
}
}
}
LockWatcher
private class LockWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
synchronized (this) {
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
}
}
總結
上面就是我們改良後,通過臨時順序節點和watch機制實作的公平可重入分布式鎖。
源代碼可見:
aloofJr通過watch機制避免輪詢帶來的CPU空轉。
通過順序臨時節點避免了羊群效應。
如果對以上方式有更好的優化方案,歡迎一起讨論。
更多文章
見我的部落格:
https://nc2era.comwritten by
AloofJr,轉載請注明出處