天天看點

【從入門到放棄-ZooKeeper】ZooKeeper實戰-分布式競選

前言

上文

【從入門到放棄-ZooKeeper】ZooKeeper實戰-分布式鎖-更新版

中,我們通過利用ZooKeeper的臨時節點和Watcher特性,實作了一個分布式鎖。

本文我們結合實際場景,完成一個分布式競争選舉。

設計

這裡我們實作一個公平的選舉方式,即先參加選舉的優先被選為leader。

具體的實作思路 參考了ZooKeeper提供的官方示例:

zookeeper-recipes-election
【從入門到放棄-ZooKeeper】ZooKeeper實戰-分布式競選
  • START:伺服器開始競選
  • OFFER:建立臨時順序結點
  • DETERMINE:開始決策,将臨時節點按末尾序号從小到大排序,如果目前節點的序号最小,則競選成功,否則,則Watch前一個節點,目前一個節點被删除時,再次進行決策
  • ELECTED:目前節點是序号最小的節點,競選成功
  • READY:目前節點不是序号最小的節點,競選不成功,Watch前一個節點,進入READY态
  • FAILED:當出現異常情況時,為失敗狀态
  • STOP:結束競選

LeaderElectionSupport

public class LeaderElectionSupport implements LeaderElection{
    private static Logger logger = LoggerFactory.getLogger(LeaderElectionSupport.class);

    //ZooKeeper用戶端,進行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根節點名稱
    private String dir;

    //節點字首
    private String node;

    //ZooKeeper鑒權資訊
    private List<ACL> acls;

    //要加鎖節點
    private String fullPath;

    //選舉狀态
    private State state;

    //監聽器
    private Set<LeaderElectionListener> listeners;

    //存目前節點的資訊
    private volatile LeaderNode leaderNode;

    //監察器
    private Watcher watcher;


    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public LeaderElectionSupport(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(this.node);
        init();

        state = State.STOP;
        listeners = Collections.synchronizedSet(new HashSet<>());
    }

    /**
     * 初始化根節點、檢查器等
     * */
    private void init() {
        try {
            watcher = new LeaderWatcher();
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[LeaderElectionSupport#init] error : " + e.toString(), e);
        }
    }
}           

start

/**
 * Start.
 * 開始競選
 */
@Override
public void start() {
    synchronized (this) {
        state = State.START;
        dispatchEvent(EventType.START);
        offerElection();
        determineElection();
    }

}           

offerElection

/**
 * 建立臨時節點,參加競選,并将主機資訊儲存在node中
 * */
private void offerElection() {
    dispatchEvent(EventType.OFFER_START);
    state = State.OFFER;
    if (leaderNode == null) {
        synchronized (this) {
            try {
                if (leaderNode == null) {
                    InetAddress ia = InetAddress.getLocalHost();
                    LeaderNode tmpNode = new LeaderNode();
                    tmpNode.setHostName(ia.getHostName());
                    String path = zooKeeper.create(fullPath, ConversionUtil.objectToBytes(ia.getHostName()), acls, CreateMode.EPHEMERAL_SEQUENTIAL);

                    tmpNode.setNodePath(path);
                    tmpNode.setId(NodeUtil.getNodeId(path));

                    leaderNode = tmpNode;
                }
            } catch (Exception e) {
                becomeFailed(e);
            }
        }
    }
    dispatchEvent(EventType.OFFER_COMPLETE);
}           

determineElection

/**
 * 決定競選結果
 * 1、競選節點序号最低的赢取選舉
 * 2、未赢得選舉的節點,監聽上一個節點,直到上一個節點被删除,則嘗試重新競選
 * */
private void determineElection() {
    dispatchEvent(EventType.DETERMINE_START);
    state = State.DETERMINE;
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();
        if (nodePathSet.isEmpty()) {
            becomeFailed(new Exception("no node"));
            return;
        }
        String leaderPath = nodePathSet.first();
        if (leaderNode.getNodePath().equalsIgnoreCase(leaderPath)) {
            becomeLeader();
        } else {
            becomeReady(nodePathSet.headSet(leaderNode.getNodePath()).last());
        }
    }
    dispatchEvent(EventType.DETERMINE_COMPLETE);
}           

becomeLeader

/**
 * 競選成功
 * */
