mapreduce的資料流程:
預先加載本地的輸入檔案
經過map處理産生中間結果
經過shuffle程式将相同key的中間結果分發到同一節點上處理
recude處理産生結果輸出
将結果輸出儲存在hdfs上
map
在map階段,使用job.setinputformatclass定義的inputformat将輸入的資料集分割成小資料塊splites,
同時inputformat提供一個recordreder的實作。預設的是textinputformat,
他提供的recordreder會将文本的一行的偏移量作為key,這一行的文本作為value。
這就是自定義map的輸入是<longwritable, text>的原因。
然後調用自定義map的map方法,将一個個<longwritable, text>對輸入給map的map方法。
最終是按照自定義的map的輸出key類,輸出class類生成一個list<mapoutputkeyclass, mapoutputvalueclass>。
partitioner
在map階段的最後,會先調用job.setpartitionerclass設定的類對這個list進行分區,
每個分區映射到一個reducer。每個分區内又調用job.setsortcomparatorclass設定的key比較函數類排序。
可以看到,這本身就是一個二次排序。
如果沒有通過job.setsortcomparatorclass設定key比較函數類,則使用key的實作的compareto方法。
shuffle:
将每個分區根據一定的規則,分發到reducer處理
sort
在reduce階段,reducer接收到所有映射到這個reducer的map輸出後,
也是會調用job.setsortcomparatorclass設定的key比較函數類對所有資料對排序。
然後開始構造一個key對應的value疊代器。這時就要用到分組,
使用jobjob.setgroupingcomparatorclass設定的分組函數類。隻要這個比較器比較的兩個key相同,
他們就屬于同一個組,它們的value放在一個value疊代器
reduce
最後就是進入reducer的reduce方法,reduce方法的輸入是所有的(key和它的value疊代器)。
同樣注意輸入與輸出的類型必須與自定義的reducer中聲明的一緻。
一個更為詳細的流程圖
具體的例子:
是hadoop mapreduce example中的例子,自己改寫了一下并加入的注釋