天天看點

MapReduce執行流程解讀(續)

MapReduce 超詳細執行流程解讀

1、 一個大檔案需要處理,它在在 HDFS 上是以 block 塊形式存放,每個 block 預設為 128M 存 3 份,運作時每個 map 任務會處理一個 split,如果 block 大和 split 相同(預設情況下确實相同),有多少個 block 就有多少個 map 任務,是以對整個檔案處理時會有很多 map 任務進行并行計算

2、 每個 map 任務處理完輸入的 split 後會把結果寫入到記憶體的一個環形緩沖區,寫入過程中會進行簡單排序,它的預設大小為 100M,當緩沖區的大小使用超過一定的閥值,一個背景的線程就會啟動把緩沖區中的資料溢寫 (spill) 到本地磁盤中

(mapred-site.xml:mapreduce.cluster.local.dir),同 Mapper 繼續時向環形緩沖區中寫入資料;

3、 資料溢寫入到磁盤之前,首先會根據 reducer 的數量劃分成同數量的分區(partition),每個分區中的資料都會有背景線程根據 map 任務的輸出結果 key 進行内排序(字典順序、自然數順序或自定義順序 comparator),如果有 combiner,它會在溢寫到磁盤之前排好序的輸出上運作(combiner 的作用是使 map 輸出更緊湊,寫到本地磁盤和傳給 reducer 的資料更少),最後在本地生成分好區且排好序的小檔案; 如果 map 向環形緩沖區寫入資料的速度大于向本地寫入資料的速度,環形緩沖區被寫滿,向環形緩沖區寫入資料的線程會阻塞直至緩沖區中的内容全部溢寫到磁盤後再次啟動,到閥值後會向本地磁盤建立一個溢寫檔案;

4、 map 任務完成之前,會把本地磁盤溢寫的所有檔案不停地合并成得到一個結果檔案,合并得到的結果檔案會根據小溢寫檔案的分區而分區,每個分區的資料會再次根據 key進行排序,得到的結果檔案是分好區且排好序的,可以合并成一個檔案的溢寫檔案數量預設為

10(mapred-site.xml:mapreduce.task.io.sort.factor);這個結果檔案的分區存在一個映射關系,比如 0~1024 位元組内容為 0 号分區内容,1025~4096 位元組内容為 1 号分區内容等等;

5、 reduce 任務啟動,Reducer 個數由 mapred-site.xml 的 mapreduce.job.reduces 配置決定,或者初始化 job 時調用 Job.setNumReduceTasks(int) ; Reducer 中的一個線程定期向 MRAppMaster詢問Mapper輸出結果檔案位置,mapper結束後會向MRAppMaster彙報資訊;進而 Reducer 得知 Mapper 狀态,得到 map 結果檔案目錄;

6、 當有一個 Mapper 結束時,reduce 任務進入複制階段,reduce 任務通過 http 協定(hadoop 内置了 netty 容器)把所有 Mapper 結果檔案的對應的分區資料複制過來,比如,編号為 0 的 reducer 複制 map 結果檔案中 0 号分區資料,1 号 reduce 複制 map 結果檔案中 1 号分區的資料等等, Reducer 可以并行複制 Mapper 的結果,預設線程數為

5(mapred-site.xml:mapreduce.reduce.shuffle.parallelcopies);

所有 Reducer 複制完成 map 結果檔案後,由于 Reducer 會失敗,NodeManager 并沒有在第一個 map 結果檔案複制完成後删除它,直到作業完成後 MRAppMaster 通知 NodeManager 進行删除; 另外:如果 map 結果檔案相當小,則會被直接複制到 reduce NodeManager 的記憶體中(緩沖區大小由 mapred-site.xml:mapreduce.reduce.shuffle.input.buffer.percent 指定,預設 0.7);一旦緩沖區達到 reduce 的門檻值大小 0.66(mapred-site.xml:mapreduce.reduce.shuffle.merge.percent) 或寫入到 reduce NodeManager 記憶體中檔案個數達到 map 輸出門檻值

1000(mapred-site.xml:mapreduce.reduce.merge.inmem.threshold),reduce 就會把 map 結果文

件合并溢寫到本地;

7、 複制階段完成後,Reducer 進入 Merge 階段,循環地合并 map 結果檔案,維持其順序排序,合并因子預設為 10(mapred-site.xml:mapreduce.task.io.sort.factor),經過不斷地 Merge 後

得到一個“最終檔案”,可能存儲在磁盤也可能存在記憶體中;

8、 “最終檔案”輸入到 reduce 進行計算,計算結果輸入到 HDFS。

MapReduce執行流程解讀(續)

繼續閱讀