天天看點

MapReduce之Shuffle機制

目錄

一、什麼是shuffle,為什麼要用shuffle?

二、shuffle工作流程

三、流程圖詳細講解:以wordcount作為案例

一、什麼是shuffle,為什麼要用shuffle?

資料從Map階段傳遞給Reduce階段的過程就叫Shuffle,Shuffle機制是整個MapReduce架構中最核心的部分。

Mapreduce要確定每個reducer的輸入都是按鍵排序的。

二、shuffle工作流程

1、Collect階段:将MapTask的結果輸出到預設大小為100M的環形緩沖區,儲存key/value序列化資料,Partition分區資訊等。

2、Spill 階段:當記憶體中的資料達到閥值(預設80%)的時候,會将資料寫入本地磁盤,在将資料寫入磁盤之前需要對資料進行一次排序的操作,如果配置了combiner,還會将有相同分區号和key的資料進行排序。 

3、.Merge 階段:把所有溢出的臨時檔案進行一次合并操作,以確定一個MapTask最終隻産生一個中間資料檔案。

4、Copy階段: ReduceTask啟動Fetcher線程到已經完成MapTask的節點上複制一份屬于自己的資料,這些資料預設會儲存在記憶體的緩沖區中,當記憶體的緩沖區達到一定的閥值的時候,就會将資料寫到磁盤之上。

5、Merge階段:在ReduceTask遠端複制資料(http)的同時,會在背景開啟兩個線程(一個是記憶體到磁盤的合并,一個是磁盤到磁盤的合并)對記憶體到本地的資料檔案進行合并操作。

6、Sort階段:在對資料進行合并的同時,會進行排序操作,由于MapTask 階段已經對資料進行了局部的排序,ReduceTask隻需保證Copy的資料的最終整體有效性即可

簡而言之:

Maptask會不斷收集我們的map()方法輸出的kv對,放到記憶體緩沖區(預設100M)中,當緩沖區達到飽和的時候就會溢出到磁盤中,如果map的輸出結果很多,則會有多個溢出檔案,多個溢出檔案會被合并成一個大的溢出檔案,在檔案溢出、合并的過程中,都要調用partitoner進行分組和針對key進行排序(預設是按照Key的hash值對reduceTasks個數取模),之後reducetask根據自己的分區号,去各個maptask機器上取相應的結果分區資料,reducetask會将這些檔案再進行合并(歸并排序)。

三、流程圖詳細講解:以wordcount作為案例

MapReduce之Shuffle機制

No.1:job.split檔案是切片資訊。

FileInputFormat切片:在MapReduce的執行過程當中,預設的資料分片大小為100M,但是這個大小可能會随着hdfs中block大小的設定而發生變 化,具體的切分規則如下:

             max.split(100M)

             min.split(10M)

             block(128M)==>大小可以自定義

             max(min.split,min(max.split,block))

根據上述規則進行計算的話,上例上的分片大小為100M。1.0 hadoop最後切片是64M.

No.2:LineRecordReader中next()方法會反複調用,讀取資料,以偏移量(每一行的起始位置)、資料作為key、value發送給mapper

No.3:map()方法進行切分split,将單詞作為key,個數 1 作為value發送出去

No.4:mappr發出去的資料會被MapOutCollector收集儲存到記憶體的一個環形緩沖區

MapReduce之Shuffle機制

No.5:環形緩沖區資料達到門檻值的時候,為了能讓後面的資料進來,之前的資料會溢出到本地磁盤

No.6:在儲存到本地檔案之前,partition會對kv資料進行分區,預設是根據key的hashCode對reduceTasks個數取模得到的,相同的key會一個分區。

如果想改變分區規則,需要我們自定義一個類繼承Partitioner,重寫getPartition()方法,之前寫過省份根據電話号分區的案例。

No.7:如果溢出到磁盤上檔案達到一定數量,則會合并成一個更大檔案

No.8:所有的maptask任務完成後,會通過http發送給reducer

MapReduce之Shuffle機制

No.9:reducetask根據自己的分區号,去各個maptask機器上取相應的結果分區資料。同一台機器會把不同maptask的結果檔案再進行合并(歸并排序)成大檔案。

No.10:合并之後,shuffle過程也就結束了。reduce階段會進行合并等操作。這裡會有一個GroupingComparator分組器,會根據自己需求将指定資料按照組的形式存放在不同檔案中,之前寫過根據不同user求消費TopN的案例。

No.11:儲存在hdfs中。

總體流程圖:

MapReduce之Shuffle機制

繼續閱讀