下圖是mapreduce任務運作過程的一個圖:

map-reduce的處理過程主要涉及以下四個部分:
用戶端client:用于送出map-reduce任務job
jobtracker:協調整個job的運作,其為一個java程序,其main class為jobtracker
tasktracker:運作此job的task,處理input split,其為一個java程序,其main class為tasktracker
hdfs:hadoop分布式檔案系統,用于在各個程序間共享job相關的檔案
上圖中主要包括以下過程:
送出作業
作業初始化
任務配置設定
執行任務
進度和狀态更新
完成作業
使用hadoop提供的指令行或者通過程式設計接口送出任務,指令行方式如下:
當使用者按上述指令格式送出作業後,指令行腳本會調用jobclient.runjob()方法送出作業
jobclient将作業送出到jobtracker節點上之前,需要作業寫初始化工作。初始化工作由 <code>jobclient.submitjobinternal(job)</code> 實作,這些初始化包括擷取作業的jobid、建立hdfs目錄、上傳作業以及生成所有的inputsplit分片的相關資訊等。
mapreduce的作業檔案的上傳和下載下傳都是由distributedcache透明完成的,它是hadoop專門開發的資料分發工具。
jobclient上傳檔案時可以修改檔案副本數(通過參數 <code>mapred.submit.replication</code> 指定,預設值為10),這樣的話可以分攤負載以避免産生通路熱點。
作業送出後,jobclient會調用inputformat的getsplits()方法生成相關的split分片資訊,該資訊包括inputsplit中繼資料資訊和原始的inputsplit資訊,其中中繼資料資訊被jobtracker使用,第二部分在map task初始化時由mapper使用來擷取自己要處理的資料,這兩部分資料被儲存到job.split檔案和job.splitmetainfo檔案中。
調用jobtracker的submitjob()方法将作業送出。在這一階段會依次進行如下操作:
1)、為作業建立jobinprogress對象。jobtracker會為使用者送出的每一個作業建立一個jobinprogress對象,這個對象維護了作業的運作時資訊,主要用于跟蹤正在運作的作業的狀态和進度;
2)、檢查使用者是否具有指定隊列的作業送出權限。hadoop以隊列為機關來管理作業和資源,每個隊列配置設定有一定亮的資源,管理嚴可以為每個隊列指定哪些使用者有權限送出作業;
3)、檢查作業配置的記憶體使用量是否合理。使用者在送出作業時,可已分别通過參數 <code>mapred.job.map.memory.mb</code> 和<code>mapred.job.reduce.memory.mb</code> 指定map task和reduce task的記憶體使用量,而管理者可以給叢集中的map task和reduce task分别設定中的記憶體使用量,一旦使用者配置的記憶體使用量超過總的記憶體限制,作業就會送出失敗;
4)、jobtracker收到送出的作業後,并不會馬上對其進行初始化,而是會交給taskscheduler排程器,由他按照一定的政策對作業做初始化操作。
jobtracker采用了觀察者模式将“送出新作業”這一事件告訴taskscheduler
送出任務後,runjob每隔一秒鐘輪詢一次job的進度,将進度傳回到指令行,直到任務運作完畢。
排程器調用jobtracker.initjob()方法來對新作業做初始化的。hadoop将每個作業分節成4中類型的任務:setup task,map task,reduce task和cleanup task,它們的運作時資訊由taskinprogress維護,是以,從某個方面将,建立這些任務就是建立taskinprogress對象。
<code>setup task</code>。作業初始化标志性任務,它進行一些很簡單的作業初始化工作。該類型任務又分為map setup task和reduce setup task兩種,并且隻能運作一次。
<code>map task</code>。map階段的資料處理任務。 其數目及對應的處理資料分片由應用程式中的inputformat中間确定。
<code>reduce task</code>。reduce階段的處理資料的任務。其數目可以由使用者通過參數 <code>mapred.reduce.tasks</code> 指定。hadoop剛開始的時候隻會排程map task任務,直到map task完成數目達到由參數 <code>mapred.reduce.slowstart.completed.maps</code>指定的百分比(預設值為0.05,即百分之5)後,才開始排程reduce task。
<code>cleanup task</code>。作業結束的标志性任務,主要是做一些作業清理的工作,比如删除作業在運作中産生的一些零食目錄和資料等資訊。
說明:可以通過參數 <code>mapred.committer.job.setup.cleanup.needed</code> 配置是否為作業建立setup/cleanup task,以避免他們拖慢作業執行進度且降低作業的可靠性。
tasktracker 和 jobtracker 通過心跳通信配置設定一個任務
tasktracker 定期發送心跳,告知 jobtracker, tasktracker 是否還存活,并充當兩者之間的消息通道。
tasktracker 主動向 jobtracker 詢問是否有作業。若自己有空閑的 solt,就可在心跳階段得到 jobtracker 發送過來的 map 任務或 reduce 任務。對于 map 任務和 task 任務,tasktracker 有固定數量的任務槽,準确數量由 tasktracker 核的個數核記憶體的大小來确定。預設排程器在處理 reduce 任務槽之前,會填充滿空閑的 map 任務槽,是以,如果 tasktracker 至少有一個空閑的 map 任務槽,tasktracker 會為它選擇一個 map 任務,否則選擇一個 reduce 任務。選擇 map 任務時,jobtracker 會考慮資料本地化(任務運作在輸入分片所在的節點),而 reduce 任務不考慮資料本地化。任務還可能是機架本地化。
tasktracker 執行任務大緻步驟:
被配置設定到一個任務後,從共享檔案中把作業的jar複制到本地,并将程式執行需要的全部檔案(配置資訊、資料分片)複制到本地
為任務建立一個本地工作目錄
内部類taskrunner執行個體啟動一個新的jvm運作任務
狀态包括:作業或認為的狀态(成功,失敗,運作中)、map 和 reduce 的進度、作業計數器的值、狀态消息或描述
task 運作時,将自己的狀态發送給 tasktracker,由 tasktracker 心跳機制向 jobtracker 彙報
狀态進度由計數器實作
當jobtracker獲得最後一個task的運作成功的報告後,将job得狀态改為成功。
當jobclient從jobtracker輪詢的時候,發現此job已經成功結束,則向使用者列印消息,從runjob函數中傳回。