天天看點

zookeeper源碼深入解析(zk叢集部署與選舉機制)

作者:記錄生活的小熊

引言

最近公司使用dolphinschedualer做流程排程,發現其重度依賴zookeeper,這次引起了本人對zookeeper的濃厚興趣,故拉了zookeeper源碼來詳細了解一下他的内部實作。

zookeeper源碼本地部署

本次源碼解析使用的是zk-3.5.8,對應git位址為:https://github.com/apache/zookeeper/tree/release-3.5.8。為友善解析源碼,先把zk在本地以叢集的方式啟動起來,友善看某些代碼的具體實作類。

本次本地叢集部署為“一主兩從”,部署步驟如下:

  • 把源碼從git上拉下來,直接更新maven依賴,腳本如下:mvn clean install ‐DskipTests。注意:如果報缺少git.properties檔案,直接在對應目錄建立檔案即可,内容如下:
git.commit.id=92138F25
build.time=2022-03-21           
  • 在目前項目中建立data檔案夾,在data下建立zk1,zk2,zk3三個檔案夾,并在檔案夾下建立myid檔案,myid内容為1、2、3。将conf檔案夾下的zoo_sample.cfg複制三份,修改dataDir與clientPort配置,增加server.1,server.2,server.3,(這裡的1、2、3和data檔案夾下的myid對應,)詳細配置如下:
dataDir=D:/work/workspace/zookeeper/zookeeper-release-3.5.8/data/zk1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890           
  • zookeeper-server項目裡pom.xml檔案裡依賴的包(除了jline)的scope為provided這一行全部注釋掉,不然啟動的時候會報錯
  • 将conf檔案夾裡的log4j.properties檔案複制一份到zookeeper-server項目的 \target\classes,保證項目日志列印正常
  • zk啟動類為:org.apache.zookeeper.server.quorum.QuorumPeerMain,在idea建立三個application,項目參數配置為zk配置檔案路徑,效果如下:
zookeeper源碼深入解析(zk叢集部署與選舉機制)

zk叢集啟動類配置

zookeeper選舉源碼分析

zk本地部署完之後就要分析源碼了,下面截取核心源碼(非關鍵代碼此處會省略 ,本文主要剖析選舉機制相關源碼,有興趣者可以自己下載下傳源碼觀看),解析邏輯如下:

//org.apache.zookeeper.server.quorum.QuorumPeerMain,啟動類,不解釋
public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
         main.initializeAndRun(args);
 }
protected void initializeAndRun(String[] args)
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
      //如果通過應用的啟動參數配置了配置檔案,則讀取配置檔案并加載至記憶體中
        if (args.length == 1) {
            config.parse(args[0]);
        }
        // 這裡會啟動一個線程,去回收zk過期的一些快照檔案,防止快照檔案過多
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
        //判斷目前zk是否是分布式部署,此文隻分析分布式情況(單體zk也沒選舉可研QAQ)
        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } 
    }
//QuorumPeerConfig.runFromConfig
public void runFromConfig(QuorumPeerConfig config) {
             //此處是為了初始化zk的連接配接對象,即用戶端連接配接方式
           //有NIO和Netty兩種,預設NIO,詳細見下方createFactory()方法
              cnxnFactory = ServerCnxnFactory.createFactory();
            //給連接配接工廠進行對應設定
              cnxnFactory.configure(config.getClientPortAddress(),
                      config.getMaxClientCnxns(), false);
         //建立一個zk節點對象
          quorumPeer = getQuorumPeer();
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      config.getDataLogDir(),
                      config.getDataDir()));
          //配置選舉方式。預設為3,快速選舉,其他選舉方式已過時
          quorumPeer.setElectionType(config.getElectionAlg());
          //初始化一個記憶體資料庫,類型為ConcurrentHashMap
          //同時zk啟動時自動建立的“/zookeeper”節點也是在此時設定的
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          //省略若幹配置項,開始初始化zk權限配置,暫不考慮權限,跳過
          quorumPeer.initialize();
          //核心方法:zk啟動,見Quorum,start()
          quorumPeer.start();
          quorumPeer.join();
    }
//ServerCnxnFactory.createFactory()
 static public ServerCnxnFactory createFactory() throws IOException {
      //擷取啟動參數中zookeeper.serverCnxnFactory參數值
        String serverCnxnFactoryName =System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
         //啟動參數中連接配接方式為空,預設用nio連接配接方式
         serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();

    }
