任務目的
- 知曉用戶端對 Map 階段并行度的規劃
- 掌握 FileInputFormat 的預設切片機制
- 掌握手動設定 ReduceTask 數量的方法
- 了解 MapReduce 程式的運作流程
任務清單
- 任務1:MapTask 并行度決定機制
- 任務2:ReduceTask 并行度決定機制
- 任務3:MapReduce 程式的運作流程
詳細任務步驟
任務1:MapTask 并行度決定機制
1.1 MapTask 并行度
MapTask 并行度決定 Map 階段的任務處理并發度,進而影響到整個 Job 的處理速度。
那麼, MapTask 并行執行個體是否越多越好呢?其并行度又是如何決定呢?
一個 Job 的 Map 階段并行度由用戶端在送出 Job 時決定, 用戶端對 Map 階段并行度的規劃的基本邏輯為:
将待處理資料執行邏輯切片(即按照一個特定切片大小,将待處理資料劃分成邏輯上的多個 split),然後每一個 split 配置設定一個 MapTask 并行執行個體處理。
這段邏輯及形成的切片規劃描述檔案,是由 FileInputFormat 實作類的
getSplits()
方法完成的。該方法傳回的是
List<InputSplit>
, InputSplit 封裝了每一個邏輯切片的資訊,包括長度和位置資訊,而
getSplits()
方法傳回一組 InputSplit。
1.2 FileInputFormat 切片機制
1. FileInputFormat 中預設的切片機制:
(1)簡單地按照檔案的内容長度進行切片
預設等于 block 大小
(3)切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片
比如待處理資料有兩個檔案:
File1.txt 200M
File2.txt 100M
經過
getSplits()
方法處理之後,形成的切片資訊是:
File1.txt-split1 0-128M
File1.txt-split2 129M-200M
File2.txt-split1 0-100M
2. FileInputFormat 中切片的大小的參數配置:
long splitSize = computeSplitSize(Math.max(minSize, Math.min(maxSize, blockSize))),翻譯一下就是求這三個值的中間值。
切片主要由這幾個值來運算決定:
- blocksize: 預設是 128M,可通過
修改dfs.blocksize
- minSize: 預設是 1,可通過
修改mapreduce.input.fileinputformat.split.minsize
- maxsize: 預設是 Long.MaxValue,可通過
修改mapreduce.input.fileinputformat.split.maxsize
預設情況下,切片大小等于 blocksize。
- 如果 maxsize 調的比 blocksize 小,則切片會小于 blocksize,而且就等于配置的這個參數的值;
-
如果 minsize 調的比 blocksize 大,則切片會大于 blocksize。
但是,不論怎麼調參數,都不能讓多個小檔案“劃入”一個 split。
任務2:ReduceTask 并行度決定機制
ReduceTask 的并行度同樣影響整個 Job 的執行并發度和執行效率,但與 MapTask 的并發數由切片數決定不同, ReduceTask 數量的決定是可以直接手動設定:
job.setNumReduceTasks(4); //預設值是 1,手動設定為 4
ReduceTask 的數量預設為 1,我們手動設定為 4,表示運作 4 個 ReduceTask,相應的輸出結果會有4個,如下圖所示:
圖16
如果設定為 0,表示不運作 ReduceTask 任務,也就是沒有 Reduce 階段,隻有 Map 階段,Map 階段的輸出結果作為最終的輸出結果。
如果資料分布不均勻,就有可能在 Reduce 階段産生資料傾斜。
注意: ReduceTask 數量并不是任意設定,還要考慮業務邏輯需求,有些情況下,需要計算全局彙總結果,就隻能有 1 個 ReduceTask。
任務3:MapReduce 程式的運作流程
3.1 MapReduce 結構
一個完整的 MapReduce 程式在分布式運作時有兩類執行個體程序:
- MRAppMaster(MapReduce Application Master):負責整個程式的過程排程及狀态協調
- YarnChild(MapTask):負責 Map 階段的整個資料處理流程,階段并發任務
- YarnChild(ReduceTask):負責 Reduce 階段的整個資料處理流程,階段彙總任務
以上兩個階段 MapTask 和 ReduceTask 的程序都是 YarnChild,并不是說這 MapTask 和 ReduceTask 就跑在同一個 YarnChild 程序裡。
運作任意一個 MapReduce 程式,使用
jps
指令檢視程序:
圖1
從圖中可以看出,MapReduce 運作時,開啟了 MRAppMaster 和 YarnChild 程序,此圖中的 YarnChild 代表的是 MapTask。運作完 MapTask 階段,此 YarnChild 程序會關閉,随後再運作 ReduceTask 階段,此時還會開啟一個名為 YarnChild 的程序,但是通過檢視程序号發現,此時的 YarnChild 程序是一個新的程序,與 MapTask 階段的 YarnChild 不是同一個程序,如下圖所示:
圖2
3.2 MapReduce 程式的運作流程
1. 一個 MapReduce 程式啟動的時候,最先啟動的是 MRAppMaster, MRAppMaster 啟動後根據本次 Job 的描述資訊,計算出需要的 MapTask 執行個體數量,然後向叢集申請機器啟動相應數量的 MapTask 程序;
2. MapTask 程序啟動之後,根據給定的資料切片(哪個檔案的哪個偏移量範圍)範圍進行資料處理,主體流程為:
- 利用客戶指定的 InputFormat 來擷取 RecordReader 讀取資料,形成輸入 KV 對
- 将輸入 KV 對傳遞給客戶定義的
方法,做邏輯運算,并将map()
方法輸出的 KV 對收集到緩存map()
- 将緩存中的 KV 對按照 K分區排序後不斷溢寫到磁盤檔案
3. MRAppMaster 監控到所有 MapTask 程序任務完成之後(真實情況是,某些 MapTask 程序處理完成後,就會開始啟動 ReduceTask 去已完成的 MapTask 處 fetch 資料),會根據客戶指定的參數啟動相應數量的 ReduceTask 程序,并告知 ReduceTask 程序要處理的資料範圍(資料分區);
4. ReduceTask 程序啟動之後,根據 MRAppMaster 告知的待處理資料所在位置,從若幹台 MapTask 運作所在機器上擷取到若幹個 MapTask 輸出結果檔案,并在本地進行重新歸并排序,然後按照相同 key 的 KV 為一個組,調用客戶定義的
reduce()
方法進行邏輯運算,并收集運算輸出的結果 KV,然後調用客戶指定的 OutputFormat 将結果資料輸出到外部存儲。