天天看點

Hadoop OutputFormat淺析

在hadoop中,outputformat和inputformat是相對應的兩個東西。相比于inputformat,outputformat似乎沒有那麼多細節。inputformat涉及到對輸入資料的解析和劃分,繼而影響到map任務的數目,以及map任務的排程(見《hadoop inputformat淺析》)。而outputformat似乎像其字面意思那樣,僅僅是完成對輸出資料的格式化。

對于輸出資料的格式化,這個應該沒什麼值得多說的。根據需要,outputformat愛把輸出寫成什麼格式就寫成什麼格式、愛把輸出寫到資料庫就寫到資料庫、愛把輸出通過網絡發給其他服務就發給其他服務...

不過,outputformat所做的事情其實并不限于此。outputformat類包含如下三個方法:recordwriter getrecordwriter(taskattemptcontext context);void checkoutputspecs(jobcontext context);outputcommitter getoutputcommitter(taskattemptcontext context);

其中:checkoutputspecs是在jobclient送出job之前被調用的(在使用inputfomat進行輸入資料劃分之前),用于檢測job的輸出路徑。比如,fileoutputformat通過這個方法來确認在job開始之前,job的output路徑并不存在,然後該方法又會重新建立這個output路徑。這樣一來,就能確定job結束後,output路徑下的東西就是且僅是該job輸出的。

getrecordwriter用于傳回一個recordwriter的執行個體,reduce任務在執行的時候就是利用這個執行個體來輸出key/value的。(如果job不需要reduce,那麼map任務會直接使用這個執行個體來進行輸出。)

recordwriter有如下兩個方法:

void write(k key, v value);void close(taskattemptcontext context);前者負責将reduce輸出的key/value寫成特定的格式,後者負責對輸出做最後的确認并關閉輸出。前面提到的outputformat的字面含義,其實就是由這個recordwriter來實作的。

而第三個方法,getoutputcommitter則用于傳回一個outputcommitter的執行個體。(在hadoop-0.20中,mapreduce有兩套api。getoutputcommitter是在newapi中才提供的,oldapi裡面并沒有。不過oldapi同樣有outputcommtter這個東西,隻是不能通過outputformat來定制而已。)

outputcommitter用于控制job的輸出環境,它有下面幾個方法:void setupjob(jobcontext jobcontext);void commitjob(jobcontext jobcontext);void abortjob(jobcontext jobcontext, jobstatus.state state);void setuptask(taskattemptcontext taskcontext);boolean needstaskcommit(taskattemptcontext taskcontext);void committask(taskattemptcontext taskcontext);void aborttask(taskattemptcontext taskcontext);

job開始被執行之前,架構會調用outputcommitter.setupjob()為job建立一個輸出路徑;

如果job成功完成,架構會調用outputcommitter.commitjob()送出job的輸出;

如果job失敗,架構會調用outputcommitter.abortjob()撤銷job的輸出;

對應于job下的每一個task,同樣牽涉建立、送出和撤銷三個動作,分别由outputcommitter.setuptask()、outputcommitter.committask()、outputcommitter.aborttask()來完成。而一個task可能沒有輸出,進而也就不需要送出,這個可以通過outputcommitter.needstaskcommit()來判斷;

具體outputcommitter的這些方法裡面完成了什麼樣的操作,這是由具體的outputcommitter來定制的,可以任意去實作。比如,fileoutputcommitter完成了如下操作:

