天天看点

zookeeper源码深入解析(zk集群部署与选举机制)

引言

最近公司使用dolphinschedualer做流程调度,发现其重度依赖zookeeper,这次引起了本人对zookeeper的浓厚兴趣,故拉了zookeeper源码来详细了解一下他的内部实现。

zookeeper源码本地部署

本次源码解析使用的是zk-3.5.8,对应git地址为:https://github.com/apache/zookeeper/tree/release-3.5.8。为方便解析源码,先把zk在本地以集群的方式启动起来,方便看某些代码的具体实现类。

本次本地集群部署为“一主两从”,部署步骤如下:

  • 把源码从git上拉下来,直接更新maven依赖,脚本如下:mvn clean install ‐DskipTests。注意:如果报缺少git.properties文件,直接在对应目录创建文件即可,内容如下:
git.commit.id=92138F25
build.time=2022-03-21           
  • 在当前项目中创建data文件夹,在data下创建zk1,zk2,zk3三个文件夹,并在文件夹下创建myid文件,myid内容为1、2、3。将conf文件夹下的zoo_sample.cfg复制三份,修改dataDir与clientPort配置,增加server.1,server.2,server.3,(这里的1、2、3和data文件夹下的myid对应,)详细配置如下:
dataDir=D:/work/workspace/zookeeper/zookeeper-release-3.5.8/data/zk1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890           
  • zookeeper-server项目里pom.xml文件里依赖的包(除了jline)的scope为provided这一行全部注释掉,不然启动的时候会报错
  • 将conf文件夹里的log4j.properties文件复制一份到zookeeper-server项目的 \target\classes,保证项目日志打印正常
  • zk启动类为:org.apache.zookeeper.server.quorum.QuorumPeerMain,在idea创建三个application,项目参数配置为zk配置文件路径,效果如下:
zookeeper源码深入解析(zk集群部署与选举机制)

zk集群启动类配置

zookeeper选举源码分析

zk本地部署完之后就要分析源码了,下面截取核心源码(非关键代码此处会省略 ,本文主要剖析选举机制相关源码,有兴趣者可以自己下载源码观看),解析逻辑如下:

//org.apache.zookeeper.server.quorum.QuorumPeerMain,启动类,不解释
public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
         main.initializeAndRun(args);
 }
protected void initializeAndRun(String[] args)
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
      //如果通过应用的启动参数配置了配置文件,则读取配置文件并加载至内存中
        if (args.length == 1) {
            config.parse(args[0]);
        }
        // 这里会启动一个线程,去回收zk过期的一些快照文件,防止快照文件过多
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
        //判断当前zk是否是分布式部署,此文只分析分布式情况(单体zk也没选举可研QAQ)
        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } 
    }
//QuorumPeerConfig.runFromConfig
public void runFromConfig(QuorumPeerConfig config) {
             //此处是为了初始化zk的连接对象,即客户端连接方式
           //有NIO和Netty两种,默认NIO,详细见下方createFactory()方法
              cnxnFactory = ServerCnxnFactory.createFactory();
            //给连接工厂进行对应设置
              cnxnFactory.configure(config.getClientPortAddress(),
                      config.getMaxClientCnxns(), false);
         //创建一个zk节点对象
          quorumPeer = getQuorumPeer();
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      config.getDataLogDir(),
                      config.getDataDir()));
          //配置选举方式。默认为3,快速选举,其他选举方式已过时
          quorumPeer.setElectionType(config.getElectionAlg());
          //初始化一个内存数据库,类型为ConcurrentHashMap
          //同时zk启动时自动创建的“/zookeeper”节点也是在此时设置的
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          //省略若干配置项,开始初始化zk权限配置,暂不考虑权限,跳过
          quorumPeer.initialize();
          //核心方法:zk启动,见Quorum,start()
          quorumPeer.start();
          quorumPeer.join();
    }
//ServerCnxnFactory.createFactory()
 static public ServerCnxnFactory createFactory() throws IOException {
      //获取启动参数中zookeeper.serverCnxnFactory参数值
        String serverCnxnFactoryName =System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
         //启动参数中连接方式为空,默认用nio连接方式
         serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();

    }
