天天看點

基于ZooKeeper的分布式鎖實作

今天介紹基于ZooKeeper的分布式鎖的簡單實作,包括阻塞鎖和非阻塞鎖。同時增加了網上很少介紹的基于節點的非阻塞鎖實作,主要是為了加深對ZooKeeper的了解。

維基百科:分布式鎖,是控制分布式系統之間同步通路共享資源的一種方式。在分布式系統中,常常需要協調他們的動作。如果不同的系統或是同一個系統的不同主機之間共享了一個或一組資源,那麼通路這些資源的時候,往往需要互斥來防止彼此幹擾來保證一緻性,在這種情況下,便需要使用到分布式鎖。

1 阻塞鎖和非阻塞鎖

根據業務特點,普通分布式鎖有兩種需求:阻塞鎖和非阻塞鎖。

阻塞鎖:多個系統同時調用同一個資源,所有請求被排隊處理。已經得到分布式鎖的系統,進入運作狀态完成業務操作;沒有得到分布式鎖的線程進入阻塞狀态等待,當獲得相應的信号并獲得分布式鎖後,進入運作狀态完成業務操作。

非阻塞鎖:多個系統同時調用同一個資源,當某一個系統最先擷取到鎖,進入運作狀态完成業務操作;其他沒有得到分布式鎖的系統,就直接傳回,不做任何業務邏輯,可以給使用者提示進行其他操作。

2 鎖代碼簡單設計

基于ZooKeeper實作鎖,一般都是建立EPHEMERAL_SEQUENTIAL子節點并比較序号實作的。參照Redis的分布式鎖實作,也可以使用EPHEMERAL節點實作。

3 分布式鎖代碼

完整代碼比較多,占篇幅。在文中隻保留了關鍵的代碼。

3.1 分布式鎖接口定義

ZooKeeperLock.java

public interface ZooKeeperLock {
  /**
   * 嘗試擷取鎖
   *
   * @param guidNodeName 用于加鎖的唯一節點名
   * @param clientGuid 用于唯一辨別目前用戶端的ID
   * @return
   */
  boolean lock(String guidNodeName, String clientGuid);
 
  /**
   * 釋放鎖
   *
   * @param guidNodeName 用于加鎖的唯一節點名
   * @param clientGuid 用于唯一辨別目前用戶端的ID
   * @return
   */
  boolean release(String guidNodeName, String clientGuid);
 
  /**
   * 鎖是否已經存在
   *
   * @param guidNodeName 用于加鎖的唯一節點名
   * @return
   */
  boolean exists(String guidNodeName);
}
           

3.2 基于節點實作的非阻塞鎖

NodeBlocklessLock.java

public class NodeBlocklessLock extends ZooKeeperBase implements ZooKeeperLock {
  /** 嘗試擷取鎖 */
  public boolean lock(String guidNodeName, String clientGuid) {
    boolean result = false;
    if (getZooKeeper().exists(guidNodeName, false) == null) {
      getZooKeeper().create(guidNodeName, clientGuid.getBytes(),
          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      byte[] data = getZooKeeper().getData(guidNodeName, false, null);
      if (data != null && clientGuid.equals(new String(data))) {
        result = true;
      }
    }
    return result;
  }
 
  /** 釋放鎖 */
  public boolean release(String guidNodeName, String clientGuid) {
    boolean result = false;
    Stat stat = new Stat();
    byte[] data = getZooKeeper().getData(guidNodeName, false, stat);
    if (data != null && clientGuid.equals(new String(data))) {
      getZooKeeper().delete(guidNodeName, stat.getVersion());
      result = true;
    }
    return result;
  }
 
  /** 鎖是否已經存在 */
  public boolean exists(String guidNodeName) {
    boolean result = false;
    Stat stat = getZooKeeper().exists(guidNodeName, false);
    result = stat != null;
    return result;
  }
}
           

3.3 基于子節點實作的分布式鎖基類

ChildrenNodeLock.java

public abstract class ChildrenNodeLock extends ZooKeeperBase implements ZooKeeperLock {
  /** 擷取目前節點的前一個節點,如果為空表示自己是第一個 */
  protected String getPrevElementName() {
    List<String> elementNames = getZooKeeper().getChildren(this.guidNodeName, false);
    long curElementSerial = Long.valueOf(
        elementNodeFullName.substring((this.guidNodeName + "/" + childPrefix).length()));
    String prevElementName = null;
    long prevElementSerial = -1;
    for (String oneElementName : elementNames) {
      long oneElementSerial = Long.parseLong(oneElementName.substring(childPrefix.length()));
      if (oneElementSerial < curElementSerial) {
        // 比目前節點小
        if (oneElementSerial > prevElementSerial) {
          prevElementSerial = oneElementSerial;
          prevElementName = oneElementName;
        }
      }
    }
    return prevElementName;
  }
 
