引言
最近公司使用dolphinschedualer做流程调度,发现其重度依赖zookeeper,这次引起了本人对zookeeper的浓厚兴趣,故拉了zookeeper源码来详细了解一下他的内部实现。
zookeeper源码本地部署
本次源码解析使用的是zk-3.5.8,对应git地址为:https://github.com/apache/zookeeper/tree/release-3.5.8。为方便解析源码,先把zk在本地以集群的方式启动起来,方便看某些代码的具体实现类。
本次本地集群部署为“一主两从”,部署步骤如下:
- 把源码从git上拉下来,直接更新maven依赖,脚本如下:mvn clean install ‐DskipTests。注意:如果报缺少git.properties文件,直接在对应目录创建文件即可,内容如下:
git.commit.id=92138F25
build.time=2022-03-21
- 在当前项目中创建data文件夹,在data下创建zk1,zk2,zk3三个文件夹,并在文件夹下创建myid文件,myid内容为1、2、3。将conf文件夹下的zoo_sample.cfg复制三份,修改dataDir与clientPort配置,增加server.1,server.2,server.3,(这里的1、2、3和data文件夹下的myid对应,)详细配置如下:
dataDir=D:/work/workspace/zookeeper/zookeeper-release-3.5.8/data/zk1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
- zookeeper-server项目里pom.xml文件里依赖的包(除了jline)的scope为provided这一行全部注释掉,不然启动的时候会报错
- 将conf文件夹里的log4j.properties文件复制一份到zookeeper-server项目的 \target\classes,保证项目日志打印正常
- zk启动类为:org.apache.zookeeper.server.quorum.QuorumPeerMain,在idea创建三个application,项目参数配置为zk配置文件路径,效果如下:
zk集群启动类配置
zookeeper选举源码分析
zk本地部署完之后就要分析源码了,下面截取核心源码(非关键代码此处会省略 ,本文主要剖析选举机制相关源码,有兴趣者可以自己下载源码观看),解析逻辑如下:
//org.apache.zookeeper.server.quorum.QuorumPeerMain,启动类,不解释
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
}
protected void initializeAndRun(String[] args)
{
QuorumPeerConfig config = new QuorumPeerConfig();
//如果通过应用的启动参数配置了配置文件,则读取配置文件并加载至内存中
if (args.length == 1) {
config.parse(args[0]);
}
// 这里会启动一个线程,去回收zk过期的一些快照文件,防止快照文件过多
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
//判断当前zk是否是分布式部署,此文只分析分布式情况(单体zk也没选举可研QAQ)
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
}
}
//QuorumPeerConfig.runFromConfig
public void runFromConfig(QuorumPeerConfig config) {
//此处是为了初始化zk的连接对象,即客户端连接方式
//有NIO和Netty两种,默认NIO,详细见下方createFactory()方法
cnxnFactory = ServerCnxnFactory.createFactory();
//给连接工厂进行对应设置
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(), false);
//创建一个zk节点对象
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
//配置选举方式。默认为3,快速选举,其他选举方式已过时
quorumPeer.setElectionType(config.getElectionAlg());
//初始化一个内存数据库,类型为ConcurrentHashMap
//同时zk启动时自动创建的“/zookeeper”节点也是在此时设置的
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
//省略若干配置项,开始初始化zk权限配置,暂不考虑权限,跳过
quorumPeer.initialize();
//核心方法:zk启动,见Quorum,start()
quorumPeer.start();
quorumPeer.join();
}
//ServerCnxnFactory.createFactory()
static public ServerCnxnFactory createFactory() throws IOException {
//获取启动参数中zookeeper.serverCnxnFactory参数值
String serverCnxnFactoryName =System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
//启动参数中连接方式为空,默认用nio连接方式
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
//Quorum,start()
public synchronized void start() {
//读取本地日志文件、快照文件至内存中
loadDataBase();
//针对QuorumPeerConfig.runFromConfig初始化的ServerCnxnFactory进行start调用,也就是netty的一些配置初始化
startServerCnxnFactory();
//启动一个jetty-server,运维人员可以进入一个后台管理页面
adminServer.start();
//开始初始化快速选举对象,初始化选举需要的队列与线程,并完成一系列初始化配置,
//QuorumPeer.startLeaderElection,
startLeaderElection();
//QuorumPeer继承了Thread类,Thread的start会调用QuorumPeer.run方法
//见QuorumPeer.run() 重要!!!
super.start();
}
//QuorumPeer.startLeaderElection
synchronized public void startLeaderElection() {
//zk所有的节点启动时都默认为looking(observing节点除外),
//即选举状态,选举完成后,节点会变成leading或者following状态
if (getPeerState() == ServerState.LOOKING) {
//生成选票对象,myid即当前节点标识,getLastLoggedZxid为获取最新日志事务id,初始启动为0
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
//针对选举算法进行初始化
this.electionAlg = createElectionAlgorithm(electionType);
}
//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
switch (electionAlgorithm) {
case 3:
//初始化一个新的zk对象管理器
QuorumCnxManager qcm = createCnxnManager();
//createCnxnManager会初始化一个QuorumCnxManager.listener对象
QuorumCnxManager.Listener listener = qcm.listener;
//对zk节点的监听器进行初始化(见 QuorumCnxManager.Listener.run)
listener.start();
//初始化快速选举对象,需要注意的是里面会初始化0sendqueue,recvqueue两个队列,用于存储需要发送和接受的选票
//并且会初始化一个Messenger对象
/* 会初始化两个工作者线程,用于发送和接受选票
Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
}*/
FastLeaderElection fle = new FastLeaderElection(this, qcm);
//调用的是WorkerSender.run()和WorkerReceiver.run()方法,WorkerSender.run如下
/* public void run() {
while (!stop) {
//从待发送的消息队列中区一个需要发送的选票,并进行后续处理
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
//选票处理逻辑,核心是调用QuorumCnxManager.toSend,见下
process(m);
}
}*/
fle.start();
//将当前快速选举对象返回
le = fle;
}
return le;
}
//QuorumCnxManager.toSend
public void toSend(Long sid, ByteBuffer b) {
//如果对端serverid和当前myid相同,说明是自己的选票,直接放入接受队列即可
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
} else {
// 如果对端serverid和当前myid不同,意味着要接受消息了
//先初始化一个新的队列
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
// 这个putIfAbsent意思是,如果当前serverid已经有队列,则返回,无队列则返回空
ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
//下面逻辑就是将需要发送的字节流添加至发送队列中
if (oldq != null) {
addToSendQueue(oldq, b);
} else {
addToSendQueue(bq, b);
}
//通过socket进行数据传输
connectOne(sid);
}
}
// QuorumCnxManager.Listener.run(listener实际是一个Thread,就是调用对应的run方法)
public void run() {
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
Exception exitException = null;
while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
//创建一个serversocker,用于监听zk节点通信
ss = new ServerSocket();
ss.setReuseAddress(true);
//获取zk集群节点地址配置,并设置节点配置
self.recreateSocketAddresses(self.getId());
addr = self.getElectionAddress();
setName(addr.toString());
//将创建的serversocker对端口进行监听
ss.bind(addr);
while (!shutdown) {
//准备接受其他节点的选票信息
client = ss.accept();
setSockOpts(client);
//开始处理连接,其核心是调用QuorumCnxManager.handleConnection()
receiveConnection(client);
}
}
}
//QuorumCnxManager.handleConnection()
private void handleConnection(Socket sock, DataInputStream din){
Long sid = null, protocolVersion = null;
InetSocketAddress electionAddr = null;
//读取目标服务器sid
protocolVersion = din.readLong();
if (protocolVersion >= 0) {
sid = protocolVersion;
}
if (sid < self.getId()) {
//此处就是zk比较妙的一点,如果当前myid小于目标server myid,则会关闭当前socket连接
//复用从对端server和当前server建立的socket通道
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
closeSocket(sock);
if (electionAddr != null) {
connectOne(sid, electionAddr);
} else {
connectOne(sid);
}
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
//为远端的serverid初始化一个发送工作者线程,并至存储内存中
//若内存中该远端的serverid已经有对应工作者线程,则先将其关闭
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
senderWorkerMap.put(sid, sw);
//为远端serverid初始化一个队列,并存储值内存中,脑子里要记住目前已经为
//远端serverid初始化了发送工作者线程和队列
queueSendMap.putIfAbsent(sid,new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
//此处调用的是SendWorker.run方法,如下
/* public void run() {
//这里会把上面初始化的队列取出来,这边是避免参数传来传去,直接搞了个内部变量,方便主方法与线程通信
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
//如果当前队列中存在需要发送的消息,就会写到数据流中,对端server就可以接收到信息了
if (b != null) {
send(b);
}
}
}
*/
sw.start();
//这个线程就是将要接受的消息加入接受队列中
rw.start();
}
}
//QuorumPeer.run()
public void run() {
//根据zk节点的id、连接配置、安全等配置更新线程名称
updateThreadName();
try {
while (running) {
switch (getPeerState()) {
//启动时节点状态一定为LOOKING,先不考虑其他状态
case LOOKING:
//makeLEStrategy()获取当前节点leader选取策略,即返回createElectionAlgorithm初始化的选举对象
//lookForLeader()作用是寻找领导者节点,见FastLeaderElection.lookForLeader
setCurrentVote(makeLEStrategy().lookForLeader());
break;
case LEADING:
//经过两轮选举后,会出现状态为leading的节点,就会走这个逻辑,将当前节点设置为leader
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} finally {
updateServerState();
}
break;
}
start_fle = Time.currentElapsedTime();
}
} finally {
MBeanRegistry instance = MBeanRegistry.getInstance();
instance.unregister(jmxQuorumBean);
instance.unregister(jmxLocalPeerBean);
for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
instance.unregister(remotePeerBean);
}
}
}
//FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
try {
//初始化一些内部对象
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
//本地计数器加一
logicalclock.incrementAndGet();
//更新一下当前节点对应的配置信息
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
/*当前方法主要作用是将需要发送的消息放入发送队列中
private void sendNotifications() {
//获取所有的选票id
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
//将选票保障为可发送的消息对象
ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader,
proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,
proposedEpoch, qv.toString().getBytes());
//将选票消息放入发送发送队列中,注意此处要和createElectionAlgorithm方法联系
//WorkerSender.run()方法中会调用sendqueue.poll()即去除待发送的消息进行发送处理
//(●ˇ∀ˇ●)选票发送机制这样就稍微串起来一些了,继续往下看O(∩_∩)O
sendqueue.offer(notmsg);
}
} */
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
//从接受队列中取消息对象,首次启动肯定没有对象
//当然如果此时是非首节点启动,则接受队列中会接收到选票信息
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
if(n == null){
//这里是判断内存中是否存在待发送消息,如果有待发送消息就推到发送队列中
//我们这是首次启动,肯定为空,返回的是fasle
if(manager.haveDelivered()){
sendNotifications();
} else {
//这里是zk节点互通信的逻辑,详细见QuorumCnxManager.connectAll
manager.connectAll();
}
}
//非首个节点启动时,此处会一直等待其他节点发送选票,一旦收到选票,上面的n就不是null,就走以下逻辑
//校验选票的leaderid以及当前id是否都属于集群节点中,也就是最开始zoo.cfg文件的配置内容
else if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
//判断选举次数是否和内存选举计数器一致,第二个节点启动,二者都为1,不走此逻辑
if (n.electionEpoch > logicalclock.get()) {
}
/*这个是选举leader的具体算法,也比较简单,首选比较选举次数,再比较事务id,最后比较myid,哪个id大哪个为leader
((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))))
*/
else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//计算完leader节点后,需要将计算出来的leader节点、事务id、选举次数等信息更新
updateProposal(n.leader, n.zxid, n.peerEpoch);
//更新完节点信息后,需要将自己选举出来的最新leader以选票的形式发给其他节点
sendNotifications();
}
//源码注释写着:如果是looking状态,没什么意义,作用应该是将所有的 选票收集起来
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//termPredicate用于判断当前节点是否已经获得超过半数选票,如果超过半数,则将
//自己的节点信息更新,并且将自己的状态改为leadering
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
}
}
break;
case OBSERVING:
case FOLLOWING:
case LEADING:
break;
}
} else{
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
//QuorumCnxManager.connectAll
public void connectAll(){
long sid;
for(Enumeration<Long> en = queueSendMap.keys();
en.hasMoreElements();){
sid = en.nextElement();
//获取需要发送消息的所有server节点,并一一连接,源码见下
connectOne(sid);
}
}
//QuorumCnxManager.connectOne
synchronized void connectOne(long sid){
//获取锁
synchronized (self.QV_LOCK) {
boolean knownId = false;
//读取配置文件中所有server成员信息
self.recreateSocketAddresses(sid);
Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
//获取节点检查器,用于读取其他节点信息
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
if (lastSeenQV != null && lastProposedView.containsKey(sid)
&& (!knownId || (lastProposedView.get(sid).electionAddr !=
lastCommittedView.get(sid).electionAddr))) {
knownId = true;
/* 核心就是调用initiateConnectionAsync方法
synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){
return initiateConnectionAsync(electionAddr, sid);
}
//通过线程池调用QuorumConnectionReqThread.run方法,
//其核心是调用QuorumConnectionReqThread.initiateConnection
public boolean initiateConnectionAsync(final InetSocketAddress electionAddr, final Long sid) {
connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
connectionThreadCnt.incrementAndGet();
return true;
}
//QuorumConnectionReqThread.initiateConnection
public void initiateConnection(final InetSocketAddress electionAddr, final Long sid) {
Socket sock = null;
//从工厂中获取一个socker对象
sock = SOCKET_FACTORY.get();
//对socket对象做一些基本的设置
setSockOpts(sock);
//通过socket连接目标zk服务节点
sock.connect(electionAddr, cnxTO);
//开始读取传输的数据,并放入对应的发送队列中,启动发送和接受工作者线程
//里面逻辑也相对简单,就不进一步展示了
startConnection(sock, sid);
}
*/
if (connectOne(sid, lastProposedView.get(sid).electionAddr))
return;
}
}
}
zk选举流程图
经过以上的源码分析,在有两个zk节点启动的情况下,整个选举机制如下图:
zk选举流程图
已有leader情形下,加入新的zk节点选举机制
在选取了两个zk节点之后,如果有第三个zk节点加入的话,此时第三个节点仍会给其他节点发选票,不过在获取到已经选取出leader的结果后,会生成一张自己角色为following,并且将已有leader作为leading的选票,发送给其他节点,从而生成新的集群网络。
以上就是个人对zk选举机制源码的解析,下一篇写一下zk选举leader之后,各节点通信逻辑。