//Quorum,start()
public synchronized void start() {
        //读取本地日志文件、快照文件至内存中
        loadDataBase();
       //针对QuorumPeerConfig.runFromConfig初始化的ServerCnxnFactory进行start调用,也就是netty的一些配置初始化
        startServerCnxnFactory();
        //启动一个jetty-server,运维人员可以进入一个后台管理页面
        adminServer.start();
       //开始初始化快速选举对象,初始化选举需要的队列与线程,并完成一系列初始化配置,
       //QuorumPeer.startLeaderElection,
        startLeaderElection();
        //QuorumPeer继承了Thread类,Thread的start会调用QuorumPeer.run方法
       //见QuorumPeer.run()  重要!!!
        super.start();
    }
//QuorumPeer.startLeaderElection
synchronized public void startLeaderElection() {
          //zk所有的节点启动时都默认为looking(observing节点除外),
          //即选举状态,选举完成后,节点会变成leading或者following状态
           if (getPeerState() == ServerState.LOOKING) {
             //生成选票对象,myid即当前节点标识,getLastLoggedZxid为获取最新日志事务id,初始启动为0
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       //针对选举算法进行初始化
        this.electionAlg = createElectionAlgorithm(electionType);
    }
//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
        switch (electionAlgorithm) {
        case 3:
            //初始化一个新的zk对象管理器
            QuorumCnxManager qcm = createCnxnManager();
            //createCnxnManager会初始化一个QuorumCnxManager.listener对象
            QuorumCnxManager.Listener listener = qcm.listener;
            //对zk节点的监听器进行初始化(见 QuorumCnxManager.Listener.run)
            listener.start();
            //初始化快速选举对象,需要注意的是里面会初始化0sendqueue,recvqueue两个队列,用于存储需要发送和接受的选票
           //并且会初始化一个Messenger对象
            /* 会初始化两个工作者线程,用于发送和接受选票
            Messenger(QuorumCnxManager manager) {
                this.ws = new WorkerSender(manager);
                this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
                this.wr = new WorkerReceiver(manager);
                this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
            }*/
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            //调用的是WorkerSender.run()和WorkerReceiver.run()方法,WorkerSender.run如下
            /* public void run() {
                while (!stop) {
              				  //从待发送的消息队列中区一个需要发送的选票,并进行后续处理
                        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                        //选票处理逻辑,核心是调用QuorumCnxManager.toSend,见下
                        process(m);
                }
            }*/
            fle.start();
            //将当前快速选举对象返回
            le = fle;
        }
        return le;
    }
//QuorumCnxManager.toSend
public void toSend(Long sid, ByteBuffer b) {
        //如果对端serverid和当前myid相同,说明是自己的选票,直接放入接受队列即可
        if (this.mySid == sid) {
             b.position(0);
             addToRecvQueue(new Message(b.duplicate(), sid));
        } else {
            // 如果对端serverid和当前myid不同,意味着要接受消息了
            //先初始化一个新的队列
             ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
             // 这个putIfAbsent意思是,如果当前serverid已经有队列,则返回,无队列则返回空
             ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
             //下面逻辑就是将需要发送的字节流添加至发送队列中
             if (oldq != null) {
                 addToSendQueue(oldq, b);
             } else {
                 addToSendQueue(bq, b);
             }
          //通过socket进行数据传输
             connectOne(sid);

        }
    }

//  QuorumCnxManager.Listener.run(listener实际是一个Thread,就是调用对应的run方法)
public void run() {
            int numRetries = 0;
            InetSocketAddress addr;
            Socket client = null;
            Exception exitException = null;
            while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
                   //创建一个serversocker,用于监听zk节点通信
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                   //获取zk集群节点地址配置,并设置节点配置
                    self.recreateSocketAddresses(self.getId());
                    addr = self.getElectionAddress();
                    setName(addr.toString());
                  //将创建的serversocker对端口进行监听
                    ss.bind(addr);
                    while (!shutdown) {
                           //准备接受其他节点的选票信息
                            client = ss.accept();
                            setSockOpts(client);
                            //开始处理连接,其核心是调用QuorumCnxManager.handleConnection()
                            receiveConnection(client);
                    }
                }
        }
