mapreduce中包含了兩個角色,coordinator和worker,其中,前者掌管任務的分發和回收,後者執行任務。mapreduce分為兩個階段,map階段和reduce階段。
map階段對應的是map任務。coordinator将會把任務分成多個部分,例如,有多個檔案待處理,則每個檔案的處理是一個任務。coordinator根據待處理檔案生成多個任務,将這些任務用available管道暫存,供worker取用。worker将任務完成之後,需要告知coordinator,coordinator需要記錄任務的狀态。為了辨別任務,每個任務需要有唯一的taskId。coordinator可以用taskId為key的map來存儲所有task,worker完成一個task之後,這個task就沒有必要儲存,coordinator可以從map中删除該task。coordinator存儲未完成的task,除了供worker比對之外,還可以用來重新分發逾時的任務。worker調用coordinator的applyForTask函數,來從avaliable隊列中得到新的任務。在map階段,worker收到任務後會調用mapf函數,這個函數是使用者傳入的參數,指向任務的具體執行過程。對mapf的執行結果,worker根據reduce的個數,将執行結果hash成reduce份。例如,對于wordcount任務,每個檔案中的詞的統計數量将根據詞分為reduce份,儲存在reduce個檔案中。
reduce階段對應的是reduce任務。coordinator将生成reduce個新的任務,每個任務處理一個hash桶中的内容。同樣用available管道供worker取用。當然,這時worker隻需要知道自己取到的是第幾個hash桶對應的reduce任務,即可通過共享檔案和統一的檔案命名規則擷取到此時需要處理的檔案。根據使用者reducef函數的輸入,worker将輸入檔案中的内容排序之後,将相同key的value存儲成數組,輸入reducef函數處理。
worker通知coordinator任務完成:worker對任務完成的通知可以不必發一個新的包,因為worker每次完成任務的同時都會立即向coordinator請求新的任務,是以可以在請求包中附送上一個已經完成的taskId。coordinator經過比對taskId和workerId确認無誤之後,在分發新任務之前就可以處理舊的已完成任務。
逾時任務檢測:有兩種選擇,一是worker接收任務之後定時發心跳包,但是這種方式較為繁瑣。另一種是coordinator定時檢查,task的map中對每個task維護一個ddl,若目前時刻已經超過了ddl時間,就視為逾時。
available管道初始化容量:不初始化容量的話,管道會阻塞。
任務結果檔案重命名:worker處理階段,為了防止其他worker也在處理這一檔案導緻的寫沖突,會将處理結果檔案命名中加上workerId,但reduce階段不需要知道map結果是由哪個worker生成的,是以coordinator确認任務完成後會對結果檔案重新命名,去掉workerId的标記。reduce階段同理。

