前言
MapReduce的源碼分析是基于Hadoop1.2.1基礎上進行的代碼分析。
本篇,将不會涉及代碼部分的分析,隻是簡單分析map的整體架構,并介紹map與reduce的運作過程,主要是為後續的分析做一個鋪墊。至于MapTask/ReduceTask的原理分析,JobTracker部分,以及TaskTracker如何啟動一個Task這些都将在後續章節給出。
MR程式設計模型
MapReduce的程式設計模型是來自lisp,源于在函數式程式設計語言中可以通過reduce(map(fn())這種形式将問題進行拆分處理。
借鑒于此,MapReduce程式設計分為兩個階段,Map階段和Reduce階段。通過這種形式,使用者隻需要關心map函數和reduce函數的編寫,就可以将複雜業務處理通過簡單的MapReduce來實作。
map函數,接收輸入資料的key/value鍵值對,進行處理,産生新的key/value鍵值對,作為中間資料,輸出到磁盤上。MapReduce架構會對輸出的key/value鍵值對,進行分區,同一個分區中的資料最後都會被同一個Reduce處理。
reduce函數,讀取Map的輸出結果,讀取完成後進行排序,将排序後的有序的key/value鍵值對傳遞給reduce函數處理,處理完成後産生新的key/value鍵值對,輸出到存儲上(一般是hdfs上)。
MR架構
MapReduce架構的架構如上圖所示,是采用比較常見的Master/Slave的架構模式。
在MapReduce中JobTracker負責作業的管理,資源監控。TaskTracker負責執行Task。Task可以分為MapTask和ReduceTask。
需要知道的是一個Job即代表了使用者執行的一個分布式的計算過程的作業,而Task則表示一個作業會被拆分為許多個小的任務中的一個,Task可以分為Map階段的任務或者是Reduce階段的任務,也就是說Task是任務進行中的一個單元,有這個小的單元去完成一部分計算的任務,最後所有任務都完成,結果經過彙聚,就代表一個作業的處理完畢。
使用者執行一個作業的過程如下:
1.使用者送出作業,由JobClient将相關作業資訊進行打包并傳遞到hdfs上,并通知JobTracker。
2.JobTracker接收到新的作業,對作業初始化。根據TaskTracker通過心跳彙報上來的負載資訊,得知空閑資源分布情況。由任務排程器決定将任務下發到哪些有空閑資源的節點。
3.Tracker接收到任務後,啟動單獨的jvm運作Task。Task在運作過程中的狀态都會先彙報給TaskTracker,再由TaskTracker彙報給JobTracker。
MapTask運作過程
一個MapTask的運作過程如上圖所示。運作過程可以分為三個階段,分别是資料讀取階段,Map階段,分區階段。
資料讀取階段,是指InputFormat負責了解資料格式并解析成key,value傳遞給map函數處理。InputFormat提供了getInputSplit接口,負責将輸入資料切分為一個個InputSplit(分片),同時還提供getRecordReader接口,負責解析InputSplit,通過RecordReader提取出key/value鍵值對。這些鍵值對就是作為map函數的輸入。
Map階段,是指運作使用者實作的Mapper接口中的map函數,該函數負責對輸入的key/value資料進行初步處理,産生新的key/value鍵值對作為中間資料儲存在本地磁盤上。
分區階段,map輸出的臨時資料,會被分成若幹個分區,每個分區都會被一個ReduceTask處理。
ReduceTask運作過程
ReduceTask的執行過程如上圖所示。該過程可以分為三個階段,分别為Shuffle階段,sort階段,reduce階段。
Shuffle階段,是指ReduceTask通過查詢JobTracker知道有哪些Map處理完畢。一旦有MapTask處理完畢,就會遠端讀取這些MapTask産生的臨時資料。
Sort階段,是指将讀取的資料,按照key對key/value鍵值對進行排序。
Reduce階段,是指運作使用者實作的Reducer接口中的reduce函數,該函數讀取sort階段的有序的key/value鍵值對,進行處理,并将最終的處理結果存到hdfs上。
多Map和多Reduce運作分析
上圖所示,是一個多MapTask和多ReduceTask的資料處理流程。這個Job使用者指定的是3個Map和2個Reduce。
其處理過程是:
JobTracker啟動3個MapTask,每個MapTask都從讀取對應的InputSplit。MapTask調用使用者實作的map函數,産生新的資料。至于資料會被劃到哪個分區,這是有Partitioner控制的,預設情況下使用的HashPartitioner,相同的key最終都會被劃到一個分區中,因為指定了2個ReduceTask是以資料會被分為兩個分區。在中間資料産生到磁盤前都會對資料進行排序。保證每個分區中的資料都是有序的。
在Map運作中,會根據使用者配置的Reduce在Map完成多少進度時啟動。根據使用者的配置JobTracker會在适當的時候向有空閑資源的TaskTracker下發ReduceTask,由TaskTracker啟動ReduceTask。ReduceTask啟動後,查詢JobTracker擷取它的輸入源的Map是否已經有可讀的資料。有則從發起http請求從這些MapTask輸出的節點上拉取資料到本地。因為指定了3個Map,是以一個Reduce需要從3個Map讀取資料。讀取後需要對讀取的資料進行歸并排序,以保障資料的有序性。這時,才會調用到使用者實作的reduce函數,做最後的處理,在處理完畢後,将資料存儲到hdfs上。每個ReduceTask處理完成後,都會在hdfs上産生一個結果檔案。是以這裡就會在hdfs上看到有兩個結果檔案。