//Quorum,start()
public synchronized void start() {
        //讀取本地日志檔案、快照檔案至記憶體中
        loadDataBase();
       //針對QuorumPeerConfig.runFromConfig初始化的ServerCnxnFactory進行start調用,也就是netty的一些配置初始化
        startServerCnxnFactory();
        //啟動一個jetty-server,運維人員可以進入一個背景管理頁面
        adminServer.start();
       //開始初始化快速選舉對象,初始化選舉需要的隊列與線程,并完成一系列初始化配置,
       //QuorumPeer.startLeaderElection,
        startLeaderElection();
        //QuorumPeer繼承了Thread類,Thread的start會調用QuorumPeer.run方法
       //見QuorumPeer.run()  重要!!!
        super.start();
    }
//QuorumPeer.startLeaderElection
synchronized public void startLeaderElection() {
          //zk所有的節點啟動時都預設為looking(observing節點除外),
          //即選舉狀态,選舉完成後,節點會變成leading或者following狀态
           if (getPeerState() == ServerState.LOOKING) {
             //生成選票對象,myid即目前節點辨別,getLastLoggedZxid為擷取最新日志事務id,初始啟動為0
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       //針對選舉算法進行初始化
        this.electionAlg = createElectionAlgorithm(electionType);
    }
//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
        switch (electionAlgorithm) {
        case 3:
            //初始化一個新的zk對象管理器
            QuorumCnxManager qcm = createCnxnManager();
            //createCnxnManager會初始化一個QuorumCnxManager.listener對象
            QuorumCnxManager.Listener listener = qcm.listener;
            //對zk節點的監聽器進行初始化(見 QuorumCnxManager.Listener.run)
            listener.start();
            //初始化快速選舉對象,需要注意的是裡面會初始化0sendqueue,recvqueue兩個隊列,用于存儲需要發送和接受的選票
           //并且會初始化一個Messenger對象
            /* 會初始化兩個工作者線程,用于發送和接受選票
            Messenger(QuorumCnxManager manager) {
                this.ws = new WorkerSender(manager);
                this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
                this.wr = new WorkerReceiver(manager);
                this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
            }*/
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            //調用的是WorkerSender.run()和WorkerReceiver.run()方法,WorkerSender.run如下
            /* public void run() {
                while (!stop) {
              				  //從待發送的消息隊列中區一個需要發送的選票,并進行後續處理
                        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                        //選票處理邏輯,核心是調用QuorumCnxManager.toSend,見下
                        process(m);
                }
            }*/
            fle.start();
            //将目前快速選舉對象傳回
            le = fle;
        }
        return le;
    }
//QuorumCnxManager.toSend
public void toSend(Long sid, ByteBuffer b) {
        //如果對端serverid和目前myid相同,說明是自己的選票,直接放入接受隊列即可
        if (this.mySid == sid) {
             b.position(0);
             addToRecvQueue(new Message(b.duplicate(), sid));
        } else {
            // 如果對端serverid和目前myid不同,意味着要接受消息了
            //先初始化一個新的隊列
             ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
             // 這個putIfAbsent意思是,如果目前serverid已經有隊列,則傳回,無隊列則傳回空
             ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
             //下面邏輯就是将需要發送的位元組流添加至發送隊列中
             if (oldq != null) {
                 addToSendQueue(oldq, b);
             } else {
                 addToSendQueue(bq, b);
             }
          //通過socket進行資料傳輸
             connectOne(sid);

        }
    }

//  QuorumCnxManager.Listener.run(listener實際是一個Thread,就是調用對應的run方法)
public void run() {
            int numRetries = 0;
            InetSocketAddress addr;
            Socket client = null;
            Exception exitException = null;
            while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
                   //建立一個serversocker,用于監聽zk節點通信
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                   //擷取zk叢集節點位址配置,并設定節點配置
                    self.recreateSocketAddresses(self.getId());
                    addr = self.getElectionAddress();
                    setName(addr.toString());
                  //将建立的serversocker對端口進行監聽
                    ss.bind(addr);
                    while (!shutdown) {
                           //準備接受其他節點的選票資訊
                            client = ss.accept();
                            setSockOpts(client);
                            //開始處理連接配接,其核心是調用QuorumCnxManager.handleConnection()
                            receiveConnection(client);
                    }
                }
        }