//QuorumCnxManager.handleConnection()
private void handleConnection(Socket sock, DataInputStream din){
        Long sid = null, protocolVersion = null;
        InetSocketAddress electionAddr = null;
        //读取目标服务器sid
        protocolVersion = din.readLong();
        if (protocolVersion >= 0) {
            sid = protocolVersion;
        }   
  
        if (sid < self.getId()) {
            //此处就是zk比较妙的一点,如果当前myid小于目标server myid,则会关闭当前socket连接
            //复用从对端server和当前server建立的socket通道
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }
            closeSocket(sock);
            if (electionAddr != null) {
                connectOne(sid, electionAddr);
            } else {
                connectOne(sid);
            }
        } else { 
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);
            //为远端的serverid初始化一个发送工作者线程,并至存储内存中
            //若内存中该远端的serverid已经有对应工作者线程,则先将其关闭
            SendWorker vsw = senderWorkerMap.get(sid);
            if (vsw != null) {
                vsw.finish();
            }
            senderWorkerMap.put(sid, sw);
           //为远端serverid初始化一个队列,并存储值内存中,脑子里要记住目前已经为
           //远端serverid初始化了发送工作者线程和队列
            queueSendMap.putIfAbsent(sid,new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
           //此处调用的是SendWorker.run方法,如下
           /*   public void run() {
                    //这里会把上面初始化的队列取出来,这边是避免参数传来传去,直接搞了个内部变量,方便主方法与线程通信
                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                    if (bq == null || isSendQueueEmpty(bq)) {
                       ByteBuffer b = lastMessageSent.get(sid);
                       //如果当前队列中存在需要发送的消息,就会写到数据流中,对端server就可以接收到信息了
                       if (b != null) {
                           send(b);
                       }
                    }
                }
           */
            sw.start();
          //这个线程就是将要接受的消息加入接受队列中
            rw.start();
        }
    }

//QuorumPeer.run()
public void  run() {
       //根据zk节点的id、连接配置、安全等配置更新线程名称
        updateThreadName();
        try {
            while (running) {
                switch (getPeerState()) {
                    //启动时节点状态一定为LOOKING,先不考虑其他状态
                case LOOKING:
                    //makeLEStrategy()获取当前节点leader选取策略,即返回createElectionAlgorithm初始化的选举对象
                    //lookForLeader()作用是寻找领导者节点,见FastLeaderElection.lookForLeader
                    setCurrentVote(makeLEStrategy().lookForLeader());      
                    break;
               case LEADING:
                    //经过两轮选举后,会出现状态为leading的节点,就会走这个逻辑,将当前节点设置为leader
                 try {
                     setLeader(makeLeader(logFactory));
                     leader.lead();
                     setLeader(null);
                 }  finally {
                     updateServerState();
                 }
                      break;
                  }
                start_fle = Time.currentElapsedTime();
            }
        } finally {
            MBeanRegistry instance = MBeanRegistry.getInstance();
            instance.unregister(jmxQuorumBean);
            instance.unregister(jmxLocalPeerBean);
            for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
                instance.unregister(remotePeerBean);
            }
        }
    }
//FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
        try {
            //初始化一些内部对象
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
            int notTimeout = finalizeWait;
            synchronized(this){
                //本地计数器加一
                logicalclock.incrementAndGet();
                //更新一下当前节点对应的配置信息
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }
						/*当前方法主要作用是将需要发送的消息放入发送队列中
            private void sendNotifications() {
                //获取所有的选票id
                for (long sid : self.getCurrentAndNextConfigVoters()) {
                    QuorumVerifier qv = self.getQuorumVerifier();
                    //将选票保障为可发送的消息对象
                    ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, 
                             proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,
                            proposedEpoch, qv.toString().getBytes());
                    //将选票消息放入发送发送队列中,注意此处要和createElectionAlgorithm方法联系
                    //WorkerSender.run()方法中会调用sendqueue.poll()即去除待发送的消息进行发送处理
                    //(●ˇ∀ˇ●)选票发送机制这样就稍微串起来一些了,继续往下看O(∩_∩)O
                    sendqueue.offer(notmsg);
                }
            } */
            sendNotifications();
            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
               //从接受队列中取消息对象,首次启动肯定没有对象
               //当然如果此时是非首节点启动,则接受队列中会接收到选票信息
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);
                if(n == null){
                  //这里是判断内存中是否存在待发送消息,如果有待发送消息就推到发送队列中
                  //我们这是首次启动,肯定为空,返回的是fasle
                    if(manager.haveDelivered()){
                        sendNotifications();
                    } else {
                      //这里是zk节点互通信的逻辑,详细见QuorumCnxManager.connectAll
                        manager.connectAll();
                    }

                } 
              //非首个节点启动时,此处会一直等待其他节点发送选票,一旦收到选票,上面的n就不是null,就走以下逻辑
              //校验选票的leaderid以及当前id是否都属于集群节点中,也就是最开始zoo.cfg文件的配置内容
              else if (validVoter(n.sid) && validVoter(n.leader)) {
                    switch (n.state) {
                    case LOOKING:
                       //判断选举次数是否和内存选举计数器一致,第二个节点启动,二者都为1,不走此逻辑
                        if (n.electionEpoch > logicalclock.get()) {
                          
                        } 
                        /*这个是选举leader的具体算法,也比较简单,首选比较选举次数,再比较事务id,最后比较myid,哪个id大哪个为leader
                         ((newEpoch > curEpoch) ||
                          ((newEpoch == curEpoch) &&
                          ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))))
                        */
                       else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            //计算完leader节点后,需要将计算出来的leader节点、事务id、选举次数等信息更新
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                           //更新完节点信息后,需要将自己选举出来的最新leader以选票的形式发给其他节点
                            sendNotifications();
                        }
                       //源码注释写着:如果是looking状态,没什么意义,作用应该是将所有的 选票收集起来
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                       //termPredicate用于判断当前节点是否已经获得超过半数选票,如果超过半数,则将
                       //自己的节点信息更新,并且将自己的状态改为leadering
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {
                            }
                        }
                        break;
                    case OBSERVING:
                    case FOLLOWING:
                    case LEADING:
                        break;
                    }
                } else{
                }
            }
            return null;
        } finally {
            try {
                if(self.jmxLeaderElectionBean != null){
                    MBeanRegistry.getInstance().unregister(
                            self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}",
                    manager.getConnectionThreadCount());
        }
    }

