引言
最近公司使用dolphinschedualer做流程排程,發現其重度依賴zookeeper,這次引起了本人對zookeeper的濃厚興趣,故拉了zookeeper源碼來詳細了解一下他的内部實作。
zookeeper源碼本地部署
本次源碼解析使用的是zk-3.5.8,對應git位址為:https://github.com/apache/zookeeper/tree/release-3.5.8。為友善解析源碼,先把zk在本地以叢集的方式啟動起來,友善看某些代碼的具體實作類。
本次本地叢集部署為“一主兩從”,部署步驟如下:
- 把源碼從git上拉下來,直接更新maven依賴,腳本如下:mvn clean install ‐DskipTests。注意:如果報缺少git.properties檔案,直接在對應目錄建立檔案即可,内容如下:
git.commit.id=92138F25
build.time=2022-03-21
- 在目前項目中建立data檔案夾,在data下建立zk1,zk2,zk3三個檔案夾,并在檔案夾下建立myid檔案,myid内容為1、2、3。将conf檔案夾下的zoo_sample.cfg複制三份,修改dataDir與clientPort配置,增加server.1,server.2,server.3,(這裡的1、2、3和data檔案夾下的myid對應,)詳細配置如下:
dataDir=D:/work/workspace/zookeeper/zookeeper-release-3.5.8/data/zk1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
- zookeeper-server項目裡pom.xml檔案裡依賴的包(除了jline)的scope為provided這一行全部注釋掉,不然啟動的時候會報錯
- 将conf檔案夾裡的log4j.properties檔案複制一份到zookeeper-server項目的 \target\classes,保證項目日志列印正常
- zk啟動類為:org.apache.zookeeper.server.quorum.QuorumPeerMain,在idea建立三個application,項目參數配置為zk配置檔案路徑,效果如下:
zk叢集啟動類配置
zookeeper選舉源碼分析
zk本地部署完之後就要分析源碼了,下面截取核心源碼(非關鍵代碼此處會省略 ,本文主要剖析選舉機制相關源碼,有興趣者可以自己下載下傳源碼觀看),解析邏輯如下:
//org.apache.zookeeper.server.quorum.QuorumPeerMain,啟動類,不解釋
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
}
protected void initializeAndRun(String[] args)
{
QuorumPeerConfig config = new QuorumPeerConfig();
//如果通過應用的啟動參數配置了配置檔案,則讀取配置檔案并加載至記憶體中
if (args.length == 1) {
config.parse(args[0]);
}
// 這裡會啟動一個線程,去回收zk過期的一些快照檔案,防止快照檔案過多
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
//判斷目前zk是否是分布式部署,此文隻分析分布式情況(單體zk也沒選舉可研QAQ)
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
}
}
//QuorumPeerConfig.runFromConfig
public void runFromConfig(QuorumPeerConfig config) {
//此處是為了初始化zk的連接配接對象,即用戶端連接配接方式
//有NIO和Netty兩種,預設NIO,詳細見下方createFactory()方法
cnxnFactory = ServerCnxnFactory.createFactory();
//給連接配接工廠進行對應設定
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(), false);
//建立一個zk節點對象
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
//配置選舉方式。預設為3,快速選舉,其他選舉方式已過時
quorumPeer.setElectionType(config.getElectionAlg());
//初始化一個記憶體資料庫,類型為ConcurrentHashMap
//同時zk啟動時自動建立的“/zookeeper”節點也是在此時設定的
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
//省略若幹配置項,開始初始化zk權限配置,暫不考慮權限,跳過
quorumPeer.initialize();
//核心方法:zk啟動,見Quorum,start()
quorumPeer.start();
quorumPeer.join();
}
//ServerCnxnFactory.createFactory()
static public ServerCnxnFactory createFactory() throws IOException {
//擷取啟動參數中zookeeper.serverCnxnFactory參數值
String serverCnxnFactoryName =System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
//啟動參數中連接配接方式為空,預設用nio連接配接方式
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
//Quorum,start()
public synchronized void start() {
//讀取本地日志檔案、快照檔案至記憶體中
loadDataBase();
//針對QuorumPeerConfig.runFromConfig初始化的ServerCnxnFactory進行start調用,也就是netty的一些配置初始化
startServerCnxnFactory();
//啟動一個jetty-server,運維人員可以進入一個背景管理頁面
adminServer.start();
//開始初始化快速選舉對象,初始化選舉需要的隊列與線程,并完成一系列初始化配置,
//QuorumPeer.startLeaderElection,
startLeaderElection();
//QuorumPeer繼承了Thread類,Thread的start會調用QuorumPeer.run方法
//見QuorumPeer.run() 重要!!!
super.start();
}
//QuorumPeer.startLeaderElection
synchronized public void startLeaderElection() {
//zk所有的節點啟動時都預設為looking(observing節點除外),
//即選舉狀态,選舉完成後,節點會變成leading或者following狀态
if (getPeerState() == ServerState.LOOKING) {
//生成選票對象,myid即目前節點辨別,getLastLoggedZxid為擷取最新日志事務id,初始啟動為0
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
//針對選舉算法進行初始化
this.electionAlg = createElectionAlgorithm(electionType);
}
//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
switch (electionAlgorithm) {
case 3:
//初始化一個新的zk對象管理器
QuorumCnxManager qcm = createCnxnManager();
//createCnxnManager會初始化一個QuorumCnxManager.listener對象
QuorumCnxManager.Listener listener = qcm.listener;
//對zk節點的監聽器進行初始化(見 QuorumCnxManager.Listener.run)
listener.start();
//初始化快速選舉對象,需要注意的是裡面會初始化0sendqueue,recvqueue兩個隊列,用于存儲需要發送和接受的選票
//并且會初始化一個Messenger對象
/* 會初始化兩個工作者線程,用于發送和接受選票
Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
}*/
FastLeaderElection fle = new FastLeaderElection(this, qcm);
//調用的是WorkerSender.run()和WorkerReceiver.run()方法,WorkerSender.run如下
/* public void run() {
while (!stop) {
//從待發送的消息隊列中區一個需要發送的選票,并進行後續處理
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
//選票處理邏輯,核心是調用QuorumCnxManager.toSend,見下
process(m);
}
}*/
fle.start();
//将目前快速選舉對象傳回
le = fle;
}
return le;
}
//QuorumCnxManager.toSend
public void toSend(Long sid, ByteBuffer b) {
//如果對端serverid和目前myid相同,說明是自己的選票,直接放入接受隊列即可
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
} else {
// 如果對端serverid和目前myid不同,意味着要接受消息了
//先初始化一個新的隊列
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
// 這個putIfAbsent意思是,如果目前serverid已經有隊列,則傳回,無隊列則傳回空
ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
//下面邏輯就是将需要發送的位元組流添加至發送隊列中
if (oldq != null) {
addToSendQueue(oldq, b);
} else {
addToSendQueue(bq, b);
}
//通過socket進行資料傳輸
connectOne(sid);
}
}
// QuorumCnxManager.Listener.run(listener實際是一個Thread,就是調用對應的run方法)
public void run() {
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
Exception exitException = null;
while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
//建立一個serversocker,用于監聽zk節點通信
ss = new ServerSocket();
ss.setReuseAddress(true);
//擷取zk叢集節點位址配置,并設定節點配置
self.recreateSocketAddresses(self.getId());
addr = self.getElectionAddress();
setName(addr.toString());
//将建立的serversocker對端口進行監聽
ss.bind(addr);
while (!shutdown) {
//準備接受其他節點的選票資訊
client = ss.accept();
setSockOpts(client);
//開始處理連接配接,其核心是調用QuorumCnxManager.handleConnection()
receiveConnection(client);
}
}
}
//QuorumCnxManager.handleConnection()
private void handleConnection(Socket sock, DataInputStream din){
Long sid = null, protocolVersion = null;
InetSocketAddress electionAddr = null;
//讀取目标伺服器sid
protocolVersion = din.readLong();
if (protocolVersion >= 0) {
sid = protocolVersion;
}
if (sid < self.getId()) {
//此處就是zk比較妙的一點,如果目前myid小于目标server myid,則會關閉目前socket連接配接
//複用從對端server和目前server建立的socket通道
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
closeSocket(sock);
if (electionAddr != null) {
connectOne(sid, electionAddr);
} else {
connectOne(sid);
}
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
//為遠端的serverid初始化一個發送工作者線程,并至存儲記憶體中
//若記憶體中該遠端的serverid已經有對應工作者線程,則先将其關閉
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish();
}
senderWorkerMap.put(sid, sw);
//為遠端serverid初始化一個隊列,并存儲值記憶體中,腦子裡要記住目前已經為
//遠端serverid初始化了發送工作者線程和隊列
queueSendMap.putIfAbsent(sid,new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
//此處調用的是SendWorker.run方法,如下
/* public void run() {
//這裡會把上面初始化的隊列取出來,這邊是避免參數傳來傳去,直接搞了個内部變量,友善主方法與線程通信
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);
//如果目前隊列中存在需要發送的消息,就會寫到資料流中,對端server就可以接收到資訊了
if (b != null) {
send(b);
}
}
}
*/
sw.start();
//這個線程就是将要接受的消息加入接受隊列中
rw.start();
}
}
//QuorumPeer.run()
public void run() {
//根據zk節點的id、連接配接配置、安全等配置更新線程名稱
updateThreadName();
try {
while (running) {
switch (getPeerState()) {
//啟動時節點狀态一定為LOOKING,先不考慮其他狀态
case LOOKING:
//makeLEStrategy()擷取目前節點leader選取政策,即傳回createElectionAlgorithm初始化的選舉對象
//lookForLeader()作用是尋找上司者節點,見FastLeaderElection.lookForLeader
setCurrentVote(makeLEStrategy().lookForLeader());
break;
case LEADING:
//經過兩輪選舉後,會出現狀态為leading的節點,就會走這個邏輯,将目前節點設定為leader
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} finally {
updateServerState();
}
break;
}
start_fle = Time.currentElapsedTime();
}
} finally {
MBeanRegistry instance = MBeanRegistry.getInstance();
instance.unregister(jmxQuorumBean);
instance.unregister(jmxLocalPeerBean);
for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
instance.unregister(remotePeerBean);
}
}
}
//FastLeaderElection.lookForLeader
public Vote lookForLeader() throws InterruptedException {
try {
//初始化一些内部對象
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
//本地計數器加一
logicalclock.incrementAndGet();
//更新一下目前節點對應的配置資訊
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
/*目前方法主要作用是将需要發送的消息放入發送隊列中
private void sendNotifications() {
//擷取所有的選票id
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
//将選票保障為可發送的消息對象
ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader,
proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,
proposedEpoch, qv.toString().getBytes());
//将選票消息放入發送發送隊列中,注意此處要和createElectionAlgorithm方法聯系
//WorkerSender.run()方法中會調用sendqueue.poll()即去除待發送的消息進行發送處理
//(●ˇ∀ˇ●)選票發送機制這樣就稍微串起來一些了,繼續往下看O(∩_∩)O
sendqueue.offer(notmsg);
}
} */
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
//從接受隊列中取消息對象,首次啟動肯定沒有對象
//當然如果此時是非首節點啟動,則接受隊列中會接收到選票資訊
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
if(n == null){
//這裡是判斷記憶體中是否存在待發送消息,如果有待發送消息就推到發送隊列中
//我們這是首次啟動,肯定為空,傳回的是fasle
if(manager.haveDelivered()){
sendNotifications();
} else {
//這裡是zk節點互通信的邏輯,詳細見QuorumCnxManager.connectAll
manager.connectAll();
}
}
//非首個節點啟動時,此處會一直等待其他節點發送選票,一旦收到選票,上面的n就不是null,就走以下邏輯
//校驗選票的leaderid以及目前id是否都屬于叢集節點中,也就是最開始zoo.cfg檔案的配置内容
else if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
//判斷選舉次數是否和記憶體選舉計數器一緻,第二個節點啟動,二者都為1,不走此邏輯
if (n.electionEpoch > logicalclock.get()) {
}
/*這個是選舉leader的具體算法,也比較簡單,首選比較選舉次數,再比較事務id,最後比較myid,哪個id大哪個為leader
((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))))
*/
else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//計算完leader節點後,需要将計算出來的leader節點、事務id、選舉次數等資訊更新
updateProposal(n.leader, n.zxid, n.peerEpoch);
//更新完節點資訊後,需要将自己選舉出來的最新leader以選票的形式發給其他節點
sendNotifications();
}
//源碼注釋寫着:如果是looking狀态,沒什麼意義,作用應該是将所有的 選票收集起來
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//termPredicate用于判斷目前節點是否已經獲得超過半數選票,如果超過半數,則将
//自己的節點資訊更新,并且将自己的狀态改為leadering
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
}
}
break;
case OBSERVING:
case FOLLOWING:
case LEADING:
break;
}
} else{
}
}
return null;
} finally {
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}",
manager.getConnectionThreadCount());
}
}
//QuorumCnxManager.connectAll
public void connectAll(){
long sid;
for(Enumeration<Long> en = queueSendMap.keys();
en.hasMoreElements();){
sid = en.nextElement();
//擷取需要發送消息的所有server節點,并一一連接配接,源碼見下
connectOne(sid);
}
}
//QuorumCnxManager.connectOne
synchronized void connectOne(long sid){
//擷取鎖
synchronized (self.QV_LOCK) {
boolean knownId = false;
//讀取配置檔案中所有server成員資訊
self.recreateSocketAddresses(sid);
Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
//擷取節點檢查器,用于讀取其他節點資訊
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
if (lastSeenQV != null && lastProposedView.containsKey(sid)
&& (!knownId || (lastProposedView.get(sid).electionAddr !=
lastCommittedView.get(sid).electionAddr))) {
knownId = true;
/* 核心就是調用initiateConnectionAsync方法
synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){
return initiateConnectionAsync(electionAddr, sid);
}
//通過線程池調用QuorumConnectionReqThread.run方法,
//其核心是調用QuorumConnectionReqThread.initiateConnection
public boolean initiateConnectionAsync(final InetSocketAddress electionAddr, final Long sid) {
connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
connectionThreadCnt.incrementAndGet();
return true;
}
//QuorumConnectionReqThread.initiateConnection
public void initiateConnection(final InetSocketAddress electionAddr, final Long sid) {
Socket sock = null;
//從工廠中擷取一個socker對象
sock = SOCKET_FACTORY.get();
//對socket對象做一些基本的設定
setSockOpts(sock);
//通過socket連接配接目标zk服務節點
sock.connect(electionAddr, cnxTO);
//開始讀取傳輸的資料,并放入對應的發送隊列中,啟動發送和接受工作者線程
//裡面邏輯也相對簡單,就不進一步展示了
startConnection(sock, sid);
}
*/
if (connectOne(sid, lastProposedView.get(sid).electionAddr))
return;
}
}
}
zk選舉流程圖
經過以上的源碼分析,在有兩個zk節點啟動的情況下,整個選舉機制如下圖:
zk選舉流程圖
已有leader情形下,加入新的zk節點選舉機制
在選取了兩個zk節點之後,如果有第三個zk節點加入的話,此時第三個節點仍會給其他節點發選票,不過在擷取到已經選取出leader的結果後,會生成一張自己角色為following,并且将已有leader作為leading的選票,發送給其他節點,進而生成新的叢集網絡。
以上就是個人對zk選舉機制源碼的解析,下一篇寫一下zk選舉leader之後,各節點通信邏輯。