//QuorumCnxManager.handleConnection()
private void handleConnection(Socket sock, DataInputStream din){
        Long sid = null, protocolVersion = null;
        InetSocketAddress electionAddr = null;
        //讀取目标伺服器sid
        protocolVersion = din.readLong();
        if (protocolVersion >= 0) {
            sid = protocolVersion;
        }   
  
        if (sid < self.getId()) {
            //此處就是zk比較妙的一點,如果目前myid小于目标server myid,則會關閉目前socket連接配接
            //複用從對端server和目前server建立的socket通道
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }
            closeSocket(sock);
            if (electionAddr != null) {
                connectOne(sid, electionAddr);
            } else {
                connectOne(sid);
            }
        } else { 
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);
            //為遠端的serverid初始化一個發送工作者線程,并至存儲記憶體中
            //若記憶體中該遠端的serverid已經有對應工作者線程,則先将其關閉
            SendWorker vsw = senderWorkerMap.get(sid);
            if (vsw != null) {
                vsw.finish();
            }
            senderWorkerMap.put(sid, sw);
           //為遠端serverid初始化一個隊列,并存儲值記憶體中,腦子裡要記住目前已經為
           //遠端serverid初始化了發送工作者線程和隊列
            queueSendMap.putIfAbsent(sid,new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
           //此處調用的是SendWorker.run方法,如下
           /*   public void run() {
                    //這裡會把上面初始化的隊列取出來,這邊是避免參數傳來傳去,直接搞了個内部變量,友善主方法與線程通信
                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                    if (bq == null || isSendQueueEmpty(bq)) {
                       ByteBuffer b = lastMessageSent.get(sid);
                       //如果目前隊列中存在需要發送的消息,就會寫到資料流中,對端server就可以接收到資訊了
                       if (b != null) {
                           send(b);
                       }
                    }
                }
           */
            sw.start();
          //這個線程就是将要接受的消息加入接受隊列中
            rw.start();
        }
    }

//QuorumPeer.run()
public void  run() {
       //根據zk節點的id、連接配接配置、安全等配置更新線程名稱
        updateThreadName();
        try {
            while (running) {
                switch (getPeerState()) {
                    //啟動時節點狀态一定為LOOKING,先不考慮其他狀态
                case LOOKING:
                    //makeLEStrategy()擷取目前節點leader選取政策,即傳回createElectionAlgorithm初始化的選舉對象
                    //lookForLeader()作用是尋找上司者節點,見FastLeaderElection.lookForLeader
                    setCurrentVote(makeLEStrategy().lookForLeader());      
                    break;
               case LEADING:
                    //經過兩輪選舉後,會出現狀态為leading的節點,就會走這個邏輯,将目前節點設定為leader
                 try {
                     setLeader(makeLeader(logFactory));
                     leader.lead();
                     setLeader(null);
                 }  finally {
                     updateServerState();
                 }
                      break;
                  }
                start_fle = Time.currentElapsedTime();
            }
        } finally {
            MBeanRegistry instance = MBeanRegistry.getInstance();
            instance.unregister(jmxQuorumBean);
            instance.unregister(jmxLocalPeerBean);
            for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
                instance.unregister(remotePeerBean);
            }
        }
    }
//FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
        try {
            //初始化一些内部對象
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
            int notTimeout = finalizeWait;
            synchronized(this){
                //本地計數器加一
                logicalclock.incrementAndGet();
                //更新一下目前節點對應的配置資訊
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }
						/*目前方法主要作用是将需要發送的消息放入發送隊列中
            private void sendNotifications() {
                //擷取所有的選票id
                for (long sid : self.getCurrentAndNextConfigVoters()) {
                    QuorumVerifier qv = self.getQuorumVerifier();
                    //将選票保障為可發送的消息對象
                    ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, 
                             proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,
                            proposedEpoch, qv.toString().getBytes());
                    //将選票消息放入發送發送隊列中,注意此處要和createElectionAlgorithm方法聯系
                    //WorkerSender.run()方法中會調用sendqueue.poll()即去除待發送的消息進行發送處理
                    //(●ˇ∀ˇ●)選票發送機制這樣就稍微串起來一些了,繼續往下看O(∩_∩)O
                    sendqueue.offer(notmsg);
                }
            } */
            sendNotifications();
            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
               //從接受隊列中取消息對象,首次啟動肯定沒有對象
               //當然如果此時是非首節點啟動,則接受隊列中會接收到選票資訊
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);
                if(n == null){
                  //這裡是判斷記憶體中是否存在待發送消息,如果有待發送消息就推到發送隊列中
                  //我們這是首次啟動,肯定為空,傳回的是fasle
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                      //這裡是zk節點互通信的邏輯,詳細見QuorumCnxManager.connectAll
                        manager.connectAll();
                    }

                } 
              //非首個節點啟動時,此處會一直等待其他節點發送選票,一旦收到選票,上面的n就不是null,就走以下邏輯
              //校驗選票的leaderid以及目前id是否都屬于叢集節點中,也就是最開始zoo.cfg檔案的配置内容
              else if (validVoter(n.sid) && validVoter(n.leader)) {
                    switch (n.state) {
                    case LOOKING:
                       //判斷選舉次數是否和記憶體選舉計數器一緻,第二個節點啟動,二者都為1,不走此邏輯
                        if (n.electionEpoch > logicalclock.get()) {
                          
                        } 
                        /*這個是選舉leader的具體算法,也比較簡單,首選比較選舉次數,再比較事務id,最後比較myid,哪個id大哪個為leader
                         ((newEpoch > curEpoch) ||
                          ((newEpoch == curEpoch) &&
                          ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))))
                        */
                       else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            //計算完leader節點後,需要将計算出來的leader節點、事務id、選舉次數等資訊更新
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                           //更新完節點資訊後,需要将自己選舉出來的最新leader以選票的形式發給其他節點
                            sendNotifications();
                        }
                       //源碼注釋寫着:如果是looking狀态,沒什麼意義,作用應該是将所有的 選票收集起來
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                       //termPredicate用于判斷目前節點是否已經獲得超過半數選票,如果超過半數,則将
                       //自己的節點資訊更新,并且将自己的狀态改為leadering
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {
                            }
                        }
                        break;
                    case OBSERVING:
                    case FOLLOWING:
                    case LEADING:
                        break;
                    }
                } else{
                }
            }
            return null;
        } finally {
            try {
                if(self.jmxLeaderElectionBean != null){
                    MBeanRegistry.getInstance().unregister(
                            self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}",
                    manager.getConnectionThreadCount());
        }
    }

//QuorumCnxManager.connectAll
public void connectAll(){
        long sid;
        for(Enumeration<Long> en = queueSendMap.keys();
            en.hasMoreElements();){
            sid = en.nextElement();
            //擷取需要發送消息的所有server節點,并一一連接配接,源碼見下
            connectOne(sid);
        }
    }
//QuorumCnxManager.connectOne
synchronized void connectOne(long sid){
       //擷取鎖
        synchronized (self.QV_LOCK) {
            boolean knownId = false;
            //讀取配置檔案中所有server成員資訊
            self.recreateSocketAddresses(sid);
            Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
          //擷取節點檢查器,用于讀取其他節點資訊
            QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
            Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
           if (lastSeenQV != null && lastProposedView.containsKey(sid)
                    && (!knownId || (lastProposedView.get(sid).electionAddr !=
                    lastCommittedView.get(sid).electionAddr))) {
                knownId = true;
             /* 核心就是調用initiateConnectionAsync方法
             synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){
                return initiateConnectionAsync(electionAddr, sid);
            }
            //通過線程池調用QuorumConnectionReqThread.run方法,
            //其核心是調用QuorumConnectionReqThread.initiateConnection
            public boolean initiateConnectionAsync(final InetSocketAddress electionAddr, final Long sid) {
                connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
                connectionThreadCnt.incrementAndGet();
                return true;
            }
            //QuorumConnectionReqThread.initiateConnection
            public void initiateConnection(final InetSocketAddress electionAddr, final Long sid) {
       				  Socket sock = null;
                //從工廠中擷取一個socker對象
                sock = SOCKET_FACTORY.get();
                //對socket對象做一些基本的設定
                setSockOpts(sock);
                //通過socket連接配接目标zk服務節點
                sock.connect(electionAddr, cnxTO);
               //開始讀取傳輸的資料,并放入對應的發送隊列中,啟動發送和接受工作者線程
               //裡面邏輯也相對簡單,就不進一步展示了
               startConnection(sock, sid);
            }
             */
                if (connectOne(sid, lastProposedView.get(sid).electionAddr))
                    return;
            }
          
        }
    }           

zk選舉流程圖

經過以上的源碼分析,在有兩個zk節點啟動的情況下,整個選舉機制如下圖:

zookeeper源碼深入解析(zk叢集部署與選舉機制)

zk選舉流程圖

已有leader情形下,加入新的zk節點選舉機制

在選取了兩個zk節點之後,如果有第三個zk節點加入的話,此時第三個節點仍會給其他節點發選票,不過在擷取到已經選取出leader的結果後,會生成一張自己角色為following,并且将已有leader作為leading的選票,發送給其他節點,進而生成新的叢集網絡。

以上就是個人對zk選舉機制源碼的解析,下一篇寫一下zk選舉leader之後,各節點通信邏輯。