利用zookeeper實作分布式leader節點選舉
依賴原理
- 在ZK中添加基本節點(父節點),路徑由程式定義,節點類型為持久節點(PERSISTENT)。
- 每個需要競選leader的程序,在ZK中分別建立基本節點的子節點,類型為臨時自編號節點(EPHEMERAL_SEQUENTIAL),並儲存建立後傳回的實際節點路徑。
- 通過delete方式刪除本程序建立的子節點,可以作為退出leader狀態的方式。
- 基本節點的子節點類型為臨時自編號節點(EPHEMERAL_SEQUENTIAL)。當程序與ZK的連接中斷且會話(session)超時失效後,ZK會自動將該節點刪除,確保斷連之後其他程序能繼續進行leader選舉(短暫斷線但會話未過期時,節點仍會保留)。
- 由於ZK自編號產生的路徑是遞增的,所以可以通過判斷基本節點的子節點中編號最小的節點是否是本程序建立的節點,來判斷是否獲得leader地位。
原理圖示
利用zk實作的分布式leader節點選舉實作原理如下:
若干程序分別嘗試競選leader,情況如下:
- (1)8個程序分別在ZK基本節點下建立臨時自編號節點,獲取建立成功後的實際路徑
- (2)在基本節點的子節點清單中,判斷本程序建立的節點編號是否為最小
- (3)編號最小的程序獲得leader地位