  /** 嘗試擷取鎖 */
  public boolean lock(String guidNodeName, String clientGuid) {
    boolean result = false;
    // 確定根節點存在,并且建立為容器節點
    super.createRootNode(this.guidNodeName, CreateMode.CONTAINER);
    // 建立子節點并傳回帶序列号的節點名
    elementNodeFullName = getZooKeeper().create(this.guidNodeName + "/" + childPrefix,
        new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    boolean lockSuccess = isLockSuccess();
    result = lockSuccess;
    return result;
  }
 
 
  /** 釋放鎖 */
  public boolean release(String guidNodeName, String clientGuid) {
    // 删除子節點
    getZooKeeper().delete(elementNodeFullName, 0);
    return true;
  }
 
  /** 鎖是否已經存在,容器節點存在,并且有子節點,則說明鎖已經存在 */
  public boolean exists(String guidNodeName) {
    boolean exists = false;
    Stat stat = new Stat();
    try {
      getZooKeeper().getData(guidNodeName, false, stat);
      exists = stat.getNumChildren() > 0;
    } catch (KeeperException.NoNodeException e) {
      exists = false;
    }
    return exists;
  }
 
  /** 是否加鎖成功 , 由子類實作 */
  protected abstract boolean isLockSuccess();
}
           

3.4 基于子節點實作的非阻塞鎖

ChildrenBlocklessLock.java

public class ChildrenBlocklessLock extends ChildrenNodeLock {
  /** 是否加鎖成功 */
  protected boolean isLockSuccess() throws KeeperException, InterruptedException {
    boolean lockSuccess = false;
    String prevElementName = getPrevElementName();
    if (prevElementName != null) {
      // 有更小的節點,說明目前節點沒搶到鎖,删掉自己并退出
      getZooKeeper().delete(elementNodeFullName, 0);
    } else {
      lockSuccess = true;
    }
    return lockSuccess;
  }
}
 
3.5   基于子節點實作的阻塞鎖
ChildrenBlockingLock.java
public class ChildrenBlockingLock extends ChildrenNodeLock {
  /** 前一個節點被删除的信号 */
  static Integer mutex = Integer.valueOf(-1);
 
  /** 監控的節點被删除 */
  protected void processNodeDeleted(WatchedEvent event) {
    synchronized (mutex) {
      // 節點被删除,通知退出線程
      mutex.notify();
    }
  }
 
