MapTask工作機制
1)Read階段:MapTask通過使用者編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
2)Map階段:該節點主要是将解析出的key/value交給使用者編寫map()函數處理,并産生一系列新的key/value。
3)Collect收集階段:在使用者編寫map()函數中,當資料處理完成後,一般會調用OutputCollector.collect()輸出結果。在該函數内部,它會将生成的key/value分區(調用Partitioner),并寫入一個環形記憶體緩沖區中。
4)Spill階段:即“溢寫”,當環形緩沖區滿後,MapReduce會将資料寫到本地磁盤上,生成一個臨時檔案。需要注意的是,将資料寫入本地磁盤之前,先要對資料進行一次本地排序,并在必要時對資料進行合并、壓縮等操作。
溢寫階段詳情:
步驟1:利用快速排序算法對緩存區内的資料進行排序,排序方式是,先按照分區編号Partition進行排序,然後按照key進行排序。這樣,經過排序後,資料以分區為機關聚集在一起,且同一分區内所有資料按照key有序。
步驟2:按照分區編号由小到大依次将每個分區中的資料寫入任務工作目錄下的臨時檔案output/spillN.out(N表示目前溢寫次數)中。如果使用者設定了Combiner,則寫入檔案之前,對每個分區中的資料進行一次聚集操作。
步驟3:将分區資料的元資訊寫到記憶體索引資料結構SpillRecord中,其中每個分區的元資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果目前記憶體索引大小超過1MB,則将記憶體索引寫到檔案output/spillN.out.index中。
5)Combine階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合并,以確定最終隻會生成一個資料檔案。
當所有資料處理完後,MapTask會将所有臨時檔案合并成一個大檔案,并儲存到檔案output/file.out中,同時生成相應的索引檔案output/file.out.index。
在進行檔案合并過程中,MapTask以分區為機關進行合并。對于某個分區,它将采用多輪遞歸合并的方式。每輪合并io.sort.factor(預設10)個檔案,并将産生的檔案重新加入待合并清單中,對檔案排序後,重複以上過程,直到最終得到一個大檔案。
讓每個MapTask最終隻生成一個資料檔案,可避免同時打開大量檔案和同時讀取大量小檔案産生的随機讀取帶來的開銷。
ReduceTask工作機制
1)Copy階段:ReduceTask從各個MapTask上遠端拷貝一片資料,并針對某一片資料,如果其大小超過一定門檻值,則寫到磁盤上,否則直接放到記憶體中。
2)Merge階段:在遠端拷貝資料的同時,ReduceTask啟動了兩個背景線程對記憶體和磁盤上的檔案進行合并,以防止記憶體使用過多或磁盤上檔案過多。
3)Sort階段:按照MapReduce語義,使用者編寫reduce()函數輸入資料是按key進行聚集的一組資料。為了将key相同的資料聚在一起,Hadoop采用了基于排序的政策。由于各個MapTask已經實作對自己的處理結果進行了局部排序,是以,ReduceTask隻需對所有資料進行一次歸并排序即可。
4)Reduce階段:reduce()函數将計算結果寫到HDFS上。
設定ReduceTask并行度(個數)
ReduceTask的并行度同樣影響整個Job的執行并發度和執行效率,但與MapTask的并發數由切片數決定不同,ReduceTask數量的決定是可以直接手動設定:
// 預設值是1,手動設定為4
job.setNumReduceTasks(4);
實驗:測試ReduceTask多少合适
1)實驗環境:1個Master節點,16個Slave節點:CPU:8GHZ,記憶體: 2G
2)實驗結論:
改變ReduceTask (資料量為1GB)
MapTask =16
ReduceTask | 1 | 5 | 10 | 15 | 16 | 20 | 25 | 30 | 45 | 60 |
---|---|---|---|---|---|---|---|---|---|---|
總時間 | 892 | 146 | 110 | 92 | 88 | 100 | 128 | 101 | 145 | 104 |
注意事項
1)ReduceTask=0,表示沒有Reduce階段,輸出檔案個數和Map個數一緻。
2)ReduceTask預設值就是1,是以輸出檔案個數為一個
3)如果資料分布不均勻,就有可能在Reduce階段産生資料傾斜
4)ReduceTask數量并不是任意設定,還要考慮業務邏輯需求,有些情況下,需要計算全局彙總結果,就隻能有1個ReduceTask
5)具體多少個ReduceTask,需要根據叢集性能而定
6)如果分區數不是1,但是ReduceTask為1,是否執行分區過程。答案是:不執行分區過程。因為在MapTask的源碼中,執行分區的前提是先判斷ReduceNum個數是否大于1。不大于1肯定不執行
簡書:https://www.jianshu.com/u/0278602aea1d
CSDN:https://blog.csdn.net/u012387141