天天看點

MapReduce的資料流程、執行流程

mapreduce的資料流程:

預先加載本地的輸入檔案

經過map處理産生中間結果

經過shuffle程式将相同key的中間結果分發到同一節點上處理

recude處理産生結果輸出

将結果輸出儲存在hdfs上

MapReduce的資料流程、執行流程

map

在map階段,使用job.setinputformatclass定義的inputformat将輸入的資料集分割成小資料塊splites, 

同時inputformat提供一個recordreder的實作。預設的是textinputformat, 

他提供的recordreder會将文本的一行的偏移量作為key,這一行的文本作為value。 

這就是自定義map的輸入是<longwritable, text>的原因。 

然後調用自定義map的map方法,将一個個<longwritable, text>對輸入給map的map方法。

最終是按照自定義的map的輸出key類,輸出class類生成一個list<mapoutputkeyclass, mapoutputvalueclass>。

partitioner

在map階段的最後,會先調用job.setpartitionerclass設定的類對這個list進行分區, 

每個分區映射到一個reducer。每個分區内又調用job.setsortcomparatorclass設定的key比較函數類排序。

可以看到,這本身就是一個二次排序。 

如果沒有通過job.setsortcomparatorclass設定key比較函數類,則使用key的實作的compareto方法。

shuffle:

将每個分區根據一定的規則,分發到reducer處理

sort

在reduce階段,reducer接收到所有映射到這個reducer的map輸出後, 

也是會調用job.setsortcomparatorclass設定的key比較函數類對所有資料對排序。 

然後開始構造一個key對應的value疊代器。這時就要用到分組, 

使用jobjob.setgroupingcomparatorclass設定的分組函數類。隻要這個比較器比較的兩個key相同, 

他們就屬于同一個組,它們的value放在一個value疊代器

reduce 

最後就是進入reducer的reduce方法,reduce方法的輸入是所有的(key和它的value疊代器)。 

同樣注意輸入與輸出的類型必須與自定義的reducer中聲明的一緻。

MapReduce的資料流程、執行流程

一個更為詳細的流程圖

MapReduce的資料流程、執行流程

具體的例子:

是hadoop mapreduce example中的例子,自己改寫了一下并加入的注釋