  /** 是否加鎖成功 */
  protected boolean isLockSuccess() {
    boolean lockSuccess;
    while (true) {
      String prevElementName = getPrevElementName();
      if (prevElementName == null) {
        lockSuccess = true;
        break;
      } else {
        // 有更小的節點,說明目前節點沒搶到鎖,注冊前一個節點的監聽
        getZooKeeper().exists(this.guidNodeName + "/" + prevElementName, true);
        synchronized (mutex) {
          mutex.wait();
          log.info("{} 被删除,看看是不是輪到自己了", prevElementName);
        }
      }
    }
    return lockSuccess;
  }
}
           

4 測試用例

4.1 測試代碼

LockClientThread.java 擷取分布式鎖和釋放鎖

public class LockClientThread extends Thread {
  /** 模拟擷取分布式鎖,成功後執行業務 */
  public void run() {
    boolean locked = zooKeeperLock.lock(guidNodeName, clientGuid);
    if (locked) {
      log.info("{} lock() success,拿到鎖了,假裝忙2秒", clientGuid);
      Thread.sleep(2000);
      boolean released = zooKeeperLock.release(guidNodeName, clientGuid);
      log.info("{} release() result : {}", clientGuid, released);
    } else {
      log.info("{} lock() fail", clientGuid);
    }
  }
}
           

模拟多個用戶端并發執行

public void testChildrenBlocklessMultiThread() throws IOException {
  String guidNodeName = "/multi-" + System.currentTimeMillis();
  int threadCount = 5;
 
  LockClientThread[] threads = new LockClientThread[threadCount];
  for (int i = 0; i < threadCount; i++) {
    ChildrenBlocklessLock nodeBlocklessLock = new ChildrenBlocklessLock(address);
    threads[i] = new LockClientThread(nodeBlocklessLock, guidNodeName, "client-" + (i + 1));
  }
  for (int i = 0; i < threadCount; i++) {
    threads[i].start();
  }
}
           

4.2 非阻塞鎖的測試結果

可以看到,隻有一個線程能搶到鎖并執行業務,其他線程都直接退出。

55:43.929 [INFO] LockClientThread.run(33) client-1 lock() ...
55:43.942 [INFO] LockClientThread.run(33) client-3 lock() ...
55:43.947 [INFO] LockClientThread.run(33) client-2 lock() ...
55:43.948 [INFO] LockClientThread.run(33) client-4 lock() ...
55:43.949 [INFO] LockClientThread.run(33) client-5 lock() ...
55:44.052 [INFO] LockClientThread.run(36) client-1 lock() success,拿到鎖了,假裝忙2秒
55:44.072 [INFO] LockClientThread.run(47) client-5 lock() fail
55:44.085 [INFO] LockClientThread.run(47) client-4 lock() fail
55:44.091 [INFO] LockClientThread.run(47) client-2 lock() fail
55:44.096 [INFO] LockClientThread.run(47) client-3 lock() fail
55:46.053 [INFO] LockClientThread.run(42) client-1 release() ...
55:46.057 [INFO] LockClientThread.run(44) client-1 release() result : true
           

4.3 阻塞鎖的測試結果

可以看到,搶到分布式鎖的線程執行業務,沒搶到鎖的線程會等到直到鎖被釋放重新擷取到鎖後再執行業務。

59:32.802 [INFO] LockClientThread.run(33) client-1 lock() ...
59:32.811 [INFO] LockClientThread.run(33) client-3 lock() ...
59:32.812 [INFO] LockClientThread.run(33) client-4 lock() ...
59:32.813 [INFO] LockClientThread.run(33) client-2 lock() ...
59:32.813 [INFO] LockClientThread.run(33) client-5 lock() ...
59:32.836 [INFO] LockClientThread.run(36) client-1 lock() success,拿到鎖了,假裝忙2秒
59:34.836 [INFO] LockClientThread.run(42) client-1 release() ...
59:34.844 [INFO] LockClientThread.run(44) client-1 release() result : true
59:34.846 [INFO] ChildrenBlockingLock.isLockSuccess(55) element0000000000 被删除,看看是不是輪到自己了
59:34.848 [INFO] LockClientThread.run(36) client-5 lock() success,拿到鎖了,假裝忙2秒
59:36.848 [INFO] LockClientThread.run(42) client-5 release() ...
59:36.852 [INFO] ChildrenBlockingLock.isLockSuccess(55) element0000000001 被删除,看看是不是輪到自己了
59:36.852 [INFO] LockClientThread.run(44) client-5 release() result : true
59:36.855 [INFO] LockClientThread.run(36) client-2 lock() success,拿到鎖了,假裝忙2秒
59:38.855 [INFO] LockClientThread.run(42) client-2 release() ...
59:38.869 [INFO] ChildrenBlockingLock.isLockSuccess(55) element0000000002 被删除,看看是不是輪到自己了
59:38.870 [INFO] LockClientThread.run(44) client-2 release() result : true
59:38.876 [INFO] LockClientThread.run(36) client-4 lock() success,拿到鎖了,假裝忙2秒
59:40.877 [INFO] LockClientThread.run(42) client-4 release() ...
59:40.881 [INFO] ChildrenBlockingLock.isLockSuccess(55) element0000000003 被删除,看看是不是輪到自己了
59:40.882 [INFO] LockClientThread.run(44) client-4 release() result : true
59:40.884 [INFO] LockClientThread.run(36) client-3 lock() success,拿到鎖了,假裝忙2秒
59:42.884 [INFO] LockClientThread.run(42) client-3 release() ...
59:42.887 [INFO] LockClientThread.run(44) client-3 release() result : true