在執行一個job的時候,hadoop會将輸入資料劃分成n個split,然後啟動相應的n個map程式來分别處理它們。
資料如何劃分?split如何排程(如何決定處理split的map程式應該運作在哪台tasktracker機器上)?劃分後的資料又如何讀取?這就是本文所要讨論的問題。
先從一張經典的mapreduce工作流程圖出發:
1、運作mapred程式;
2、本次運作将生成一個job,于是jobclient向jobtracker申請一個jobid以辨別這個job;
3、jobclient将job所需要的資源送出到hdfs中一個以jobid命名的目錄中。這些資源包括jar包、配置檔案、inputsplit、等;
4、jobclient向jobtracker送出這個job;
5、jobtracker初始化這個job;
6、jobtracker從hdfs擷取這個job的split等資訊;
7、jobtracker向tasktracker配置設定任務;
8、tasktracker從hdfs擷取這個job的相關資源;
9、tasktracker開啟一個新的jvm;
10、tasktracker用新的jvm來執行map或reduce;
……
對于之前提到的三個問題,這個流程中的幾個點需要展開一下。
首先是“資料如何劃分”的問題。
在第3步中,jobclient向hdfs送出的資源就包含了inputsplit,這就是資料劃分的結果。也就是說,資料劃分是在jobclient上完成的。在這裡,jobclient會使用指定的inputformat将輸入資料做一次劃分,形成若幹個split。
inputformat是一個interface。使用者在啟動mapreduce的時候需要指定一個inputformat的implement。inputformat隻包含了兩個接口函數:
inputsplit[] getsplits(jobconf job, int numsplits) throws ioexception;
recordreader<k, v> getrecordreader(inputsplit split, jobconf job, reporter reporter) throws ioexception;
getsplits就是現在要使用的劃分函數。job參數是任務的配置集合,從中可以取到使用者在啟動mapreduce時指定的輸入檔案路徑。而numsplits參數是一個split數目的建議值,是否考慮這個值,由具體的inputformat實作來決定。
傳回的是inputsplit數組,它描述了所有的split資訊,一個inputsplit描述一個split。
inputsplit也是一個interface,具體傳回什麼樣的implement,這是由具體的inputformat來決定的。inputsplit也隻有兩個接口函數:
long getlength() throws ioexception;
string[] getlocations() throws ioexception;
這個interface僅僅描述了split有多長,以及存放這個split的location資訊(也就是這個split在hdfs上存放的機器。它可能有多個replication,存在于多台機器上)。除此之外,就再沒有任何直接描述split的資訊了。比如:split對應于哪個檔案?在檔案中的起始和結束位置是什麼?等等重要的特征都沒有描述到。
為什麼會這樣呢?因為關于split的那些描述資訊,對于mapreduce架構來說是不需要關心的。架構隻關心split的長度(主要用于一些統計資訊)和split的location(主要用于split的排程,後面會細說)。
而split中真正重要的描述資訊還是隻有inputformat會關心。在需要讀取一個split的時候,其對應的inputsplit會被傳遞到inputformat的第二個接口函數getrecordreader,然後被用于初始化一個recordreader,以解析輸入資料。也就是說,描述split的重要資訊都被隐藏了,隻有具體的inputformat自己知道。它隻需要保證getsplits傳回的inputsplit和getrecordreader所關心的inputsplit是同樣的implement就行了。這就給inputformat的實作提供了巨大的靈活性。
最常見的fileinputformat(implements inputformat)使用filesplit(implements inputsplit)來描述split。而filesplit中有以下描述資訊:
private path file; // split所在的檔案
private long start; // split的起始位置
private long length; // split的長度,getlength()會傳回它
private string[] hosts; // split所在的機器名稱,getlocations()會傳回它
然後,配套使用的recordreader将從filesplit中擷取資訊,解析檔案名為filesplit.file的檔案中從filesplit.start到filesplit.start+filesplit.length之間的内容。
至于具體的劃分政策,fileinputformat預設為檔案在hdfs上的每一個block生成一個對應的filesplit。那麼自然,filesplit.start就是對應block在檔案中的offset、filesplit.length就是對應block的length、filesplit.hosts就是對應block的location。
但是可以設定“mapred.min.split.size”參數,使得split的大小大于一個block,這時候fileinputformat會将連續的若幹個block分在一個split中、也可能會将一個block分别劃在不同的split中(但是前提是一個split必須在一個檔案中)。split的start、length都好說,都是劃分前就定好的。而split的location就需要對所有劃在其中的block的location進行整合,盡量尋找它們共有的location。而這些block很可能并沒有共同的location,那麼就需要找一個距離這些block最近的location作為split的location。
還有combinefileinputformat(implements inputformat),它可以将若幹個split打包成一個,目的是避免過多的map任務(因為split的數目決定了map的數目)。雖然說設定“mapred.min.split.size”參數也可以讓fileinputformat做到這一點,但是filesplit取的是連續的block,大多數情況下這些block可能并不會有共同的location。
combinefileinputformat使用combinefilesplit(implements inputsplit)來描述split。combinefilesplit的成員如下:
private path[] paths; // 每個子split對應一個檔案
private long[] startoffset; // 每個子split在對應檔案中的起始位置
private long[] lengths; // 每個子split的長度
private string[] locations; // split所在的機器名稱,getlocations()會傳回它
private long totlength; // 所有子split長度之和,getlength()會傳回它
其中前三個數組一定是長度相等并且一一對應的,描述了每一個子split的資訊。而locations,注意它并沒有描述每一個子split,而描述的是整個split。這是因為combinefileinputformat在打包一組子split時,會考慮子split的location,盡量将在同一個location(或者臨近位置)出現的split打包在一起,生成一個combinefilesplit。而打包以後的locations自然就是由所有子split的location整合而來。
同樣,配套使用的recordreader将從combinefilesplit中擷取資訊,解析每一個檔案名為combinefilesplit.paths[i]的檔案中從combinefilesplit.startoffset[i]到combinefilesplit.startoffset[i]+combinefilesplit.lengths[i]之間的内容。
具體到劃分政策,combinefilesplit先将輸入檔案拆分成若幹個子split,每個子split對應檔案在hdfs的一個block。然後按照“mapred.max.split.size”配置,将length之和不超過這個值的擁有共同location的幾個子split打包起來,得到一個combinefilesplit。最後可能會剩下一些子split,它們不滿足擁有共同location這個條件,那麼打包它們的時候就需要找一個距離這些子split最近的location作為split的location。
有時候,可能輸入檔案是不可以劃分的(比如它是一個tar.gz,劃分會導緻它無法解壓),這也是設計inputformat時需要考慮的。可以重載fileinputformat的issplitable()函數來告知檔案不可劃分,或者幹脆就從頭實作自己的inputformat。
由于inputsplit接口是非常靈活的,還可以設計出千奇百怪的劃分方式。
接下來就是“split如何排程”的問題。
前面在劃分輸入資料的時候,不斷提到location這個東西。inputsplit接口中有getlocations()、inputformat的implement在生成inputsplit的時候需要關心對應block的location,并且當多個block需要放到一個inputsplit的時候還需要對location做合并。
那麼這個location到底用來做什麼呢?它主要就是用來給split的排程提供參考。
先簡單介紹一下jobtracker是怎樣将一個split所對應的map任務分派給tasktracker的。在前面的流程圖中,第6步jobtracker會從hdfs擷取job的split資訊,這将生成一系列待處理的map和reduce任務。jobtracker并不會主動的為每一個tasktracker劃分一個任務子集,而是直接把所有任務都放在跟job對應的待處理任務清單中。
tasktracker定期向jobtracker發送心跳,除了保持活動以外,還會報告tasktracker目前可以執行的map和reduce的剩餘配額(tasktracker總的配額由“mapred.tasktracker.map.tasks.maximun”和“mapred.tasktracker.reduce.tasks.maximun”來配置)。如果jobtracker有待處理的任務,tasktracker又有相應的配額,則jobtracker會在心跳的應答中給jobtracker配置設定任務(優先配置設定map任務)。
在配置設定map任務時,split的location資訊就要發揮作用了。jobtracker會根據tasktracker的位址來選擇一個location與之最接近的split所對應的map任務(注意一個split可以有多個location)。這樣一來,輸入檔案中block的location資訊經過一系列的整合(by inputformat)和傳遞,最終就影響到了map任務的配置設定。其結果是map任務傾向于處理存放在本地的資料,以保證效率。
當然,location僅僅是jobtracker在配置設定map任務時所考慮的因素之一。jobtracker在選擇任務之前,需要先標明一個job(可能正有多個job等待處理),這取決于具體taskscheduler的排程政策。然後,jobtracker又會優先選擇因為失敗而需要重試的任務,而重試任務又盡量不要配置設定到它曾經執行失敗過的機器上。
jobtracker在配置設定reduce任務時并不考慮location,因為大部分情況下,reduce處理的是所有map的輸出,這些map遍布在hadoop叢集的每一個角落,考慮location意義不大。
最後就是“劃分後的資料如何讀取”的問題。
接下來,在前面的流程圖的第10步,tasktracker就要啟動一個新的jvm來執行map程式了。在map執行的時候,會使用inputformat.getrecordreader()所傳回的recordreader對象來讀取split中的每一條記錄(getrecordreader函數中會使用inputsplit對recordreader進行初始化)。
咋一看,recordreader似乎會使用split的location資訊來決定資料應該從哪裡去讀。但是事實并非如此。前面也說過,split的location很可能是被inputformat整合過的,可能并不是block真正的location(就算是,也沒法保證從inputsplit在jobclient上被生成到現在的這段時間之内,block沒有被移動過)。
說白了,split的location其實是inputformat期望這個split被處理的location,它完全可以跟實際block的location沒有半點關系。inputformat甚至可以将split的location定義為“距離split所包含的所有block的location最遠的那個location”,隻不過大多數時候我們肯定是希望map程式在本地就能讀取到輸入資料的。
是以說,recordreader并不關心split的location,隻管open它的path。前面說過,recordreader是由具體的inputformat建立并傳回的,它跟對應的inputformat所使用的inputsplit必定是配對的。比如,對應于filesplit,recordreader要讀取filesplit.file檔案中的相應區間、對應于combinefilesplit,recordreader要讀取combinefilesplit.paths中的每個檔案的相應區間。
recordreader對一個path的open操作由dfsclient來完成,它會向hdfs的namenode擷取對應檔案在對應區間上的block資訊:依次有哪些block、每個block各自的location。而要讀寫一個block的時候,dfsclient總是使用namenode傳回的第一個location,除非讀寫失敗才會依次選擇後面的location。
而namenode在處理open請求時(getblocklocations),在得到一個block有哪些location之後,會以dfsclient所在的位址為依據,對這些location進行排序,距離越小的越排在前。而dfsclient又總是會選擇排在前面的location,是以,最終recordreader會傾向于讀取本地的資料(如果有的話)。
但是,不管block是不是本地的,dfsclient都會向datanode建立連接配接,然後請求資料。并不會因為block是本地的而直接讀磁盤上的檔案,因為這些檔案都是由datanode來管理的,需要通過datanode來找到block所對應的實體檔案、也需要由datanode來協調對檔案的并發讀寫。是以本地與非本地的差别僅僅在于網絡傳輸上,前者是僅僅在本地網絡協定棧上面繞一圈、而後者則是真正的網絡通訊。在block離得不遠、且網絡比較暢通的情況下,非local并不意味着太大的開銷。
是以hadoop優先追求map的data-local,也就是輸入資料存放在本地。如果不能滿足,則退而求其次,追求rack-local,也就是輸入資料存放在同一機架的其他機器上,這樣的話網絡開銷對性能影響一般不會太大。而如果這兩種情況都不能滿足,則網絡傳輸可能會帶來較大的開銷,hadoop會盡量去避免。這也就是之前提到的,在屬于同一split的block沒有共同location的情況下,要計算一下離它們最近的location的原因。
至此,關于inputformat的資料劃分、split排程、資料讀取三個問題就分析完了。