前言
Paxos、Zab、Raft都屬于在分布式環境保持資料一緻性的相關算法。
對于這三個算法,初次接觸的時候有很多疑惑的地方:
1. 這3個算法的實作是什麼,複雜麼
2. 為什麼要存在這麼多算法,一個不能解決麼,都用在什麼場景。
算法本身不太複雜,但是應用在實際場景中解決問題,開發起來還是比較複雜的。
下面盡可能簡單易懂的進行描述。
Paxos
paxos算法是在不會出現拜占庭錯誤的環境下達成一緻性協定的解決方案。
p.s. 分布式環境都是通過網絡通訊,系統中的成員可能出錯而發送錯誤的資訊,用于傳遞資訊的通訊網絡也可能導緻資訊損壞,傳輸或響應的資訊有誤算是拜占庭錯誤,而非拜占庭錯誤就是說資訊不會在傳輸過程中遭到篡改。
詳細了解,檢視:拜占庭将軍問題
角色
paxos在分布式環境中存在多個節點,節點的角色如下:
- Client 在分布式系統中發送一個請求并等待響應,比如寫一條資料的請求
- Proposer 發送prepare和accept請求,對于client發送的請求希望Acceptor能同意
- Acceptor 處理prepare和accept請求,就是對發送的請求進行投票
- Learner 對于上面的請求,擷取paxos算法的結果
- Leader Paxos叢集中選出唯一一個節點作為leader處理提議
Acceptor是對請求進行投票的,那在分布式環境中作為Acceptor的大部分節點都應當是存活的,來保證多票當選(同zab或raft的投票選舉來了解)。
并不是一個節點隻能作為一個角色,paxos實作的叢集,每一個節點應該包含Proposer、Acceptor、Learner三種角色,可以處理Client請求并進行投票最終響應。
Paxos的實作也是不同的,比如Basic Paxos和Multi-Paxos,本文主要以基本實作進行說明。
算法
paxos算法實作分為兩個階段,通信過程中資料結構可以簡化為(n, v)表示。
n表示一個提案号,v是與該提案号對應的值。
e.g. client要寫入一條資料,Proposer提出一個提案号給叢集中其它的Acceptor,如果有Quorum(可以認為是半數以上)的Acceptor同意這個提案,那麼這個提案号對應的資料v就可以被寫入。
這兩個階段如下:
階段1a:Proposer發出一條“Prepare"消息,帶着提案号n,發給Acceptor(包括它自身)
階段1b:Acceptor收到Proposer的Prapare消息後,看一下這個提案号和之前收到的提案号相比,如果比之前的都大就同意(發一條Propose消息給Proposer),不是就忽略或表示拒絕
階段2a:Proposer收到Qururm數量的Acceptor的Propose消息,說明都同意這個提案,就發送一個Accept(n,v)消息給這些Acceptor(帶着提案号和該提案号對應的資料)
階段2b:收到Proposer的Accept消息的Acceptor就把這個資料寫入。
這樣就算達成一個共識,如果上面提案最終失敗,其實會重新開始新一輪提案。
下面這個流程圖來自Paxos的wiki,可以看一下幫助了解下這個過程:
圖檔來源
注意,上面這個基本的paxos實作包括兩個階段會涉及很多消息交換,Multi-Paxos 實作會選舉一個leader,隻需要第2階段即可确定一個值。
Paxos的實作案例 chubby:https://courses.cs.washington.edu/courses/csep552/13sp/lectures/5/chubby.pdf
Raft
目前使用相當廣泛的一個一緻性算法,比如ETCD,Consul,kafka,rocketmq的dledger都有用到。
p.s. 我在wiki搜尋的搜尋的搜到的是一款遊戲
目前我實際接觸到的都是leader選舉、日志複制的解決方案。
角色
一個raft叢集有若幹個節點,角色如下:
- Leader 隻有一個(比如kafka的分區、rocketmq的主節點(dledger),接受用戶端的請求
- Follower 接受leader的寫請求(資料同步過來)
- Candidate 在選舉leader時的這些follower
比如我們說kafka消息主從複制就是說Leader和Follower。
對于Raft的快速了解,推薦一個網站(新手都容易看懂):Raft
算法
以實際項目來舉例,其實很簡單,以rocketmq dledger來說明。
日志複制
p.s. 不以kafka示例,有兩個原因,一個是kafka的ack數量可以配置,寫入一條消息的可以配置ack數量算是成功。rocketmq dledger如是1主2從,隻要有一個從節點寫入成功(叢集中一主一從2個節點已經寫入,超過半數節點)便可以認為成功寫入,更容易了解。第二個原因,是我本地剛好有一版rocketmq的源碼。
代碼版本是4.9.1
DLedgerCommitLog:DLedgerCommitLog
BatchAppendEntryRequest request = new BatchAppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBatchMsgs(encodeResult.batchData);
// 寫入消息
dledgerFuture = (BatchAppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
看handleAppend()方法:dledger/DLedgerServer.java at master · openmessaging/dledger · GitHub
看方法注釋:
關鍵是第2步,等待Quorum節點的ack.
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
return dLedgerEntryPusher.waitAck(resEntry, false);
基本流程就是用戶端生産者發送一條消息到主節點,主節點發送給從節點,等待其中部分從節點寫入成功傳回ack,主節點響應用戶端消息送出成功。
選舉
關于ledaer選舉主要在:DLedgerLeaderElector
通過心跳維護leader和follower之間的關系:
/**
* The core method of maintainer. Run the specified logic according to the current role: candidate => propose a
* vote. leader => send heartbeats to followers, and step down to candidate when quorum followers do not respond.
* follower => accept heartbeats, and change to candidate when no heartbeat from leader.
*
* @throws Exception
*/
private void maintainState() throws Exception {
if (memberState.isLeader()) {
maintainAsLeader();
} else if (memberState.isFollower()) {
maintainAsFollower();
} else {
maintainAsCandidate();
}
}
private void maintainAsLeader() throws Exception {
if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) {
long term;
String leaderId;
synchronized (memberState) {
if (!memberState.isLeader()) {
//stop sending
return;
}
term = memberState.currTerm();
leaderId = memberState.getLeaderId();
lastSendHeartBeatTime = System.currentTimeMillis();
}
sendHeartbeats(term, leaderId);
}
}
private void maintainAsFollower() {
if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) {
synchronized (memberState) {
if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) {
logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());
changeRoleToCandidate(memberState.currTerm());
}
}
}
}
超過2個心跳的逾時,follower就會進入candidate重新選舉。
看下這段代碼,作為不同的角色在實作會有不同的行為:
/**
* The core method of maintainer. Run the specified logic according to the current role: candidate => propose a
* vote. leader => send heartbeats to followers, and step down to candidate when quorum followers do not respond.
* follower => accept heartbeats, and change to candidate when no heartbeat from leader.
*
* @throws Exception
*/
private void maintainState() throws Exception {
if (memberState.isLeader()) {
maintainAsLeader();
} else if (memberState.isFollower()) {
maintainAsFollower();
} else {
maintainAsCandidate();
}
}
成為candidate 候選者的時候,就會投票選舉leader,實作就在maintainAsCandidate()方法:
同意票數超過Quorum數量,就是投票通過,選舉為leader。
raft的應用正如上面說的,比較多了:kafka, rocketmq dledger, etcd...
Zab
Zab的全稱是Zookeeper Atomic Broadcast協定,聽名字好像是專用于zookeeper的協定,目前我主要了解到的也是在zookeeper上的應用。
p.s. zab的讀法,我目前聽到過兩個版本(zai bi)或者(za bi),至于哪個标準我也不知道。
zab是相對來說是屬于非常強一緻的協定了(zookeeper不算是最強一緻性,業内好像是還有比zk更強一緻性的實作)。CAP理論,如果有了解應該聽說個,zookeeper是CP,叢集中必須半數以上節點存在才可用,犧牲可用性來滿足一緻性。
下面以zookeeper來說明。
角色
- Leader 叢集通過投票選舉出來的唯一節點,主要接受用戶端對叢集的寫請求
- Follower 參與投票選舉leader,當用戶端發起寫請求的時候,處理leader的Proposal,leader在收到超過Quorum(大多數節點,zk是半數節點)的follower的accept的ack後,ledaer才會送出這條消息。follower其實跟leader保持資料同步。
在zookeeper中還個observer的節點角色,但是這個并參與選舉和事務送出,是以在zab協定中不再提現。zookeeper的observer隻是叢集讀性能的一種優化。
p.s. zookeeper叢集的所有類型的節點都可以處理讀請求,但是寫請求都是轉發給leader處理。
算法
在zab中事務送出時的資料同步術語好像不像是raft的複制而broadcast,我這裡就叫做廣播了。
在zab中有個zxid是一個自增值,叫做事務id,每次一條寫請求可以看做是一個事務。
廣播
- 用戶端發一起提條資料更新的請求
- leader進行處理,發送 PROPOSE(zxid, data)(帶着zxid和更新的資料)請求包給所有連接配接的follower
- follower收到請求會将資料同步到本地,然後傳回一個ACK(zxid)資料包給leader
- leader收到quorum(超過半數)的節點的ack就發送一個送出請求COMMIT(zxid) 給所有的follower
- 響應用戶端
選舉
直接拿zookeeper的源碼進行說明。源碼在:zookeeper/FastLeaderElection.java at master · apache/zookeeper · GitHub
zookeeper投票的主要資料是(epoch,zxid,sid),epoch是zookeeper的選舉輪次,這個資料會備份到資料目錄下的一個檔案(哪個目錄下,具體我也記不清了);zxid就是上文提到的事務id,在資料目錄下增量檔案的名,sid是叢集配置的server id,在配置檔案中。
zookeeper的follower發起投票選舉,如果誰收獲大多數(Querum)的票就當選leader。
if (voteSet.hasAllQuorums()) {
// Verify if there is any change in the proposed leader
// 獲得多數投票的時候還是等一會,看這期間是否會收到新的投票不
// 因為可能有一些更符合leader條件的節點由于網絡的原因,投票的請求傳輸慢了,其它節點收到的晚了
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
對目前收到的投票是否有效的判斷邏輯,在totalOrderPredicate方法:
return ((newEpoch > curEpoch)
|| ((newEpoch == curEpoch)
&& ((newZxid > curZxid)
|| ((newZxid == curZxid)
&& (newId > curId)))));
如果一個follower收到的票比自己的更有效(上面這個判斷),設定這個票為自己的投票并重新投出去。
在選舉出leader後,跟follower有一個資料同步的動作。具體說明感興趣可以看一下:Zab1.0 - Apache ZooKeeper - Apache Software Foundation
總結
在raft之前,paxos使用的比較多。
基本的paxos,對于一次資料更新請求,叢集各個節點可能要進行多輪消息交換,而raft因為必須要選擇一個leader,通過leader隻需要一輪消息交換。