天天看點

7.5 MapReduce程式的核心運作機制

任務目的

  • 知曉用戶端對 Map 階段并行度的規劃
  • 掌握 FileInputFormat 的預設切片機制
  • 掌握手動設定 ReduceTask 數量的方法
  • 了解 MapReduce 程式的運作流程

任務清單

  • 任務1:MapTask 并行度決定機制
  • 任務2:ReduceTask 并行度決定機制
  • 任務3:MapReduce 程式的運作流程

詳細任務步驟

任務1:MapTask 并行度決定機制

1.1 MapTask 并行度

  MapTask 并行度決定 Map 階段的任務處理并發度,進而影響到整個 Job 的處理速度。

  那麼, MapTask 并行執行個體是否越多越好呢?其并行度又是如何決定呢?

  一個 Job 的 Map 階段并行度由用戶端在送出 Job 時決定, 用戶端對 Map 階段并行度的規劃的基本邏輯為:

  将待處理資料執行邏輯切片(即按照一個特定切片大小,将待處理資料劃分成邏輯上的多個 split),然後每一個 split 配置設定一個 MapTask 并行執行個體處理。

  這段邏輯及形成的切片規劃描述檔案,是由 FileInputFormat 實作類的 ​

​getSplits()​

​​ 方法完成的。該方法傳回的是 ​

​List<InputSplit>​

​​, InputSplit 封裝了每一個邏輯切片的資訊,包括長度和位置資訊,而 ​

​getSplits()​

​ 方法傳回一組 InputSplit。

1.2 FileInputFormat 切片機制

  1. FileInputFormat 中預設的切片機制:

  (1)簡單地按照檔案的内容長度進行切片

預設等于 block 大小

  (3)切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片

  比如待處理資料有兩個檔案:

File1.txt 200M
File2.txt 100M      

  經過 ​

​getSplits()​

​ 方法處理之後,形成的切片資訊是:

File1.txt-split1    0-128M
File1.txt-split2    129M-200M
File2.txt-split1    0-100M      

  2. FileInputFormat 中切片的大小的參數配置:

long splitSize = computeSplitSize(Math.max(minSize, Math.min(maxSize, blockSize))),翻譯一下就是求這三個值的中間值。

  切片主要由這幾個值來運算決定:

  • blocksize: 預設是 128M,可通過​

    ​dfs.blocksize​

    ​ 修改
  • minSize: 預設是 1,可通過​

    ​mapreduce.input.fileinputformat.split.minsize​

    ​ 修改
  • maxsize: 預設是 Long.MaxValue,可通過​

    ​mapreduce.input.fileinputformat.split.maxsize​

    ​ 修改

預設情況下,切片大小等于 blocksize。

  • 如果 maxsize 調的比 blocksize 小,則切片會小于 blocksize,而且就等于配置的這個參數的值;
  • 如果 minsize 調的比 blocksize 大,則切片會大于 blocksize。

    但是,不論怎麼調參數,都不能讓多個小檔案“劃入”一個 split。

任務2:ReduceTask 并行度決定機制

  ReduceTask 的并行度同樣影響整個 Job 的執行并發度和執行效率,但與 MapTask 的并發數由切片數決定不同, ReduceTask 數量的決定是可以直接手動設定:

job.setNumReduceTasks(4);  //預設值是 1,手動設定為 4      

  ReduceTask 的數量預設為 1,我們手動設定為 4,表示運作 4 個 ReduceTask,相應的輸出結果會有4個,如下圖所示:

7.5 MapReduce程式的核心運作機制

圖16

 

  如果設定為 0,表示不運作 ReduceTask 任務,也就是沒有 Reduce 階段,隻有 Map 階段,Map 階段的輸出結果作為最終的輸出結果。

  如果資料分布不均勻,就有可能在 Reduce 階段産生資料傾斜。

  注意: ReduceTask 數量并不是任意設定,還要考慮業務邏輯需求,有些情況下,需要計算全局彙總結果,就隻能有 1 個 ReduceTask。

任務3:MapReduce 程式的運作流程

3.1 MapReduce 結構

  一個完整的 MapReduce 程式在分布式運作時有兩類執行個體程序:

  • MRAppMaster(MapReduce Application Master):負責整個程式的過程排程及狀态協調
  • YarnChild(MapTask):負責 Map 階段的整個資料處理流程,階段并發任務
  • YarnChild(ReduceTask):負責 Reduce 階段的整個資料處理流程,階段彙總任務

  以上兩個階段 MapTask 和 ReduceTask 的程序都是 YarnChild,并不是說這 MapTask 和 ReduceTask 就跑在同一個 YarnChild 程序裡。

  運作任意一個 MapReduce 程式,使用​

​jps​

​指令檢視程序:

7.5 MapReduce程式的核心運作機制

圖1

 

  從圖中可以看出,MapReduce 運作時,開啟了 MRAppMaster 和 YarnChild 程序,此圖中的 YarnChild 代表的是 MapTask。運作完 MapTask 階段,此 YarnChild 程序會關閉,随後再運作 ReduceTask 階段,此時還會開啟一個名為 YarnChild 的程序,但是通過檢視程序号發現,此時的 YarnChild 程序是一個新的程序,與 MapTask 階段的 YarnChild 不是同一個程序,如下圖所示:

7.5 MapReduce程式的核心運作機制

圖2

 

3.2 MapReduce 程式的運作流程

  1. 一個 MapReduce 程式啟動的時候,最先啟動的是 MRAppMaster, MRAppMaster 啟動後根據本次 Job 的描述資訊,計算出需要的 MapTask 執行個體數量,然後向叢集申請機器啟動相應數量的 MapTask 程序;

  2. MapTask 程序啟動之後,根據給定的資料切片(哪個檔案的哪個偏移量範圍)範圍進行資料處理,主體流程為:

  • 利用客戶指定的 InputFormat 來擷取 RecordReader 讀取資料,形成輸入 KV 對
  • 将輸入 KV 對傳遞給客戶定義的​

    ​map()​

    ​​ 方法,做邏輯運算,并将​

    ​map()​

    ​ 方法輸出的 KV 對收集到緩存
  • 将緩存中的 KV 對按照 K分區排序後不斷溢寫到磁盤檔案

  3. MRAppMaster 監控到所有 MapTask 程序任務完成之後(真實情況是,某些 MapTask 程序處理完成後,就會開始啟動 ReduceTask 去已完成的 MapTask 處 fetch 資料),會根據客戶指定的參數啟動相應數量的 ReduceTask 程序,并告知 ReduceTask 程序要處理的資料範圍(資料分區);

  

  4. ReduceTask 程序啟動之後,根據 MRAppMaster 告知的待處理資料所在位置,從若幹台 MapTask 運作所在機器上擷取到若幹個 MapTask 輸出結果檔案,并在本地進行重新歸并排序,然後按照相同 key 的 KV 為一個組,調用客戶定義的 ​

​reduce()​

​ 方法進行邏輯運算,并收集運算輸出的結果 KV,然後調用客戶指定的 OutputFormat 将結果資料輸出到外部存儲。

繼續閱讀