天天看點

【從入門到放棄-ZooKeeper】ZooKeeper實戰-分布式鎖-更新版

前言

上文

【從入門到放棄-ZooKeeper】ZooKeeper實戰-分布式鎖

中,我們通過利用ZooKeeper的臨時節點特性,實作了一個分布式鎖。

但是是通過輪詢的方式去判斷不斷嘗試擷取鎖,空轉對于CPU還是有一定消耗的,同時,對于多個線程競争鎖激烈的時候,很容易出現羊群效應。

為了解決上面兩個問題。本文來看一下如何實作一個更新版的分布式鎖。

設計

我們依然實作java.util.concurrent.locks.Lock接口。

和上一文中實作方式不同的是,我們使用ZooKeeper的EPHEMERAL_SEQUENTIAL臨時順序節點。

當首次擷取鎖時,會建立一個臨時節點,如果這個臨時節點末尾數字是目前父節點下同名節點中最小的,則擷取鎖成功。

否則,則監聽上一個數字較大的節點,直到上一個節點被釋放,則再次嘗試擷取鎖成功。這樣可以避免多個線程同時擷取一把鎖造成的競争。

同時使用了ZooKeeper提供的watch功能,避免了輪詢帶來的CPU空轉。

擷取鎖後使用一個volatile int類型的state進行計數,來實作鎖的可重入機制。

DistributedFairLock

public class DistributedFairLock implements Lock {
    private static Logger logger = LoggerFactory.getLogger(DistributedFairLock.class);

    //ZooKeeper用戶端,進行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根節點名稱
    private String dir;

    //加鎖節點
    private String node;

    //ZooKeeper鑒權資訊
    private List<ACL> acls;

    //要加鎖節點
    private String fullPath;

    //加鎖辨別,為0時表示未擷取到鎖,每擷取一次鎖則加一,釋放鎖時減一。減到0時斷開連接配接,删除臨時節點。
    private volatile int state;

    //目前鎖建立的節點id
    private String id;

    //通過CountDownLatch阻塞,直到監聽上一節點被取消,再進行後續操作
    private CountDownLatch countDownLatch;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public DistributedFairLock(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(this.node);
        init();
    }

    private void init() {
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[DistributedFairLock#init] error : " + e.toString(), e);
        }
    }
}           

lock

public void lock() {
    try {
        //加鎖
        synchronized (this) {
            //如果目前未持有鎖
            if (state <= 0) {
                //建立節點
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                //擷取目前路徑下所有的節點
                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }

                //擷取所有id小于目前節點順序的節點
                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    //監聽上一個節點,就是通過這裡避免多鎖競争和CPU空轉,實作公平鎖的  
                    Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
                    if (stat != null) {
                        countDownLatch = new CountDownLatch(1);
                        countDownLatch.await();
                    }

                }
            }

            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#lock] error : " + e.toString(), e);
        Thread.currentThread().interrupt();
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#lock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            Thread.currentThread().interrupt();
        }
    }
}           

tryLock

public boolean tryLock() {
    try {
        synchronized (this) {
            if (state <= 0) {
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }


                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    return false;
                }
            }
            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
        return false;
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            return false;
        }
    }
    return true;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    try {
        synchronized (this) {
            if (state <= 0) {
                if (id == null) {
                    id = zooKeeper.create(fullPath, null, acls, CreateMode.EPHEMERAL_SEQUENTIAL);
                }

                List<String> nodes = zooKeeper.getChildren(dir, false);
                SortedSet<String> sortedSet = new TreeSet<>();
                for (String node : nodes) {
                    sortedSet.add(dir.concat("/").concat(node));
                }


                SortedSet<String> lessSet = ((TreeSet<String>) sortedSet).headSet(id);

                if (!lessSet.isEmpty()) {
                    Stat stat = zooKeeper.exists(lessSet.last(), new LockWatcher());
                    if (stat != null) {
                        countDownLatch = new CountDownLatch(1);
                        countDownLatch.await(time, unit);
                    }

                }
            }

            state++;
        }
    } catch (InterruptedException e) {
        logger.error("[DistributedFairLock#tryLock] error : " + e.toString(), e);
        return false;
    } catch (KeeperException ke) {
        logger.error("[DistributedFairLock#tryLock] error : " + ke.toString(), ke);
        if (!KeeperException.Code.NODEEXISTS.equals(ke.code())) {
            return false;
        }
    }
    return true;
}           

unlock

public void unlock() {
    synchronized (this) {
        if (state > 0) {
            state--;
        }
        //當不再持有鎖時,删除建立的臨時節點
        if (state == 0 && zooKeeper != null) {
            try {
                zooKeeper.delete(id, -1);
                id = null;
            } catch (Exception e) {
                logger.error("[DistributedFairLock#unlock] error : " + e.toString(), e);
            }
        }
    }
}           

LockWatcher

private class LockWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        synchronized (this) {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
        }
    }
}           

總結

上面就是我們改良後,通過臨時順序節點和watch機制實作的公平可重入分布式鎖。

源代碼可見:

aloofJr

通過watch機制避免輪詢帶來的CPU空轉。

通過順序臨時節點避免了羊群效應。

如果對以上方式有更好的優化方案,歡迎一起讨論。

更多文章

見我的部落格:

https://nc2era.com

written by

AloofJr

,轉載請注明出處

繼續閱讀