private void becomeLeader() {
    dispatchEvent(EventType.ELECTED_START);
    state = State.ELECTED;
    dispatchEvent(EventType.ELECTED_COMPLETE);
}           

becomeReady

/**
 * 競選失敗進入就緒态
 * */
private void becomeReady(String path) {
    try {
        Stat stat = zooKeeper.exists(path, watcher);

        if (stat == null) {
            determineElection();
        } else {
            dispatchEvent(EventType.READY_START);
            state = State.READY;
            dispatchEvent(EventType.READY_COMPLETE);
        }
    } catch (KeeperException e) {
        becomeFailed(e);
    } catch (InterruptedException e) {
        becomeFailed(e);
    }
}           

becomeFailed

/**
 * 當發生異常時,更新為FAILED狀态
 * */
private void becomeFailed(Exception e) {
    state = State.FAILED;
    dispatchEvent(EventType.FAILED);
    logger.error("[LeaderElectionSupport#becomeFailed] error : " + e.toString(), e);
}           

getNodePathSet

/**
 * 擷取參加競選的節點資訊
 * */
private TreeSet<String> getNodePathSet() {
    TreeSet<String> nodeSet = new TreeSet<>();
    try {
        List<String> nodes = zooKeeper.getChildren(dir, false);

        for (String node : nodes) {
            nodeSet.add(dir.concat("/").concat(node));
        }

    } catch (KeeperException e) {
        becomeFailed(e);
    } catch (InterruptedException e) {
        becomeFailed(e);
    }

    return nodeSet;
}           

stop

/**
 * Stop.
 * 停止競選
 */
@Override
public void stop() {
    synchronized (this) {
        dispatchEvent(EventType.STOP_START);
        deleteNode();
        state = State.STOP;
        dispatchEvent(EventType.STOP_COMPLETE);
    }
}           

deleteNode

/**
 * 停止時,删除節點,退出競選
 * */
private void deleteNode() {
    try {
        if (leaderNode != null) {
            synchronized (this) {
                zooKeeper.delete(leaderNode.getNodePath(), -1);
                leaderNode = null;
            }
        }
    } catch (InterruptedException e) {
        becomeFailed(e);
    } catch (KeeperException e) {
        becomeFailed(e);
    }
}           

getLeaderHostName

/**
 * Gets get leader host name.
 *
 * @return the get leader host name
 */
@Override
public String getLeaderHostName() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();

        if (!nodePathSet.isEmpty()) {
            try {
                String leaderPath = nodePathSet.first();
                return (String) ConversionUtil.bytesToObject(zooKeeper.getData(leaderPath, false, null));
            } catch (KeeperException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (InterruptedException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (IOException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (ClassNotFoundException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            }
        }

        return null;
    }
}           

getLeaderNodePath

/**
 * Gets get leader node path.
 *
 * @return the get leader node path
 */
@Override
public String getLeaderNodePath() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();

        return nodePathSet.isEmpty() ? null : nodePathSet.first();
    }

}           

LeaderWatcher

/**
 * 内部watcher類,當競選失敗時,watch前一個節點,目前一個節點别移除時,再次發起決策
 * */
private class LeaderWatcher implements Watcher {
    /**
     * Process.
     *
     * @param watchedEvent the watched event
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        try {
            if (Event.EventType.NodeDeleted.equals(watchedEvent.getType()) && !State.STOP.equals(state)) {
                determineElection();
            }
        } catch (Exception e) {
            logger.error("[LeaderWatcher#process] error : " + e.toString(), e);
        }

    }
}           

總結

以上就是我們利用ZooKeeper的臨時節點和Watcher特性實作的公平模式分布式競選。

可以進行簡單的選主操作,适用于如執行單機定時任務、心跳檢測等場景。實際上是實作的Master-Slave模型。

源代碼可見:

aloofJr

而對高可用要求較多的複雜選舉場景,如分布式存儲、同步等,則需要考慮叢集一緻性、腦裂等各種情況,則需要實作如Paxos、raft、Zab等一緻性算法協定。如ZooKeeper叢集的選舉模式就是使用的Zab算法。

我們後續會進行深入的探讨。

更多文章

見我的部落格:

https://nc2era.com

written by

AloofJr

,轉載請注明出處

繼續閱讀