可以隻用一行代碼來運作mapreduce作業:jobclient.runjon(conf),job作業運作時參與的四個實體:
1.jobclient 寫代碼,配置作業,送出作業。
2.jobtracker:初始化作業,配置設定作業,協調作業運作。這是一個java程式,主類是jobtracker。
3.tasktracker:運作作業劃分後的任務,即配置設定資料配置設定上執行map或reduce任務。
4.hdfs:儲存作業資料、配置資訊等,儲存作業結果。
map/reduce 作業總體執行流程:
代碼編寫 ----> 作業配置 ----> 作業送出 ----> map任務配置設定和執行 ----> 進行中間結果 ----> reduce任務配置設定與執行 ----> 輸出結果
而對于每個作業的執行,又包含:
輸入準備 ----> 任務執行 ----> 輸出結果
作業送出jobclient:
jobclient的runjob方法産生一個jobclient執行個體并調用其submitjob方法,然後runjob開始循環嗎,并在循環中調用gettaskcompetionevents方法,獲得taskcompletionevent執行個體,每秒輪詢作業進度(後面有介紹進度和狀态更新),把進度寫到控制台,作業完成後顯示作業計數器,若失敗,則把錯誤記錄到控制台。
submitjob方法作業送出的過程:
1.向jobtracker請求一個新的jobid。
2.檢查作業相關路徑,如果路徑不正确就會傳回錯誤。
3.計算作業輸入分片及其劃分資訊。
4.将作業運作需要的資源(jar檔案、配置檔案等)複制到shared hdfs,并複制多個副本(參數控制,預設值為10)供tasktracker通路,也會将計算的分片複制到hdfs。
5.調用jobtracker對象的submitjob()方法來真正送出作業,告訴jobtracker作業準備執行。
作業的初始化jobtracker:
jobtracker收到submitjob方法調用後,會把調用放入到一個内部隊列,由作業排程器(job scheduler)進行排程并對其初始化。job初始化即建立一個作業對象。
當作業被排程後,jobtracker會建立一個代表這個作業的jobinprogress對象,并将任務和記錄資訊封裝在這個對象中,以便跟蹤任務狀态和程序。
初始化過程就是jobinprogress對象的inittasks方法進行初始化的。
初始化步驟:
1.從hdfs中讀取作業對應的job.split資訊,為後面的初始化做好準備。
2.建立并初始化map和reduce任務。根據資料分片資訊中的個數确定map task的個數,然後為每個map task生成一個taskinprogress對象來處理資料分片,先将其放入nonrunningmapcache,以便jobtracker配置設定任務的時候使用。接下來根據jobconf中的mapred.reduce.tasks屬性利用setnumreducetasks()方法設定reduce
task的數量,然後同map task建立方式。
3.最後就是建立兩個初始化task,進行map和reduce的初始化。
任務的配置設定jobtracker:
消息傳遞heartbeat: tasktracker運作一個簡單循環定期發送心跳(heartbeat)給jobtracker。由心跳告知jobtracker自己是否存活,同時作為消息通道傳遞其它資訊(請求新task)。作為心跳的一部分,tasktracker會指明自己是否已準備好運作新的任務,如果是,jobtracker會配置設定它一個任務。
配置設定任務所屬于的作業:在jobtracker配置設定任務前需先确定任務所在的作業。後面會介紹到各種作業排程算法,預設是一個fifo的作業排程。
配置設定map和reduce任務:tasktracker有固定數量的任務槽,一個tasktracker可以同時運作多個map和reduce任務,但其準确的數量由tasktracker的核的數量和記憶體大小決定。預設排程器會先填滿map任務槽,再填reduce任務槽。jobtracker會選擇距離離分片檔案最近的tasktracker,最理想情況下,任務是資料本地化(data-local)的,當然也可以是機架本地化(rack-local),如果不是本地化的,那麼他們就需要從其他機架上檢索資料。reduce任務配置設定很簡單,jobtracker會簡單的從待運作的reduce任務清單中選取下一個來執行,不用考慮資料本地化。
任務的執行tasktracker:
tasktracker收到新任務後,就要在本地運作任務了,運作任務的第一步就是通過localizedjob将任務本地化所需要的注入配置、資料、程式等資訊進行本地化。
1.本地化資料:從共享檔案系統将job.split 、job.jar (在分布式緩存中)複制本地,将job配置資訊寫入job.xml。
2.建立本地工作目錄:tasktracker會加壓job.jar檔案到本工作目錄。
3.調用launchtaskforjob方法釋出任務(其中會建立taskrunner執行個體運作任務),如果是map任務就啟用maptaskrunner,對于reduce就是reducetaskrunner。
在這之後,taskrunner會啟用一個新的jvm來運作每個map/reduce任務,防止程式原因而導緻tasktracker崩潰,但不同任務間重用jvm還是可以的,後續會講到任務jvm重用。
對于單個map,任務執行的簡單流程是:
1.配置設定任務執行參數
2.在child臨時檔案中添加map任務資訊(child是運作map和reduce任務的主程序)
3.配置log檔案夾,配置map任務的通信和輸出參數
4.讀取input split,生成recordreader讀取資料
5.為map生成maprunnable,依次從recordreader中接收資料,并調用map函數進行處理。
6.最後将map函數的輸出調用collect收集到mapoutputbuffer(參數控制其大小)中。
streaming和pipes:
streaming和pipes都運作特殊的map和reduce任務,目的是運作使用者提供的可執行程式并與之通信。
streaming:使用标準輸入輸出streaming與程序進行通信。
pipes:用來監聽套接字,會發送一個端口号給c++程式,兩者便可建立連結。
進度和狀态更新:
一個作業和它的任務都有狀态(status),其中包括:運作成功失敗狀态、map/reduce進度、作業計數器值、狀态消息。
狀态消息與用戶端的通信:
1.對于map任務progress的追蹤:progress是已經處理完的輸入所占的比例。
2.對于reduce:稍複雜,reduce任務分三個階段(每個階段占1/3),複制、排序和reduce處理,若reduce已執行一半的輸入的話,那麼任務進度便是1/3+1/3+1/6=5/6。
3.任務計數器:任務有一組計數器,負責對任務運作各個事件進行計數。
4.任務進度報告:如果任務報告了進度,便會設定一個标記以表明狀态将被發送到tasktracker。有一個獨立線程每隔三秒檢查一次此标記,如果已設定,則告知tasktracker目前狀态。
5.tasktracker進度報告:tasktracker會每隔5秒(這個心跳是由叢集大小決定,叢集越大時間會越長)發送heartbeat到jobtracker,并且tasktracker運作的所有狀态都會在調用中被發送到jobtracker。
6.jobtracker合并各任務報告:産生一個表明所有運作作業機器所含任務狀态的全局視圖。
前面提到的jobclient就是通過每秒查詢jobtracker來接收最新狀态,而且用戶端jobclient的getjob方法可以得到一個runningjob的執行個體,其包含了作業的是以狀态資訊。
作業的完成:
當jobtracker收到作業最後一個任務已完成的通知後,便把作業狀态設定成成功。jobclient查詢狀态時,便知道任務已成功完成,于是jobclient列印一條消息告知使用者,然後從runjob方法傳回。
如果jobtracker有相應設定,也會發送一個http作業通知給用戶端,希望收到回調指令的用戶端可以通過job.end.notification.url屬性來進行設定。
jobtracker情況作業的工作狀态,訓示tasktracker也清空作業的工作狀态,如删除中間輸出。
失敗
實際情況下,使用者的代碼存在軟體錯誤程序會崩潰,機器也會産生故障,但hadoop能很好的應對這些故障并完成作業。
1.任務失敗
子任務異常:如map/reduce任務中的使用者代碼抛出異常,子任務jvm程序會在退出前向父程序tasktracker發送錯誤報告,錯誤被記錄使用者日志。tasktracker會将此次task attempt标記為tailed,并釋放這個任務槽運作另外一個任務。
子程序jvm突然退出:可能由于jvm bug導緻使用者代碼造成的某些特殊原因導緻jvm退出,這種情況下,tasktracker會注意到程序已經退出,并将此次嘗試标記為failed。
任務挂起:一旦tasktracker注意一段時間沒有收到進度更新,便會将任務标記為failed,jvm子程序将被自動殺死。任務失敗間隔時間通常為10分鐘,可以以作業或者叢集為基礎設定過期時間,參數為mapred.task.timeout。注意:如果參數值設定為0,則挂起的任務永遠不會釋放掉它的任務槽,随着時間的推移會降低整個叢集的效率。
任務失敗嘗試次數:jobtracker得知一個tasktracker失敗後,它會重新排程該任務執行,當然,jobtracker會嘗試避免重新排程失敗過的tasktracker任務。如果一個任務嘗試次數超過4次,它将不再被重試。這個值是可以設定的,對于map任務,參數是mapred.map.max.attempts,對于reduce任務,則由mapred.reduce.max.attempts屬性控制。如果次數超過限制,整個作業都會失敗。當然,有時我們不希望少數幾個任務失敗就終止運作的整個作業,因為即使有些任務失敗,作業的一些結果可能還是有用的,這種情況下,可以為作業設定在不觸發作業失敗情況下的允許任務失敗的最大百分比,map任務和reduce任務可以獨立控制,參數為mapred.max.map.failures.percent
和mapred.max.reduce.failures.percent。
任務嘗試中止(kill):任務終止和任務失敗不同,task attempt可以中止是因為他是一個推測副本或因為它所處的tasktracker失敗,導緻jobtracker将它上面的所有task attempt标記為killed。被終止的task attempt不會被計入任務運作嘗試次數,因為嘗試中止并不是任務的錯。
2.tasktracker失敗
tasktracker由于崩潰或者運作過慢而失敗,他将停止向jobtracker發送心跳(或很少發送心跳)。jobtracker注意已停止發送心跳的tasktracker(過期時間由參數mapred.tasktracker.expiry.interval設定,機關毫秒),并将它從等待排程的tasktracker池中移除。如果是未完成的作業,jobtracker會安排次tasktracker上已經運作成功的map任務重新運作,因為此時reduce任務已無法通路(中間輸出存放在失敗的tasktracker的本地檔案系統上)。
即使tasktracker沒有失敗,也有可能被jobtracker列入黑名單。如果tasktracker上面的失敗任務數量遠遠高于叢集的平均失敗任務次數,他就會被列入黑名單,被列入黑名單的tasktracker可以通過重新開機從jobtracker黑名單中移除。
3.jobtracker失敗
老版本的jobtracker失敗屬于單點故障,這種情況下作業注定失敗。
作業排程:
早期作業排程fifo:按作業送出順序先進先出。可以設定優先級,通過設定mapred.job.priority屬性或者jobclient的setjobpriority()方法制定優先級(優先級别:very_high,high,normal,low,very_low)。注意fifo排程算法不支援搶占(preemption),是以高優先級作業仍然會被那些已經開始的長時間運作的低優先級作業所阻塞。
fair scheduler:目标是讓每個使用者公平地共享叢集能力。當叢集存在很多作業時,空閑的任務槽會以”讓每個使用者共享叢集“的方式進行配置設定。預設每個使用者都有自己的作業池。fairscheduler支援搶占,是以,如果一個池在特定的一段時間未得到公平地資源共享,它會終止池中得到過多的資源任務,以便把任務槽讓給資源不足的池。fairscheduler是一個後續子產品,使用它需要将其jar檔案放在hadoop的類路徑下。可以通過參數map.red.jobtracker.taskscheduler屬性配置(值為org.apache.hadoop.mapred.fairscheduler)
capacity scheduler:
叢集由很多隊列組成,每個隊列都有一個配置設定能力,這一點與fairscheduler類似,隻不過在每個隊列内部,作業根據fifo方式進行排程。本質上說,capacity scheduler允許使用者或組織為每個使用者模拟一個獨立使用fifo的叢集。
shuffle和排序:
mapreduce確定每個reducer的輸入都是按鍵排序的。系統執行排序的過程-将map輸出作為輸入傳給reducer的過程稱為shuffle。shuffle屬于不斷被優化和改進的代碼庫的一部分,從許多方面來看,shuffle是mapreduce的心髒。
整個shuffle的流程應該是這樣:
map結果劃分partition 排序sort 分割spill 合并同一劃分 合并同一劃分 合并結果排序 reduce處理 輸出
map端:
寫入緩沖區:map函數的輸出,是由collector處理的,它并不是簡單的将結果寫到磁盤。它利用緩沖的方式寫到記憶體,并處于效率的考慮進行預排序。每個map都有一個環形的記憶體緩沖區,用于任務輸出,預設緩沖區大小為100mb(由參數io.sort.mb調整),一旦緩沖區内容達到門檻值(預設0.8),背景程序邊開始把内容寫到磁盤(spill),在寫磁盤過程中,map輸出繼續被寫到緩沖區,但如果緩沖區被填滿,map會阻塞知道寫磁盤過程完成。寫磁盤将按照輪詢方式寫到mapred.local.dir屬性制定的作業特定子目錄中。
寫出緩沖區:collect将緩沖區的内容寫出時,會調用sortandspill函數,這個函數作用主要是建立spill檔案,按照key值對資料進行排序,按照劃分将資料寫入檔案,如果配置了combiner類,會先調用combineandspill函數再寫檔案。sortandspill每被調用一次,就會寫一個spill檔案。
合并所有map的spill檔案:tasktracker會在每個map任務結束後對所有map産生的spill檔案進行merge,merge規則是根據分區将各個spill檔案中資料同一分區中的資料合并在一起,并寫入到一個已分區且排序的map輸出檔案中。待唯一的已分區且已排序的map輸出檔案寫入最後一條記錄後,map端的shuffle階段就結束了。
在寫磁盤前,線程首先根據資料最終要傳遞到的reducer把資料劃分成響應的分區(partition),在每個分區中,背景線程按鍵進行内排序,如果有一個combiner,它會在排序後的輸出上運作。
記憶體達到溢出寫的門檻值時,就會建立一個溢出寫檔案,因為map任務完成其最後一個輸出記錄之後,會有幾個溢出寫檔案。在任務完成前,溢出寫檔案會被合并成一個已分區且已排序的輸出檔案。配置屬性io.sort.facor控制一次最多能合并多少流,預設值是10。
如果已經指定combiner,并且寫次數至少為3(通過min.mum.spills.for.combine設定)時,則combiner就會在輸出檔案寫到磁盤之前運作。運作combiner的意義在于使map輸出更緊湊,舍得寫到本地磁盤和傳給reducer的資料更少。
寫磁盤時壓縮:寫磁盤時壓縮會讓寫的速度更快,節約磁盤空間,并且減少傳給reducer的資料量。預設情況下,輸出是不壓縮的,但可以通過設定mapred.compress.map.output值為true,就可以啟用壓縮。使用的壓縮庫是由mapred.map.output.compression.codec制定。
reducer獲得檔案分區的工作線程:reducer通過http方式得到輸出檔案的分區,用于檔案分區的工作線程數量由tracker.http.threads屬性指定,此設定針對的是每個tasktracker,而不是每個map任務槽。預設值為40,在大型叢集上此值可以根據需要而增加。
reduce端:
複制階段:reduce會定期向jobtracker擷取map的輸出位置,一旦拿到輸出位置,reduce就會從對應的tasktracker上複制map輸出到本地(如果map輸出很小,則會被複制到tasktracker節點的記憶體中,否則會被讓如磁盤),而不會等到所有map任務結束(當然這個也有參數控制)。
合并階段:從各個tasktracker上複制的map輸出檔案(無論在磁盤還是記憶體)進行整合,并維持資料原來的順序。
reduce階段:從合并的檔案中順序拿出一條資料進行reduce函數處理,然後将結果輸出到本地hdfs。
map的輸出檔案位于運作map任務的tasktracker的本地磁盤,現在,tasktracker要為分區檔案運作reduce任務。每個任務完成時間可能不同,但是隻要有一個任務完成,reduce任務就開始複制其輸出,這就是reduce任務的複制階段(copy phase)。reduce任務有少量複制線程,是以能夠并行取得map輸出。預設值是5個線程,可以通過mapred.reduce.parallel.copies屬性設定。
reducer如何得知從哪個tasktracker獲得map輸出:map任務完成後會通知其父tasktracker狀态已更新,tasktracker進而通知(通過heart beat)jobtracker。是以,jobtracker就知道map輸出和tasktracker之間的映射關系,reducer中的一個線程定期詢問jobtracker以便獲知map輸出位置。由于reducer有可能失敗,是以tasktracker并沒有在第一個reducer檢索到map輸出時就立即從磁盤上删除它們,相反他會等待jobtracker告示它可以删除map輸出時才删除,這是作業完成後最後執行的。
如果map輸出很小,則會被直接複制到reduce tasktracker的記憶體緩沖區(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空間的百分比),否則,map輸出被複制到磁盤。一旦記憶體緩沖區達到門檻值大小(由mapred.iob.shuffle.merge.percent)
或達到map輸出門檻值大小(mapred.inmem.threadhold),則合并後溢出寫到磁盤中。
随着磁盤上副本增多,背景線程會将他們合并為更大的、排好序的檔案。注意:為了合并,壓縮的map輸出必須在記憶體中被解壓縮。
排序階段:複制階段完成後,reduce任務會進入排序階段,更确切的說是合并階段,這個階段将合并map輸出,維持其順序排列。合并是循環進行的,由合并因子決定每次合并的輸出檔案數量。但讓有可能會産生中間檔案。
reduce階段:在最後reduce階段,會直接把排序好的檔案輸入reduce函數,不會對中間檔案進行再合并,最後的合并即可來自記憶體,也可來自磁盤。此階段的輸出會直接寫到檔案系統,一般為hdfs。
細節:這裡合并是并非平均合并,比如有40個檔案,合并因子為10,我們并不是每趟合并10個,合并四趟。而是第一趟合并4個,後三趟合并10,在最後一趟中4個已合并的檔案和餘下6個未合并會直接并入reduce。
配置調優:
調優總原則:在保證map函數和reduce函數能夠得到足夠記憶體的前提下,給shuffle過程提供更多的記憶體空間。
1.編寫map和reduce函數時盡量少占用記憶體空間。
2.設定jvm記憶體大小(mapred.child.java.opts),任務節點記憶體大小應該盡量大(關于記憶體請見叢集建構中的環境配置筆記)。
3.map端:避免多次溢出寫磁盤。估算map輸出大小,調整io.sort.mb(map輸出記憶體緩沖區大小),如果可以,可增加其值。注意mapreduce計數器會記錄作業在整個運作過程溢出寫磁盤的記錄數,這對調優很有幫助。
4.reduce端:中間資料全部駐留記憶體可獲得最佳性能。如果reduce函數記憶體需求不大,那麼可以把mapred.inmem.threadhold輸出門檻值調為0(即不寫溢出),把mapred.job.shuffle.input.buffer.percent reduce 值設為1(即reduce記憶體緩沖區最大)會帶來性能提升。
5.提高hadoop緩沖區:預設為4kb,應該在叢集中增加這個值。
任務的執行:
推測執行:
為了避免由于一個任務執行慢而是整個作業執行過慢的情況,hadoop 提供了一種推測執行的機制:即hadoop不會嘗試診斷或修複執行慢的任務(其實不可能辦到),而是在一個任務比預期慢的時候啟動另一個相同的任務作為備份。
一個任務和其推測任務任何一個成功完成,另一個就會中止。
推測執行是一種優化措施,預設情況下推測執行是啟用的。可以基于叢集或基于每個作業,單獨為map或reduce任務啟用或禁用該項功能。mapred.map.tasks.speculative.execution 預設值為 true;mapred.reduce.tasks.speculative.execution 預設值為 true。推測執行目的是減少作業執行時間,但這是以叢集效率為代價的,一般而言,叢集管理者傾向于在叢集上關閉該功能,而讓使用者根據個别需要而開啟該功能。
任務jvm重用:
hadoop在自己的java虛拟機上運作任務,而且會為每個任務啟動一個新的jvm,啟動時間大約為1秒。參數mapred.job.reuse.jvm.num.tasks制定給定作業每個jvm運作任務的最大數,預設值為1,若設定為-1,則不限任務數量。jobconf中的setnumtaskstoexecuteperjvm方法也可設定這個屬性。計算超短任務或密集型任務也可以受益于jvm重用機制。共享jvm的另一個非常有用的地方是:作業個任務之間共享狀态,任務可以較快的通路共享資料。
跳過壞資料:
通過開啟skipnode來控制。
本節相關參數:
1.mapred.submit.replication 運作作業資源的副本數。
2.mapred.reduce.task 作業的reduce任務數量,可通過setnumreducetasks()方法設定。
3.tasktracker任務槽數量。
4.心跳發送周期、任務進度報告周期、tasktracker進度報告周期、jobclient輪詢周期。
5.job.end.notification.url 作業完成時用戶端接收作業完成回調指令的參數。
6.本節會産生各種狀态資訊。
7.mapred.task.timeout 任務挂起的最大等待時間。
8.mapred.map.max.attempts 、mapred.reduce.max.attempts 任務失敗最大嘗試次數
9.mapred.max.map.failures.percent 、mapred.max.reduce.failures.percent 允許錯誤但不觸發作業失敗的任務數的百分比。
10.mapred.tasktracker.expiry.interval tasktracker向jobtracker發送心跳的過期時間,預設10分鐘,機關毫秒。
11.mapred.job.priority 作業排程優先級。
12.map.red.jobtracker.taskscheduler 配置作業排程算法參數(值org.apache.hadoop.mapred.fairscheduler)。
13.io.sort.mb 預設map輸出緩沖區大小參數 預設值100mb。
14.io.sort.spill.percent 寫緩沖區内容門檻值參數 預設值0.8。
15.mapred.local.dir map函數輸出寫磁盤目錄。
16.io.sort.facor map溢出寫檔案一次被合并的數目 預設值是10(設定成100是很常見的)
17.mapred.compress.map.output map輸出寫磁盤時是否啟用壓縮參數 預設值為false。
18.mapred.map.output.compression.codec map輸出磁盤啟用壓縮的壓縮庫
19.tracker.http.threads reducer 每個tasktracker獲得檔案分區的工作線程數量,針對一個tasktracker 預設值是40。
20.mapred.reduce.parallel.copies reduce任務複制map輸出檔案線程數量 預設值是5。
21.mapred.job.shuffle.input.buffer.percent shuffle複制階段,配置設定給map 輸出存緩沖區占堆棧空間的百分比,預設值0.7。
22.mapred.inmem.threadhold map輸出門檻值。
23.io.sort.record.percent 存儲map輸出記錄邊界的io.sort.mb的比例(記憶體緩沖區所占棧空間比例),剩餘空間空間存儲記錄本身。
24.min.num.spills.for.combine 運作combiner所需要的最少溢出寫檔案數 預設值為3。
25.mapred.reduce.copy.backoff reducer擷取一個map輸出所花最大時間,機關是秒,預設值300
26.mapred.ijob.shufffle.merge.percent map輸出緩沖區使用門檻值的比例,啟動合并輸出 預設值為0.66.
27.mapred.inmem.merge.threadhold 啟動map輸出和磁盤溢出寫過程的map輸出門檻值,預設值1000.
28.mapred.iob.reduce.input.buffer.percent 在reduce過程中,内從中儲存map輸出的空間占整個堆記憶體空間的比例。預設值為0。預設情況下,map輸出都合并到磁盤上,以便為reducer提供盡可能多的記憶體,如果reducer需要的記憶體較少,可以增加此值來最小化磁盤通路次數。