mapreduce架構的一個重要瓶頸就是可能有大量的資料需要在伺服器之間進行傳遞,其論文中詳細讨論了這一點及其解決方案。
raft是一個分布式共識算法。分為上司選舉【Leader election】、日志複制【Log replication】和安全【Safety】。
在一個raft叢集中,server總是在三種狀态之間轉換,follower、candidate、leader,且保證任何時刻系統中最多隻有一個leader。系統将時間劃分為多個term,term順序遞增,candidate進行選舉的時候會先将自身的term加一,表示自己認為已經可以開始新的term了。在一個term内的穩定狀态下,raft叢集中隻有一個leader,其餘的server是follower,系統所處term的切換意味着leader的切換。leader定時向其餘伺服器發送一個heart beat心跳資訊,表示自己仍然存活,此外,接收外界對raft系統的資料請求,提供對外服務,生成日志條目,并且将日志條目複制給其他的follower,以此實作資料的多存儲;follower接受leader發送的heart beat,确認目前系統存在leader,并且接收leader發來的日志條目副本,更新本地的日志。
【Leader election】若follower的heart beat逾時,即,在一段時間内都沒有收到leader發來的heart beat。此時,這台follower認為leader已經挂掉,于是自動轉化為candidate狀态,開始競選成為新一期的leader。candidate将自身的term加一,投票給自己,同時向所有server發送requestVote的請求,對于收到requestVote請求的伺服器來說,隻要它們在這個term沒有投出票,則投給這個candidate,換句話說,一個server在一個term隻能投一次票。在一輪投票中,若所得票數大于總伺服器數量的一半,則赢得選舉,成為本期leader,同時立即發送一條heart beat宣布上任,系統回到穩定狀态。同一時刻允許同時存在多個candidate,此時可能會出現選票平分的情況,這時無法選出新的leader,candidate将重新發起投票,并且term再加一。重新投票将會影響系統的性能,為了減小同時出現多個candidate的可能性,每台server的heart beat逾時時間(等待heart beat的時間)将設定為一個區間範圍内的随機數。一般要求:heart beat時間<<選舉逾時時間<<平均故障時間。
由于raft叢集的server總是在三種狀态之間切換,不同狀态執行不同的任務,是以将使用狀态機來實作。server之間互相發送的包是心跳包和requestVote包以及它們的reply。
leader:【發送心跳包給follower和candidate,收到不合法的心跳則拒收】向叢集中其他所有成員定時發送heart beat,确認存活,同時接收其他成員回報的reply資訊。對于reply資訊,有多種情況:
reply.Success = true:成員承認本leader;
reply.Success = false:成員拒絕承認本leader。原因是該成員的term>leader.term,本leader的任期已過,叢集已經在新的term了。于是這台機器退位,降級為follower,并更新自身的term等資訊,保持與叢集同步。
follower:【從leader接收心跳包,從candidate接收requestVote包】
接收投票要求
如果投票的term大于自己,說明有人發現leader挂了,在發起新一輪的投票,投票,同時視為收到了心跳;
否則,拒絕投票,并且告訴通過reply.term告知candidate本機認為目前所處的term;
接收心跳:重置心跳逾時計時器;
檢查是否心跳逾時:若逾時,成為candidate,并且立即發起投票;
candidate:【從leader接收心跳,發送requestVote包給follower和其他candidate,從其他candidate接收requestVote包】
發起投票。對于投票結果:
若超過半數同意,則立即成為leader并且執行leader任務;
若有人拒絕:檢視reply.term,如果reply.term>=自己,說明是自己out了,降級為follower,取消本輪投票;否則就是單純的不投我,那就算了;
檢查投票是否逾時,若逾時,重新發起投票;
接收心跳,如果在投票過程中收到term>=自己的心跳,說明現在已經有leader了,降級到follower狀态,取消本輪投票。
關于投票取消的時候可能發生的異常讨論
follower同意投票的同時,将term更新,立即視為進入了新的term并且将這個candidate視為目前term的leader,這是沒有問題的。如果candidate選舉成功,顯然是沒問題的;如果candidate選舉不成功,即,取消投票,有以下情況:收到reply.term>=candidate.term的選舉回複,說明系統正在試圖開啟更大的term;收到term>=自己的心跳,說明目前系統中正處于更大的term,并且已經處于有leader的穩定狀态。不論是試圖開啟還是已經達到,當這個更大的term達到穩定的狀态時,其leader會發送心跳,心跳的term大于candidate的term,投票給candidate的server不會拒絕這些心跳,并且會立即響應進入新的term,從前的錯誤投票在新的term下毫無影響。
附:檢測和修複data race https://www.sohamkamani.com/golang/data-races/
——————————————
(重構)
對于一台server,需要做的事情有三個方面:選舉、日志複制、apply。其中,選舉和apply兩項是所有server都主動進行的,是以在初始化的時候使用兩個goroutine來控制,日志複制應該是由client調用start來控制進行的。
timeout一直在倒計時,一旦超出了倒計時就稱為candidate開始選舉,倒計時期間,可能由于收到leader或者任期更大的server的消息而reset倒計時。
logApplier不斷地推動lastApplied追上commitIndex,通過發送ApplyMsg給applyCh通道接口來apply日志,如果已經兩者已經一緻了,就wait直到有新的commit。如何檢查有新的commit呢?可以使用sync.cond條件變量,等commitIndex更新的時候用broadcast喚醒這個cond,進而疏通堵塞。
附: 關于sync.cond https://ieevee.com/tech/2019/06/15/cond.html
【選舉】
投票條件:
候選人最後一條Log條目的任期号大于本地最後一條Log條目的任期号;
或者,候選人最後一條Log條目的任期号等于本地最後一條Log條目的任期号,且候選人的Log記錄長度大于等于本地Log記錄的長度
becomeCandidate時,立即開始選舉,當然,這時候需要一些前序步驟:将term++辨別進入了新的term,将votedfor置為me表示投票給自己了。
選舉方:選舉過程需要一個“得票數”的變量votesRcvd來記錄已得票數(在分布式系統中,它的增加需要原子操作,是以用一個鎖sync.cond鎖來保護),此外,還要用一個finish變量來确定已經做出回答的server有多少。每當得到一枚票,就喚醒(broadcast)一次cond鎖,堵塞疏通,做出“繼續等待/處理最終票數/直接return”的選擇。其中,繼續等待是當票數不夠一半,但還有server沒有做出回複的時候。處理最終票數是剩餘情況。直接return比較特殊,因為可能在等待得票的過程中,本candidate已經不是candidate了,可能降級為follower了。處理最終票數就很簡單了,如果夠一半就更新為leader(開始心跳goroutine),不夠就變成follower(此時是因為所有server都已經做出了回複是以開始處理最終票數的),處理最終票數的過程中,要通過判斷和加鎖的方式,確定本candidate仍然是candidate,且目前任期和得票的任期一樣。
接待員(中間函數):構造args和reply,調用投票方的投票函數。對傳回結果,隻在voteGranted為true的時候傳回true,否則傳回false,如果reply.term更大,就令candidate降級為follower。同時,在處理期間也要保證本candidate是candidate的時候才有必要繼續進行,但繼續進行的時候,非必要不得對candidate加鎖,否則容易形成死鎖。
投票方:先檢查args.term,如果比自己大,那就先承認一下自己的follower地位, 如果args.term比自己小,那就voteGranted置為false,讓選舉方承認自己follower的地位,并且傳回,沒必要再理會這次選舉。繼續處理的是args.term>=自己的情況。如果還沒投,或者已經投給了這個candidate,并且term相同的話選舉方log更長,那就投給選舉方,并且reset選舉逾時計時器。否則不投。簡而言之,投票要檢查term,term相等的話看log是不是新于自己,以及票是不是已經投出去了。
設計技巧:
将單個詢問、處理回複和分發、回收分為兩個過程。前者是接待員,為單個投票方提供單個接待服務,後者是總管,給各個投票方配置設定出各自的接待員。
盡量不加鎖,或者鎖粒度盡可能小,在處理的時候判斷一下是不是狀态還未過時。
【日志複制】
接待員(發送方/中間函數):取出目标server對應的nextIndex和matchIndex。如果nextIndex,即即将發送的entries的開始位置,<=snapshotLastIndex,就是已經被壓縮了,那就将snapshot發送給目标server,傳回。如果nextIndex在log裡,就構造AppendEntriesArgs,把nextIndex後面所有的entries全發送過去,這時,要附帶nextIndex-1這一條的index和term,用來給目标server做一緻性檢查。對于傳回值,首先檢查term判斷是否本leader需要降級為follower,然後再判斷是否成功。如果成功,就更新nextIndex和matchIndex,再看看需不需要commit。如果不成功,那就是一緻性檢查出問題了,找到沖突點,重新執行接待任務。
快速回退法: 發生沖突的時候,讓follower傳回足夠的資訊給leader,這樣leader可以以term為機關來回退,而不用每次隻回退一條log條目,是以當log不比對的時候,leader隻需要在每個不同的term發送一條appendEntries,這是一種加速政策。
沖突點回溯:找到args.PrevlogTerm的第一條log的index,就是目前看來的沖突index。不會往之前的term找,因為無法确定那裡是不是沖突了。這個沖突index可能會有點悲觀,這裡會增加網絡負載,可以優化。
matchIndex隻在發送成功的時候更新,并且是為了commit設定的。follower的commitIndex始終是随着AppendEntriesArgs帶來的leader的commitIndex更新的,自己不能主動判斷更新。另,commit的時候會喚醒applyCond。
nextIndex總是很樂觀的,靠一緻性檢查和沖突點回溯來防止錯誤。
一條log entry的index和它在log中的下标不是同一個東西。
對log的操作可能很多,設計一個log類來專門管理這些操作,像cmu資料庫一樣寫一些基本的常用操作函數。
向管道中塞東西,可能會發生堵塞,是以要使用goroutine。例如 go rf.applyCh<-msg
appendNewEntry時的index
【關于日志複制時可能出現的異常情況讨論】
如果leader正常工作,raft系統中不會出現什麼問題,follower隻需要接收leader發來的日志資訊,将log的狀态與leader的log狀态靠齊即可。
一個舊leader故障之後,新的leader是否可以使系統達到一緻?
假設現在系統中有三台機器,S1,S2和S3,其中S3是舊的leader,且系統此刻是一緻的。S3可能引發不一緻的故障時刻有三種:
将新條目添加到本地log之後立即故障:根據多數選舉的規則,S1和S2中可以出現新的leader,系統繼續服務。
将新條目添加到S1之後故障:S1可以成為leader,系統繼續服務,S1會将這條條目傳遞給其他機器并且送出。
将新條目添加到S1并且送出之後故障:同上。
是以,舊leader S3故障之後,剩下的團體也可以正常服務。如果此時舊leader重新與叢集建立了聯系,系統将會如何?
不論中間經過了多少個term,假設現在的leader是S1,舊leader是S3,S3重新加入叢集的時候,首先S3肯定會降級為follower,如果S3可以立即被選舉為leader,那麼就可以視為S3沒有發生過故障。S1會發送新條目給S3的時候,S3會進行不一緻性檢查,經過多次發送并嘗試append條目,S1會令S3的log狀态與自己的達成一緻。
【關于已經commit的log是否會丢失的進一步讨論】
假設目前leader是S3。已知leader選舉,當term一緻的時候,隻能給log長于自己的投選舉票。那麼隻有log長于其中超過半數機器的機器可以成為leader。已知leader永遠不會丢棄自己已有的log,那麼存在于leader中的被commit的log肯定不會被丢棄。丢棄的情況隻會是一條log被大多數機器記錄,但leader沒有記錄。
假設該log的index是i1,term是t1。根據log append的連續性,S3至多接收到i1-1之後就沒有接收到t1的其他任何log了。進一步地,由于i1被保留到了S3入選的term,是以t1之後的leader都有i1記錄,是以S3至多接收到i1-1之後就再也沒有收到到選舉為止的其他任何log了。在這種情況下,還要保證經曆了所有的term(才能term與其他選舉者一緻),即使之後的所有term都不再append條目到任何機器上,那也有大多數機器比S3多了i1這條log,S3不可能選舉成功。推出沖突,是以commit的log不會丢失。
【persist】
persist類是raft類中的一個成員。其作用應該是為了儲存state資訊和snapshot資訊,state資訊包括currentTerm,votedFor,log。隻有這三者需要被持久化存儲,log是唯一記錄了應用程式狀态的地方,其中存儲的一系列操作是唯一能在斷電重新開機之後用來重建應用程式狀态的資訊;votedfor和currenterm是為了保證每個任期最多隻有一個leader。其他的狀态,例如lastApplied和commitIndex都可以通過leader和follower之間的交流來重新獲得。
【snapshot】
每個server會自己建立自己的snapshot,也會接受并install leader發送的snapshot(這發生在日志同步的時候nextIndex<=ssLastshot時)。隻有leader可以讓其他server install自己的snapshot,這和隻有leader可以讓其他server appendEntries一樣,是以,發送處理和接收處理之前都必須check發送方的leader身份,并且可以以此來代替加鎖。
收到installSnapshot和收到AppendEntries類似,都需要有檢查leader身份,确認自己follower身份和reset election timer等操作。将得到的snapshot發送到applyCh即可。
假死問題:由于網絡原因導緻的心跳逾時,認為leader已死,但其實leader還活着。
腦裂問題:指的是分布式叢集系統中由于網絡故障等原因,選舉出了兩個leader,叢集分裂成兩個叢集。出現腦裂問題的原因是分布式算法中沒有考慮過半機制。腦裂問題對分布式系統是緻命的,兩個叢集同時對外提供服務,會出現各種不一緻問題,如果兩個叢集突然可以聯通了,将不得不面對資料合并、資料沖突的解決等問題。
為了解決腦裂問題,通常有四種做法:
zookeeper和raft中使用的過半原則;
添加心跳線。叢集中采取多種通信方式,防止一種通信方式失效導緻叢集中的節點無法通信,比如原來隻有一條心跳線路,此時若斷開,則判斷對方已死亡,若有兩條心跳線,一條斷開,另一條仍然可以收發心跳,保證叢集服務正常運作,備用線路與主線路可以互相監測,正常情況下備用線路為了節約資源而不起作。
使用磁盤鎖的形式,保證叢集中隻能有一個Leader擷取磁盤鎖,對外提供服務,避免資料錯亂發生。但是,也會存在一個問題,若該Leader節點當機,則不能主動釋放鎖,那麼其他的Follower就永遠擷取不了共享資源。于是有人在HA中設計了"智能"鎖。正在服務的一方隻有在發現心跳線全部斷開(察覺不到對端)時才啟用磁盤鎖。平時就不上鎖了。
仲裁機制。比如提供一個參考的IP位址,心跳機制斷開時,節點各自ping一下參考IP,如果ping不通,那麼表示該節點網絡已經出現問題,則該節點需要自行退出争搶資源,釋放占有的共享資源,将服務的提供功能讓給功能更全面的節點。
過半原則:根據鴿巢原理,raft中任意一個操作都需要過半的伺服器的認同,這樣能保證始終隻有一個leader。此外,伺服器通常選擇奇數台機器部署,這樣可以用較少的機器實作相同的叢集容忍度。
快速上司者選舉算法:在選舉的過程中進行過半驗證,這樣不需要等待所有server都認同,速度比較快。
在此,從一個比lab2更高層次的角度看待分布式系統。lab2中的raft是用于機器之間互相溝通形成一緻的log和state,但機器之間并不關心log中存儲的command是什麼,是以全部使用interface{}作為command的接口。lab3中,我們要實作的是client調用Get()、Append()、Put(),server通過raft達成叢集内的一緻,然後将raft apply的command正式執行。raft系統在這一過程中,隻起到了一緻性的作用,是指令的被調用和真正執行之間的一層。
這裡需要注意的是線性一緻性,為了實作這一點,給command遞增的index(由raft調用start後傳回),使用一個map記錄每個client最近最後一個被執行command的index以及執行結果,由此可以推測出command序列執行到哪一條了,防止重複執行。
另外,由于raft系統在start和apply之間需要一定的時間,是以,用戶端調用讀寫函數,讀寫函數調用start通知raft叢集之後,注冊一個index對應的待相應result channel,存儲在以index為key的map中。當raft系統達成一緻,apply這條指令的時候,從apply函數調用真正的讀寫過程,執行結果push到index對應的channel中。于是,用戶端調用的讀寫函數隻需要直接去result channel中取出這條指令的執行結果。這樣做非常的簡潔流暢,用channel阻塞的時間來等待raft系統一緻、apply執行讀寫。
關于start和apply之間leader被更換的讨論:
一條command,在其start和apply中間,可能raft系統已經更換了leader,對于新的leader來說,它沒有為這條command建立channel(start不是通過新leader進行的),試圖将result放入channel的時候會失敗,導緻直接傳回。然而,舊leader雖然降級為follower,但仍然會對這條apply,是以即使更換leader也沒關系,但需要注意的是,從channel取出result的時候,就不必判斷這個機器是不是leader了,隻要在start的時候判斷了就可以了。
關于command的index是否會發生變化的讨論:
command的index是由start調用的時候,leader的log中目前log最後一條entry的index+1決定的順序index,如果這條command的entry被覆寫,那就會逾時,client将更換server重新執行,如果沒有被覆寫,将會保持這個index。
如果command的entry被覆寫了,且這個index對應的map中仍然有channel在等待答案(發生于leader降級,被新的leader清除了index對應位置,并且沒有覆寫,leader又當選為leader,并建立了新的index位置),那麼将會發生不比對,是以,應該在從channel中取出result的時候檢查op是否是在等待的那個。
如果op正好與在等待的那個一緻,但是seq又不是那個呢?沒有關系,隻要執行内容一緻就可以了。client中等待之後那條op結果的timer會逾時,重新執行之後那條op。
B部分是壓縮,kv中有一個變量maxraftState限制了log的長度,若即将超過這個長度,就對log進行壓縮。同時,kv的data和peocessed也應該被持久化存儲。
此外,LAB3可以使用init函數完成logger注冊,并記錄。當然,這不是必需的。
關于golang中的init函數
golang裡的main函數是程式的入口函數,main函數傳回後,程式也就結束了。golang還有另外一個特殊的函數init函數,先于main函數執行,實作包級别的一些初始化操作。
init函數的主要作用:
初始化不能采用初始化表達式初始化的變量。
程式運作前的注冊。
實作sync.Once功能。
其他
init函數的主要特點:
init函數先于main函數自動執行,不能被其他函數調用;
init函數沒有輸入參數、傳回值;
每個包可以有多個init函數;
包的每個源檔案也可以有多個init函數,這點比較特殊;
同一個包的init執行順序,golang沒有明确定義,程式設計時要注意程式不要依賴這個執行順序。
不同包的init函數按照包導入的依賴關系決定執行順序。