leader程序異常退出,或伺服器異常導致leader程序無法執行leader功能時:
- (1)程序將ZK中對應的臨時節點刪除,此時基本節點下編號最小的子節點將獲得leader地位
- (2)程序由於網絡或其他原因與ZK斷開連接且會話超時,ZK自動將其對應的臨時節點刪除
- (3)新出現的程序加入leader競選,在ZK下建立臨時節點,排隊等待
方案一:父節點監聽方式
實作原理
程式流程圖如下:
實作代碼
package xuyihao.zktest.server.zk.leader;
import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* 基于zk的分布式leader節點選舉
* <pre>
* 方案一:父節點監聽方式
*
* 實作思路:監聽父節點狀态
* 1.在父節點(持久化)下建立臨時節點,實際建立的節點路徑會根據數量進行自增(ZK自編号方式建立節點)。
* 2.建立節點成功後,擷取父節點下的子節點清單,判斷本線程的路徑字尾編号是否是所有子節點中最小的,若是則成為leader,反之監聽父節點變動狀态(通過getChildren()方法注冊watcher)
* 3.當父節點狀态變動(主要是子節點清單變動)後watcher會接收到通知,這時判斷父節點下的子節點的排序狀态,若滿足本線程的路徑字尾編号最小則成為leader,反之繼續注冊watcher監聽父節點狀态
* </pre>
* <p>
* Created by xuyh at 2017/11/24 9:19.
*/
public class ZKLeader {
private static ZKLeader zkLeader;
private Logger logger = LoggerFactory.getLogger(ZKLeader.class);
private final static String BASE_NODE_PATH = "/ZKLeader_Leader";
private final static String NODE_PATH = "host_process_no_";
private String finalNodePath;
//是否是主節點标志位
private boolean leader = false;
private String host = "127.0.0.1";
private String port = "2181";
private ZooKeeper zooKeeper;
private FatherWatcher fatherWatcher;
//是否連接配接成功标志位
private boolean connected = false;
public static ZKLeader create(String host, String port) {
ZKLeader zkLeader = new ZKLeader(host, port);
zkLeader.connectZookeeper();
return zkLeader;
}
public boolean leader() {
return leader;
}
public void close() {
disconnectZooKeeper();
}
private ZKLeader(String host, String port) {
this.host = host;
this.port = port;
this.fatherWatcher = new FatherWatcher(this);
}
private boolean connectZookeeper() {
try {
zooKeeper = new ZooKeeper(host + ":" + port, , event -> {
if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
leader = false;
} else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
leader = false;
} else if (event.getState() == Watcher.Event.KeeperState.Expired) {
leader = false;
} else {
if (event.getType() == Watcher.Event.EventType.None) {//說明連接配接成功了
connected = true;
}
}
});
int i = ;
while (!connected) {//等待異步連接配接成功,超過時間30s則退出等待
if (i == )
break;
Thread.sleep();
i++;
}
if (connected) {
if (zooKeeper.exists(BASE_NODE_PATH, false) == null) {//建立父節點
zooKeeper.create(BASE_NODE_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//建立子節點
finalNodePath = zooKeeper.create(BASE_NODE_PATH + "/" + NODE_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//檢查一次是否是主節點
checkLeader();
} else {
logger.warn("Connect zookeeper failed. Time consumes 30 s");
return false;
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return false;
}
return true;
}
private boolean disconnectZooKeeper() {
if (zooKeeper == null)
return false;
try {
connected = false;
leader = false;
zooKeeper.close();
} catch (Exception e) {
logger.warn(String.format("ZK disconnect failed. [%s]", e.getMessage()), e);
}
return true;
}
private void checkLeader() {
if (!connected)
return;
try {
//擷取子節點清單同時再次注冊監聽
List<String> childrenList = zooKeeper.getChildren(BASE_NODE_PATH, fatherWatcher);
if (judgePathNumMin(childrenList)) {
leader = true;
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
private boolean judgePathNumMin(List<String> paths) {
if (paths.isEmpty())
return true;
if (paths.size() >= ) {
//對無序狀态的path清單按照編号升序排序
paths.sort((str1, str2) -> {
int num1;
int num2;
String string1 = str1.substring(NODE_PATH.length(), str1.length());
String string2 = str2.substring(NODE_PATH.length(), str2.length());
num1 = Integer.parseInt(string1);
num2 = Integer.parseInt(string2);
if (num1 > num2) {
return ;
} else if (num1 < num2) {
return -;
} else {
return ;
}
});
}
String minId = paths.get();
return finalNodePath.equals(BASE_NODE_PATH + "/" + minId);
}
private class FatherWatcher implements Watcher {
private ZKLeader context;
FatherWatcher(ZKLeader context) {
this.context = context;
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {//子節點有變化
context.checkLeader();
}
}
}
}
測試
測試程式
private void zkLeaderOneTestWithMultiThread() throws Exception {
List<LeaderOneThread> leaderOneThreads = new ArrayList<>();
for (int i = ; i < ; i++) {
leaderOneThreads.add(new LeaderOneThread(ZKLeader.create("127.0.0.1", "2181"), i));
}
leaderOneThreads.forEach(LeaderOneThread::start);
//線程0斷連
Thread.sleep();
leaderOneThreads.get().getZkLeader().close();
Thread.sleep();
System.out.println(String.format("線程: [%s] 斷開連接配接", ));
//線程1斷連
Thread.sleep();
leaderOneThreads.get().getZkLeader().close();
System.out.println(String.format("線程: [%s] 斷開連接配接", ));
//線程3斷連
Thread.sleep();
leaderOneThreads.get().getZkLeader().close();
System.out.println(String.format("線程: [%s] 斷開連接配接", ));
//線程4斷連
Thread.sleep();
leaderOneThreads.get().getZkLeader().close();
System.out.println(String.format("線程: [%s] 斷開連接配接", ));
//線程2斷連
Thread.sleep();
leaderOneThreads.get().getZkLeader().close();
System.out.println(String.format("線程: [%s] 斷開連接配接", ));
Thread.sleep();
}
/**
 * Reporter thread for the approach-1 demo.
 *
 * Every 5 seconds it prints a line if the wrapped {@link ZKLeader} currently holds leadership.
 * Interruption is the stop signal: the loop exits instead of swallowing it.
 */
private class LeaderOneThread extends Thread {
    private static final long REPORT_INTERVAL_MS = 5000L;
    private final ZKLeader zkLeader;
    private final int threadNum;

    public ZKLeader getZkLeader() {
        return zkLeader;
    }

    LeaderOneThread(ZKLeader zkLeader, int threadNum) {
        this.zkLeader = zkLeader;
        this.threadNum = threadNum;
    }

    @Override
    public void run() {
        // SimpleDateFormat is not thread-safe, but this instance is confined to this thread.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        while (!isInterrupted()) {
            try {
                Thread.sleep(REPORT_INTERVAL_MS);
            } catch (InterruptedException e) {
                interrupt(); // restore the flag and stop reporting
                return;
            }
            String currentTime = sdf.format(new Date());
            if (zkLeader.leader()) {
                System.out.println(String.format("[%s] 線程: [%s] 是主節點", currentTime, threadNum));
            }
        }
    }
}
結果:
[2017-11-30 17:05:02] 線程: [0] 是主節點
[2017-11-30 17:05:07] 線程: [0] 是主節點
[2017-11-30 17:05:12] 線程: [0] 是主節點
線程: [0] 斷開連接配接
[2017-11-30 17:05:22] 線程: [1] 是主節點
[2017-11-30 17:05:27] 線程: [1] 是主節點
[2017-11-30 17:05:32] 線程: [1] 是主節點
[2017-11-30 17:05:37] 線程: [1] 是主節點
線程: [1] 斷開連接配接
[2017-11-30 17:05:42] 線程: [2] 是主節點
[2017-11-30 17:05:47] 線程: [2] 是主節點
[2017-11-30 17:05:52] 線程: [2] 是主節點
[2017-11-30 17:05:57] 線程: [2] 是主節點
線程: [3] 斷開連接配接
[2017-11-30 17:06:02] 線程: [2] 是主節點
[2017-11-30 17:06:07] 線程: [2] 是主節點
[2017-11-30 17:06:12] 線程: [2] 是主節點
[2017-11-30 17:06:17] 線程: [2] 是主節點
線程: [4] 斷開連接配接
[2017-11-30 17:06:22] 線程: [2] 是主節點
[2017-11-30 17:06:27] 線程: [2] 是主節點
[2017-11-30 17:06:32] 線程: [2] 是主節點
[2017-11-30 17:06:37] 線程: [2] 是主節點
線程: [2] 斷開連接配接
[2017-11-30 17:06:42] 線程: [5] 是主節點
[2017-11-30 17:06:47] 線程: [5] 是主節點
[2017-11-30 17:06:52] 線程: [5] 是主節點
[2017-11-30 17:06:57] 線程: [5] 是主節點
[2017-11-30 17:07:02] 線程: [5] 是主節點
[2017-11-30 17:07:07] 線程: [5] 是主節點
[2017-11-30 17:07:12] 線程: [5] 是主節點
[2017-11-30 17:07:17] 線程: [5] 是主節點
[2017-11-30 17:07:22] 線程: [5] 是主節點
[2017-11-30 17:07:27] 線程: [5] 是主節點
[2017-11-30 17:07:32] 線程: [5] 是主節點
[2017-11-30 17:07:37] 線程: [5] 是主節點
方案一優劣
優點
- 實作對父節點變動狀態(主要是子節點清單變化)的監聽
- 當子節點清單出現變化後,ZK通知監聽的各個程序,各個程序查詢子節點狀態
- 對父節點進行監聽,實作起來相對簡單
劣勢
- 每個程序都監聽父節點狀態,即父節點出現變動(主要是子節點清單變化)後,ZK伺服器需要通知到所有註冊監聽的程序,網絡消耗和資源浪費比較大(即所謂的「羊群效應」,herd effect)
方案二:子節點監聽方式
實作原理
程式流程圖如下:
實作代碼
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Created by xuyh at 2017/11/30 14:40.
* <p>
* **最優方案**
* <pre>
* 方案三:子節點監聽方式
*
* 實作思路:監聽子節點狀态
* 1.在父節點(持久化)下建立臨時節點,實際建立的節點路徑會根據數量進行自增(ZK自編号方式建立節點)。
* 2.建立節點成功後,首先擷取父節點下的子節點清單,判斷本線程的路徑字尾編号是否是所有子節點中最小的,若是則成為leader,反之監聽本節點前一個節點(路徑排序為本節點路徑數字減一的節點)變動狀态(通過getData()方法注冊watcher)
* 3.當監聽對象狀态變動(節點删除狀态)後watcher會接收到通知,這時再次判斷父節點下的子節點的排序狀态,若滿足本線程的路徑字尾編号最小則成為leader,反之繼續注冊watcher監聽前一個節點狀态
*/
public class ZKLeaderTwo {
private static ZKLeaderTwo zkLeaderTwo;
private Logger logger = LoggerFactory.getLogger(ZKLeader.class);
private final static String BASE_NODE_PATH = "/ZKLeader_Leader";
private final static String NODE_PATH = "host_process_no_";
private String finalNodePath;
//是否是主節點标志位
private boolean leader = false;
private String host = "127.0.0.1";
private String port = "2181";
private ZooKeeper zooKeeper;
private PreviousNodeWatcher previousNodeWatcher;
//是否連接配接成功标志位
private boolean connected = false;
public static ZKLeaderTwo create(String host, String port) {
ZKLeaderTwo zkLeaderTwo = new ZKLeaderTwo(host, port);
zkLeaderTwo.connectZookeeper();
return zkLeaderTwo;
}
public boolean leader() {
return leader;
}
public void close() {
disconnectZooKeeper();
}
private ZKLeaderTwo(String host, String port) {
this.host = host;
this.port = port;
this.previousNodeWatcher = new PreviousNodeWatcher(this);
}
private boolean connectZookeeper() {
try {
zooKeeper = new ZooKeeper(host + ":" + port, , event -> {
if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
leader = false;
} else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
leader = false;
} else if (event.getState() == Watcher.Event.KeeperState.Expired) {
leader = false;
} else {
if (event.getType() == Watcher.Event.EventType.None) {//說明連接配接成功了
connected = true;
}
}
});
int i = ;
while (!connected) {//等待異步連接配接成功,超過時間30s則退出等待
if (i == )
break;
Thread.sleep();
i++;
}
if (connected) {
if (zooKeeper.exists(BASE_NODE_PATH, false) == null) {//建立父節點
zooKeeper.create(BASE_NODE_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//建立子節點
finalNodePath = zooKeeper.create(BASE_NODE_PATH + "/" + NODE_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//檢查一次是否是主節點
checkLeader();
} else {
logger.warn("Connect zookeeper failed. Time consumes 30 s");
return false;
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return false;
}
return true;
}
private boolean disconnectZooKeeper() {
if (zooKeeper == null)
return false;
try {
zooKeeper.close();
connected = false;
leader = false;
} catch (Exception e) {
logger.warn(String.format("ZK disconnect failed. [%s]", e.getMessage()), e);
}
return true;
}
private void checkLeader() {
if (!connected)
return;
try {
//擷取子節點清單,若沒有成為leader,注冊監聽,監聽對象應當是比本節點路徑編号小一(或者排在前面一位)的節點
List<String> childrenList = zooKeeper.getChildren(BASE_NODE_PATH, false);
if (judgePathNumMin(childrenList)) {
leader = true;//成為leader
} else {
watchPreviousNode(childrenList);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
private boolean judgePathNumMin(List<String> paths) {
if (paths.isEmpty())
return true;
if (paths.size() >= ) {
//對無序狀态的path清單按照編号升序排序
paths.sort((str1, str2) -> {
int num1;
int num2;
String string1 = str1.substring(NODE_PATH.length(), str1.length());
String string2 = str2.substring(NODE_PATH.length(), str2.length());
num1 = Integer.parseInt(string1);
num2 = Integer.parseInt(string2);
if (num1 > num2) {
return ;
} else if (num1 < num2) {
return -;
} else {
return ;
}
});
}
String minId = paths.get();
return finalNodePath.equals(BASE_NODE_PATH + "/" + minId);
}
private void watchPreviousNode(List<String> paths) {
if (paths.isEmpty() || paths.size() == ) {
return;
}
int currentNodeIndex = paths.indexOf(finalNodePath.substring((BASE_NODE_PATH + "/").length(), finalNodePath.length()));
String previousNodePath = BASE_NODE_PATH + "/" + paths.get(currentNodeIndex - );
//通過getData方法再次注冊watcher
try {
zooKeeper.getData(previousNodePath, previousNodeWatcher, new Stat());
} catch (Exception e) {
logger.warn(String.format("Previous node watcher register failed! message: [%s]", e.getMessage()), e);
}
}
private class PreviousNodeWatcher implements Watcher {
private ZKLeaderTwo context;
PreviousNodeWatcher(ZKLeaderTwo context) {
this.context = context;
}
@Override
public void process(WatchedEvent event) {
//節點被删除了,說明這個節點放棄了leader
if (event.getType() == Event.EventType.NodeDeleted) {
context.checkLeader();
}
}
}
}
測試
測試程式
private void zkLeaderTwoTestWithMultiThread() throws Exception {
List<LeaderTwoThread> leaderTwoThreads = new ArrayList<>();
for (int i = ; i < ; i++) {
leaderTwoThreads.add(new LeaderTwoThread(ZKLeaderTwo.create("127.0.0.1", "2181"), i));
}
leaderTwoThreads.forEach(LeaderTwoThread::start);
//線程0斷連
Thread.sleep();
leaderTwoThreads.get().getZkLeaderTwo().close();
System.out.println(String.format("線程: [%s] 斷開連接配接", ));
//線程1斷連
Thread.sleep();
leaderTwoThreads.get().getZkLeaderTwo().close();
System.out.println(String.format("線程: [%s] 斷開連接配接", ));
//線程3斷連
Thread.sleep();
leaderTwoThreads.get().getZkLeaderTwo().close();
System.out.println(String.format("線程: [%s] 斷開連接配接", ));
//線程4斷連
Thread.sleep();
leaderTwoThreads.get().getZkLeaderTwo().close();
System.out.println(String.format("線程: [%s] 斷開連接配接", ));
//線程2斷連
Thread.sleep();
leaderTwoThreads.get().getZkLeaderTwo().close();
System.out.println(String.format("線程: [%s] 斷開連接配接", ));
Thread.sleep();
}
/**
 * Reporter thread for the approach-2 demo.
 *
 * Every 5 seconds it prints a line if the wrapped {@link ZKLeaderTwo} currently holds
 * leadership. Interruption is the stop signal: the loop exits instead of swallowing it.
 */
private class LeaderTwoThread extends Thread {
    private static final long REPORT_INTERVAL_MS = 5000L;
    private final ZKLeaderTwo zkLeaderTwo;
    private final int threadNum;

    public ZKLeaderTwo getZkLeaderTwo() {
        return zkLeaderTwo;
    }

    LeaderTwoThread(ZKLeaderTwo zkLeaderTwo, int threadNum) {
        this.zkLeaderTwo = zkLeaderTwo;
        this.threadNum = threadNum;
    }

    @Override
    public void run() {
        // SimpleDateFormat is not thread-safe, but this instance is confined to this thread.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        while (!isInterrupted()) {
            try {
                Thread.sleep(REPORT_INTERVAL_MS);
            } catch (InterruptedException e) {
                interrupt(); // restore the flag and stop reporting
                return;
            }
            String currentTime = sdf.format(new Date());
            if (zkLeaderTwo.leader()) {
                System.out.println(String.format("[%s] 線程: [%s] 是主節點", currentTime, threadNum));
            }
        }
    }
}
結果:
[2017-11-30 16:47:41] 線程: [0] 是主節點
[2017-11-30 16:47:46] 線程: [0] 是主節點
[2017-11-30 16:47:51] 線程: [0] 是主節點
[2017-11-30 16:47:56] 線程: [0] 是主節點
線程: [0] 斷開連接配接
[2017-11-30 16:48:01] 線程: [1] 是主節點
[2017-11-30 16:48:06] 線程: [1] 是主節點
[2017-11-30 16:48:11] 線程: [1] 是主節點
[2017-11-30 16:48:16] 線程: [1] 是主節點
線程: [1] 斷開連接配接
[2017-11-30 16:48:21] 線程: [2] 是主節點
[2017-11-30 16:48:26] 線程: [2] 是主節點
[2017-11-30 16:48:31] 線程: [2] 是主節點
[2017-11-30 16:48:36] 線程: [2] 是主節點
線程: [3] 斷開連接配接
[2017-11-30 16:48:41] 線程: [2] 是主節點
[2017-11-30 16:48:46] 線程: [2] 是主節點
[2017-11-30 16:48:51] 線程: [2] 是主節點
[2017-11-30 16:48:56] 線程: [2] 是主節點
線程: [4] 斷開連接配接
[2017-11-30 16:49:01] 線程: [2] 是主節點
[2017-11-30 16:49:06] 線程: [2] 是主節點
[2017-11-30 16:49:11] 線程: [2] 是主節點
[2017-11-30 16:49:16] 線程: [2] 是主節點
線程: [2] 斷開連接配接
[2017-11-30 16:49:21] 線程: [5] 是主節點
[2017-11-30 16:49:26] 線程: [5] 是主節點
[2017-11-30 16:49:31] 線程: [5] 是主節點
[2017-11-30 16:49:36] 線程: [5] 是主節點
[2017-11-30 16:49:41] 線程: [5] 是主節點
[2017-11-30 16:49:46] 線程: [5] 是主節點
[2017-11-30 16:49:51] 線程: [5] 是主節點
[2017-11-30 16:49:56] 線程: [5] 是主節點
[2017-11-30 16:50:01] 線程: [5] 是主節點
[2017-11-30 16:50:06] 線程: [5] 是主節點
[2017-11-30 16:50:11] 線程: [5] 是主節點
[2017-11-30 16:50:16] 線程: [5] 是主節點
方案二優劣
優點
- 實作對子節點變動狀態(排序在本程序對應節點之前的一個節點)的監聽
- 被監聽子節點變動(刪除)之後,ZK通知本程序執行相應操作,判斷是否成為leader
- 相對於父節點監聽方式,子節點監聽方式在每一次leader退出(或節點變動)時,ZK僅通知到一個程序的watcher,節省了大量的網絡消耗和資源佔用
劣勢
- 實作方式與程式邏輯較父節點監聽方式更為繁瑣
總結比較
- 程式複雜度:父節點監聽方式 < 子節點監聽方式
- 網絡資源消耗:父節點監聽方式 >> 子節點監聽方式
- 程式可靠性:父節點監聽方式 < 子節點監聽方式
輕量Zookeeper用戶端實作
github位址:
https://github.com/johnsonmoon/zk-client