//QuorumCnxManager.connectAll
public void connectAll(){
        long sid;
        for(Enumeration<Long> en = queueSendMap.keys();
            en.hasMoreElements();){
            sid = en.nextElement();
            //获取需要发送消息的所有server节点,并一一连接,源码见下
            connectOne(sid);
        }
    }
//QuorumCnxManager.connectOne
synchronized void connectOne(long sid){
       //获取锁
        synchronized (self.QV_LOCK) {
            boolean knownId = false;
            //读取配置文件中所有server成员信息
            self.recreateSocketAddresses(sid);
            Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
          //获取节点检查器,用于读取其他节点信息
            QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
            Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
           if (lastSeenQV != null && lastProposedView.containsKey(sid)
                    && (!knownId || (lastProposedView.get(sid).electionAddr !=
                    lastCommittedView.get(sid).electionAddr))) {
                knownId = true;
             /* 核心就是调用initiateConnectionAsync方法
             synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){
                return initiateConnectionAsync(electionAddr, sid);
            }
            //通过线程池调用QuorumConnectionReqThread.run方法,
            //其核心是调用QuorumConnectionReqThread.initiateConnection
            public boolean initiateConnectionAsync(final InetSocketAddress electionAddr, final Long sid) {
                connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
                connectionThreadCnt.incrementAndGet();
                return true;
            }
            //QuorumConnectionReqThread.initiateConnection
            public void initiateConnection(final InetSocketAddress electionAddr, final Long sid) {
       				  Socket sock = null;
                //从工厂中获取一个socker对象
                sock = SOCKET_FACTORY.get();
                //对socket对象做一些基本的设置
                setSockOpts(sock);
                //通过socket连接目标zk服务节点
                sock.connect(electionAddr, cnxTO);
               //开始读取传输的数据,并放入对应的发送队列中,启动发送和接受工作者线程
               //里面逻辑也相对简单,就不进一步展示了
               startConnection(sock, sid);
            }
             */
                if (connectOne(sid, lastProposedView.get(sid).electionAddr))
                    return;
            }
          
        }
    }           

zk选举流程图

经过以上的源码分析,在有两个zk节点启动的情况下,整个选举机制如下图:

zookeeper源码深入解析(zk集群部署与选举机制)

zk选举流程图

已有leader情形下,加入新的zk节点选举机制

在选取了两个zk节点之后,如果有第三个zk节点加入的话,此时第三个节点仍会给其他节点发选票,不过在获取到已经选取出leader的结果后,会生成一张自己角色为following,并且将已有leader作为leading的选票,发送给其他节点,从而生成新的集群网络。

以上就是个人对zk选举机制源码的解析,下一篇写一下zk选举leader之后,各节点通信逻辑。