setupjob - mkdir ${mapred.output.dir}/_temporarycommitjob - touch ${mapred.output.dir}/_success && rm -r ${mapred.output.dir}/_temporaryabortjob - rm -r ${mapred.output.dir}/_temporarysetuptask - <nothing>needstaskcommit - test -d ${mapred.output.dir}/_temporary/_${taskattemptid}committask - mv ${mapred.output.dir}/_temporary/_${taskattemptid}/* ${mapred.output.dir}/aborttask - rm -r ${mapred.output.dir}/_temporary/_${taskattemptid}

(注意,上面這些路徑都是hdfs上的,不是某個tasktracker本地機器上的。)

其中的邏輯是:job執行的時候,task的輸出放到output路徑下的_temporary目錄的以taskattemptid命名的子目錄中。隻有當task成功了,相應的輸出才會被送出到output路徑下。而隻有當整個job都成功了,才會在output路徑下放置_success檔案。_success檔案的存在表明了output路徑下的輸出資訊是正确且完整的;而如果_success檔案不存在,output下的資訊也依然是正确的(這已經由committask保證了),但是不一定是完整的(可能隻包含部分reduce的輸出)。

與之對應,fileoutputformat會讓它所建立的recordwriter将輸出寫到${mapred.output.dir}/_temporary/_${taskattemptid}/下。當然,map和reduce任務也可以自己向這個路徑put資料。

接下來就是到在哪裡去執行這些方法的問題了。

一個job被送出到jobtracker後會生成若幹的map和reduce任務,這些任務會被分派到tasktracker上。對于每一個task,tasktracker會使用一個子jvm來執行它們。那麼對于task的setup/commit/abort這些操作,自然應該在執行task的子jvm裡面去完成:

當一個task被關聯到一個子jvm後,在任務初始化階段,outputcommitter.setuptask()會被調用;

當一個任務執行成功完成了之後,脫離子jvm之前,outputcommitter.committask()會被調用。不過這裡還有兩個細節:1、需要先調用outputcommitter.needstaskcommit()來确定是否有輸出需要送出;2、送出之前還有一個同步邏輯,需要由jobtracker同意送出後才能送出。因為hadoop有推測執行的邏輯,一個task可能在多個tasktracker上同時執行,但是它們之中最多隻有一個能得到送出,否則可能導緻結果的錯亂;

當一個任務執行失敗時,outputcommitter.aborttask()會被調用。這個調用很特殊,它不大可能在執行任務的子jvm裡面完成。因為執行任務的子jvm裡面跑的是使用者提供的map/reduce代碼,hadoop架構是無法保證這些代碼的穩定性的,是以任務的失敗往往伴随着子jvm的異常退出(這也就是為什麼要用子jvm來執行map和reduce任務的原因,否則異常退出的可能就是整個架構了)。于是,對于失敗的任務,jobtracker除了要考慮它的重試之外,還要為其生成一個cleanup任務。這個cleanup任務像普通的map和reduce任務一樣,會被分派到tasktracker上去執行(不一定分派到之前執行該任務失敗的那個tasktracker上,因為輸出是在hdfs上,是全局的)。而它的執行邏輯主要就是調用outputcommitter.aborttask();

而對于job的setup/commit/abort,則顯然不能使用上面的邏輯。

從時間上說,outputcommitter.setupjob()應該在所有map和reduce任務執行之前被調用、outputcommitter.commitjob()應該在所有map和reduce任務執行之後被調用、而outputcommitter.abortjob()應該在job确認失敗之後被調用;

從地點上說,可能調用這些方法的地方無外乎jobclient、jobtracker、或tasktracker;

jobclient應該第一個被排除,因為job的執行并不依賴于jobclient。jobclient在送出完job之後就可以退出了,它的退出并不會影響job的繼續執行(如果不退出則可以接收jobtracker的進度回報)。是以,不可能依靠jobclient在job成功以後來調用outputcommitter.commitjob();

jobtracker呢?貌似是個合适的地方,因為jobtracker明确知道job的開始與結束、成功與失敗。但是實際上還是不能由jobtracker來調用這些方法。就像前面說到的outputcommitter.aborttask()一樣,既然jobtracker知道了task的失敗,卻不直接為它清理輸出,而是通過生成一個對應的cleanup任務來完成清理工作。為什麼要這樣做呢?其實原因很簡單,因為outputcommitter是獨立于hadoop架構,可以由使用者自己定制的。hadoop架構不能保證使用者定制代碼的穩定性,當然不能讓它直接在jobtracker上執行。必須啟動一個新的jvm來執行這些方法,那麼正好tasktracker上已經有這樣的邏輯了。

是以,對于job的setup/commit/abort,跟outputcommitter.aborttask()類似,jobtracker會生成對應的setup任務和cleanup任務。在初始化job的時期将job的setup任務分派給tasktracker,tasktracker執行這個setup任務所要做的事情就是調用outputcommitter.setupjob();在job結束時,job的cleanup任務将分派給tasktracker,tasktracker執行這個cleanup任務所要做的事情就是根據job的執行結果是成功或是失敗,來調用outputcommitter.commitjob()或outputcommitter.abortjob()。

為了保證outputcommitter.setupjob()在所有map和reduce任務執行之前被調用,在jobtracker上,job的初始化被分成了兩個步驟:一是為job生成一堆任務,二是将setup任務分派給tasktracker去執行,并等待它執行完成。在這之後,初始化才算完成,map和reduce任務才能得到分派。

可見,在job執行的過程中,除了我們關注的map和reduce任務之外,還會有一些隐藏的setup和cleanup任務。不過這些任務都有一個共同點,它們都可以是使用者定制的。