天天看點

hadoop之MR核心shuffle

🍊首先祝福的大家端午節快樂!别人劃龍舟,我寫部落格,也算是參加端午節的活動!廢話不多說,今天我們來介紹一下MapReduce的核心思想!對以前内容感興趣的小夥伴可以檢視下面的内容:

  • 連結: Spark之處理布爾、數值和字元串類型的資料.
  • 連結: Spark之Dataframe基本操作.
  • 連結: Spark之處理布爾、數值和字元串類型的資料.
  • 連結: Spark之核心架構.

🍈今天我來學習hadoop中最重要的内容——MapReduce的過程,我們将介紹Map,Reduce,shuffle等詳細内容。

目錄

    • 1.MapReduce原理
    • 2.Map階段的處理
      • 2.1 inputFormat 資料輸入
        • 2.1.1 切片與MapTask并行度決定機制
    • 3.Shuffle過程
      • 3.1 shuffle過程中的partitioner
      • 3.2 shuffle過程中的排序
    • 參考資料

1.MapReduce原理

MapReduce是hadoop的核心,主要分為Map和Reduce階段。Map階段也叫做MapTask,Reduce也叫做ReduceTask。

hadoop之MR核心shuffle

這裡我們放上一張完整的mapreduce的流程圖,每個步驟的編号代表執行的順序,我們這裡隻列出了一個maptask的執行流程,其他maptask流程一樣。細節部分後面會進行講解。

hadoop之MR核心shuffle

我們這裡先給大家展示一下mapreduce的大緻過程!細節我們接下來說。

hadoop之MR核心shuffle

這張圖接上面,因為太長了,寫不下。

hadoop之MR核心shuffle

2.Map階段的處理

2.1 inputFormat 資料輸入

在上圖中我們可以看見map階段有個inputFormat的過程,這個過程主要是将輸入的資料轉化為<key,value>形式的資料。

2.1.1 切片與MapTask并行度決定機制

MapTask的并行度是由MapTask的數量決定的,我們知道,MapTask的運作是互不相關的,它們是并行運算的,是以MapTask的并行度決定Map階段的任務處理并發度,進而影響整個Job的處理速度。

  • 資料塊block:是HDFS實體上把資料分成一塊一塊,資料塊是HDFS的存儲資料機關。
  • 資料切片:資料切片隻是在邏輯上對輸入進行分片,并不會在磁盤上将其切分為片進行存儲,資料切片切片是MR程式計算輸入資料的機關,一個切片會對應啟動一個MapTask。

資料塊與資料切片的差別如下:

hadoop之MR核心shuffle

注意事項:

我們這裡用下圖來說明:

  • 一般情況下資料塊大小為多少,切片大小就為多少,這樣可以使資料切片在在一個資料塊block中,友善處理,提高效率。
  • 有多少個資料切片,就有多少個MapTask數量
  • 切片隻針對單個檔案,不針對整體資料集。
    hadoop之MR核心shuffle
    FileinputFormat切片源碼解析
    hadoop之MR核心shuffle
    大家重點可以看下紅色的部分。

說到FileinputFormat,其實它下面有很繼承的子類:TextInputFormat、KeyValueTextInputFormat、NLineInputformat、CombineTextInputFormat和自定義InputForamt等。

3.Shuffle過程

Shuffle的中文意思就是洗牌,而mapreudce中Shuffle就是指:Map方法之後,Reduce方法之前的資料處理過程。

用一個wordcount來舉例子。

hadoop之MR核心shuffle

接下來我們來詳細看一下這個過程:

hadoop之MR核心shuffle

map的shuffle

  1. map1方法之後将<key,value>傳入環形緩沖區,進行分區操作。
  2. 區内需要進行排序,排序的手段是快排,按照key的索引排序
  3. 将區内的資料進行合并(combiner操作,是一個可選的操作)
  4. 将環形緩沖區多次溢出的分區結果進行歸并排序,形成一個大的有序的分區。
  5. 大的分區也可以進行combiner操作(可選),最後進行壓縮。可以加快map将資料傳送給reduce的效率
  6. 最後将分區資料寫入磁盤,等待reduce的拉取

需要注意上圖隻展示了一個map1的資料處理過程,還有map2、mp3等等。

reduce的shuffle

  1. 根據分區結果拉取指定資料,拉取資料首先是放在記憶體裡面,如果記憶體不夠的話,會寫到磁盤上
  2. 對每個map來的分區資料進行歸并排序
  3. 按照相同的Key值進行分組
  4. 分組後進入對應的reduce方法中

3.1 shuffle過程中的partitioner

map的shuffle進行分區操作,預設方式主要采取的是哈希分區,就是就是key的hashcode的值與reduceTask的任務進行取餘,當然自己也可以重寫partitioner方法。

有以下幾點需要注意:

  • 如果reduceTask的數量>getpartition的結果數,則會産生多個空的輸出檔案。
  • 如果1<reduceTask的數量<getpartition的結果數,則會有一部分分區資料無處安放,會報錯。
  • 如果reduceTask的數量等于1,則不管你mapTask設定多少分區檔案,都隻交給這一個reduceTask,也就會隻産生一個結果檔案。

3.2 shuffle過程中的排序

maptask和reducetask均會對資料按照keyj進行排序。該操作屬于Hadoop的預設行為,任何應用程式中的資料均會被排序,而不管邏輯上是否需要。

  1. 對于maptask,它會将處理的結果暫時放到環形緩沖區,當環形緩沖區使用率達到一定門檻值後,再對環形緩沖區進行一次快速排序(記憶體中),并将這些遊有序資料溢寫到磁盤上,當資料處理完畢後,他會對磁盤上所有的檔案進行歸并排序。
  2. 對于reducetask,它從每一個maptask上遠端拷貝相應的資料檔案,如果檔案大小超過一定門檻值,則溢寫到磁盤,否則存儲在記憶體中。如果磁盤上的檔案數目達到一定的門檻值,則進行一次歸并排序以生成一個更大的檔案;如果記憶體中檔案大小或者數目超過一定門檻值,則進行一次合并後将資料溢寫到磁盤上,當所有的資料拷貝完畢後,reducetask統一對記憶體和磁盤的所有資料進行一次歸并排序。
  3. 部分排序:mapreduce根據輸入記錄的鍵值對資料集排序。保證輸出的每個檔案内部有序。
  4. 全排序:最終輸出的結果隻有一個檔案,且檔案内部有序,實作方式知識設定一個reduceTask,但該方法在處理大型檔案時效率極低,因為一台機器處理所有檔案,完全失去了mapreduce所提供的并行架構的優勢。
  5. 輔助排序:用的較少

參考資料

參考了全網較為熱門的MR過程,總結出來。

《尚矽谷的hadoop3.0實戰》

《大資料hadoop實戰》

《hadoop實戰指南》

繼續閱讀