前言
上文
【從入門到放棄-ZooKeeper】ZooKeeper實戰-分布式鎖-更新版中,我們通過利用ZooKeeper的臨時節點和Watcher特性,實作了一個分布式鎖。
本文我們結合實際場景,完成一個分布式競争選舉。
設計
這裡我們實作一個公平的選舉方式,即先參加選舉的優先被選為leader。
具體的實作思路 參考了ZooKeeper提供的官方示例:
zookeeper-recipes-election
- START:伺服器開始競選
- OFFER:建立臨時順序結點
- DETERMINE:開始決策,将臨時節點按末尾序号從小到大排序,如果目前節點的序号最小,則競選成功,否則,則Watch前一個節點,目前一個節點被删除時,再次進行決策
- ELECTED:目前節點是序号最小的節點,競選成功
- READY:目前節點不是序号最小的節點,競選不成功,Watch前一個節點,進入READY态
- FAILED:當出現異常情況時,為失敗狀态
- STOP:結束競選
LeaderElectionSupport
public class LeaderElectionSupport implements LeaderElection{
private static Logger logger = LoggerFactory.getLogger(LeaderElectionSupport.class);
//ZooKeeper用戶端,進行ZooKeeper操作
private ZooKeeper zooKeeper;
//根節點名稱
private String dir;
//節點字首
private String node;
//ZooKeeper鑒權資訊
private List<ACL> acls;
//要加鎖節點
private String fullPath;
//選舉狀态
private State state;
//監聽器
private Set<LeaderElectionListener> listeners;
//存目前節點的資訊
private volatile LeaderNode leaderNode;
//監察器
private Watcher watcher;
/**
* Constructor.
*
* @param zooKeeper the zoo keeper
* @param dir the dir
* @param node the node
* @param acls the acls
*/
public LeaderElectionSupport(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
this.zooKeeper = zooKeeper;
this.dir = dir;
this.node = node;
this.acls = acls;
this.fullPath = dir.concat("/").concat(this.node);
init();
state = State.STOP;
listeners = Collections.synchronizedSet(new HashSet<>());
}
/**
* 初始化根節點、檢查器等
* */
private void init() {
try {
watcher = new LeaderWatcher();
Stat stat = zooKeeper.exists(dir, false);
if (stat == null) {
zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
}
} catch (Exception e) {
logger.error("[LeaderElectionSupport#init] error : " + e.toString(), e);
}
}
}
start
/**
* Start.
* 開始競選
*/
@Override
public void start() {
synchronized (this) {
state = State.START;
dispatchEvent(EventType.START);
offerElection();
determineElection();
}
}
offerElection
/**
* 建立臨時節點,參加競選,并将主機資訊儲存在node中
* */
private void offerElection() {
dispatchEvent(EventType.OFFER_START);
state = State.OFFER;
if (leaderNode == null) {
synchronized (this) {
try {
if (leaderNode == null) {
InetAddress ia = InetAddress.getLocalHost();
LeaderNode tmpNode = new LeaderNode();
tmpNode.setHostName(ia.getHostName());
String path = zooKeeper.create(fullPath, ConversionUtil.objectToBytes(ia.getHostName()), acls, CreateMode.EPHEMERAL_SEQUENTIAL);
tmpNode.setNodePath(path);
tmpNode.setId(NodeUtil.getNodeId(path));
leaderNode = tmpNode;
}
} catch (Exception e) {
becomeFailed(e);
}
}
}
dispatchEvent(EventType.OFFER_COMPLETE);
}
determineElection
/**
* 決定競選結果
* 1、競選節點序号最低的赢取選舉
* 2、未赢得選舉的節點,監聽上一個節點,直到上一個節點被删除,則嘗試重新競選
* */
private void determineElection() {
dispatchEvent(EventType.DETERMINE_START);
state = State.DETERMINE;
synchronized (this) {
TreeSet<String> nodePathSet = getNodePathSet();
if (nodePathSet.isEmpty()) {
becomeFailed(new Exception("no node"));
return;
}
String leaderPath = nodePathSet.first();
if (leaderNode.getNodePath().equalsIgnoreCase(leaderPath)) {
becomeLeader();
} else {
becomeReady(nodePathSet.headSet(leaderNode.getNodePath()).last());
}
}
dispatchEvent(EventType.DETERMINE_COMPLETE);
}
becomeLeader
/**
* 競選成功
* */
private void becomeLeader() {
dispatchEvent(EventType.ELECTED_START);
state = State.ELECTED;
dispatchEvent(EventType.ELECTED_COMPLETE);
}
becomeReady
/**
* 競選失敗進入就緒态
* */
private void becomeReady(String path) {
try {
Stat stat = zooKeeper.exists(path, watcher);
if (stat == null) {
determineElection();
} else {
dispatchEvent(EventType.READY_START);
state = State.READY;
dispatchEvent(EventType.READY_COMPLETE);
}
} catch (KeeperException e) {
becomeFailed(e);
} catch (InterruptedException e) {
becomeFailed(e);
}
}
becomeFailed
/**
* 當發生異常時,更新為FAILED狀态
* */
private void becomeFailed(Exception e) {
state = State.FAILED;
dispatchEvent(EventType.FAILED);
logger.error("[LeaderElectionSupport#becomeFailed] error : " + e.toString(), e);
}
getNodePathSet
/**
* 擷取參加競選的節點資訊
* */
private TreeSet<String> getNodePathSet() {
TreeSet<String> nodeSet = new TreeSet<>();
try {
List<String> nodes = zooKeeper.getChildren(dir, false);
for (String node : nodes) {
nodeSet.add(dir.concat("/").concat(node));
}
} catch (KeeperException e) {
becomeFailed(e);
} catch (InterruptedException e) {
becomeFailed(e);
}
return nodeSet;
}
stop
/**
* Stop.
* 停止競選
*/
@Override
public void stop() {
synchronized (this) {
dispatchEvent(EventType.STOP_START);
deleteNode();
state = State.STOP;
dispatchEvent(EventType.STOP_COMPLETE);
}
}
deleteNode
/**
* 停止時,删除節點,退出競選
* */
private void deleteNode() {
try {
if (leaderNode != null) {
synchronized (this) {
zooKeeper.delete(leaderNode.getNodePath(), -1);
leaderNode = null;
}
}
} catch (InterruptedException e) {
becomeFailed(e);
} catch (KeeperException e) {
becomeFailed(e);
}
}
getLeaderHostName
/**
* Gets get leader host name.
*
* @return the get leader host name
*/
@Override
public String getLeaderHostName() {
synchronized (this) {
TreeSet<String> nodePathSet = getNodePathSet();
if (!nodePathSet.isEmpty()) {
try {
String leaderPath = nodePathSet.first();
return (String) ConversionUtil.bytesToObject(zooKeeper.getData(leaderPath, false, null));
} catch (KeeperException e) {
logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
} catch (InterruptedException e) {
logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
} catch (IOException e) {
logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
} catch (ClassNotFoundException e) {
logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
}
}
return null;
}
}
getLeaderNodePath
/**
* Gets get leader node path.
*
* @return the get leader node path
*/
@Override
public String getLeaderNodePath() {
synchronized (this) {
TreeSet<String> nodePathSet = getNodePathSet();
return nodePathSet.isEmpty() ? null : nodePathSet.first();
}
}
LeaderWatcher
/**
* 内部watcher類,當競選失敗時,watch前一個節點,目前一個節點别移除時,再次發起決策
* */
private class LeaderWatcher implements Watcher {
/**
* Process.
*
* @param watchedEvent the watched event
*/
@Override
public void process(WatchedEvent watchedEvent) {
try {
if (Event.EventType.NodeDeleted.equals(watchedEvent.getType()) && !State.STOP.equals(state)) {
determineElection();
}
} catch (Exception e) {
logger.error("[LeaderWatcher#process] error : " + e.toString(), e);
}
}
}
總結
以上就是我們利用ZooKeeper的臨時節點和Watcher特性實作的公平模式分布式競選。
可以進行簡單的選主操作,适用于如執行單機定時任務、心跳檢測等場景。實際上是實作的Master-Slave模型。
源代碼可見:
aloofJr而對高可用要求較多的複雜選舉場景,如分布式存儲、同步等,則需要考慮叢集一緻性、腦裂等各種情況,則需要實作如Paxos、raft、Zab等一緻性算法協定。如ZooKeeper叢集的選舉模式就是使用的Zab算法。
我們後續會進行深入的探讨。
更多文章
見我的部落格:
https://nc2era.comwritten by
AloofJr,轉載請注明出處