天天看點

mapreduce原理完全剖析與shuffle機制

在前面幾篇文章都都大緻介紹了mapreduce的一些過程和原理,由于沒學那麼多是以有些表達的都很有欠缺,這裡給出了mapreduce原理的完全解析,shuffle機制,屬于純原理。

流程圖:

mapreduce原理完全剖析與shuffle機制

過程分析:

從maptask開始,它會調用InputFormat元件(預設是TextInputFormat),該元件會接着調用元件RecordReader元件,該元件會讀取maptask需要處理的檔案資訊,以kv的形式傳回,傳回打kv對會傳到mapper(自定義mapper繼承mapper重寫map方法)的map方法中,接着執行完我們在map()中寫的邏輯後,執行代碼context.write(k,v)将kv寫到了輸出收集器(OutputCollector)中,而收集器又接着會将kv寫到環形緩沖區中(對應一塊緩存,裡面存儲的是數組,每個kv都寫到該數組中,當寫入的量達到容量的80%的時候,為了防止溢出會做清除工作,逐漸将前面的kv溢出一直往後推,kv則接着寫到前面騰出來的區域,形成了環狀),緩沖區中每溢出部分kv會調用spiller元件對溢出的kv進行分區和排序(自定義排序和分區都在前面的文章有介紹,比如排序需要元件實作WritableComparable接口,分區則需要自定義分區元件繼承Partitioner類重寫getPartition方法),溢出到檔案中。最後所有的檔案都會合并為一個檔案同時會重新進行排序(當然還有分區,不過分區是固定了在哪裡區都是一樣,圖中是2個分區partition0和partition1)形成maptask的最終結果檔案。

就假設如圖中所示,隻有兩台機器對應兩個map task,生成了2個最終檔案,每個檔案都是2個分區partition0和partition1(對應兩個reduce task)。生成了最終結果檔案後,将兩個最終檔案的各自partition0分區的kv下載下傳到reduce task0的本地磁盤工作目錄,接着将這兩個partition0分區合并成一個新檔案并且排序,排序結果如圖中所有,它會使用元件進行比較排序,使得相同key的kv連續排列。接着會以 k 疊代器(與該key對應的多個value)傳到Reducer(通常是自定義Reducer繼承Reducer,重寫reduce()方法)的reduce()方法的形參中,然後執行自己的邏輯,最後使用context.write(k,v)一次寫一行的寫到OutPutFormat中(預設是TextOutPutForamat),該元件會調用RecordWrite的write()方法接kv輸出到hdfs中(或者本地檔案)。另一個reduce task當然也是同樣的處理,處理的是兩個partition1。

注:溢出和合并的時候會調用Combiner,其他什麼時候自己也不清楚,由于圖中執行的是wordcount程式,可以使用Combiner每次都進行歸并,比如溢出的本來是a,1 a,1 a,1這樣是不是麻煩呢?合并檔案的時候也會麻煩占用空間效率低等問題,但是可以使用自定義Combiner改寫成a,3就友善多了。而自定義的reduce程式就滿足這種特性,是以我們在用戶端程式中指定Combiner類job.setCombinerClass(WordcountReducer.class),即可。但是還是慎用,因為很多時候一旦Combiner會影響實際結果的。下面也給了個Combiner的代碼

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int count = ;
        for (IntWritable value : values) {
            count += value.get();
        } 
        context.write(key, new IntWritable(count));
    }
}
           

2、shuffle機制

mapreduce中,map階段處理的資料如何傳遞給reduce階段,是mapreduce架構中最關鍵的一個流程,這個流程就叫shuffle;shuffle: 洗牌、發牌——(核心機制:資料分區,排序,緩存);具體來說:就是将maptask輸出的處理結果資料,分發給reducetask,并在分發的過程中,對資料按key進行了分區和排序;

shuffle是MR處理流程中的一個過程,它的每一個處理步驟是分散在各個map task和reduce task節點上完成的,整體來看,分為3個操作:

1、分區partition

2、Sort根據key排序

3、Combiner進行局部value的合并

流程

其實就是圖中的黃色區域标注出來的,解釋已經寫在了上面,下面按照步驟給出shuffle過程。

1、maptask收集我們的map()方法輸出的kv對,放到記憶體緩沖區中

2、從記憶體緩沖區不斷溢出本地磁盤檔案,可能會溢出多個檔案

3、多個溢出檔案會被合并成大的溢出檔案

4、在溢出過程中,及合并的過程中,都要調用partitoner進行分組和針對key進行排序

5、reducetask根據自己的分區号,去各個maptask機器上取相應的結果分區資料

6、reducetask會取到同一個分區的來自不同maptask的結果檔案,reducetask會将這些檔案再進行合并(歸并排序)

合并成大檔案後,shuffle的過程也就結束了,後面進入reducetask的邏輯運算過程(從檔案中取出一個一個的鍵值對group,調用使用者自定義的reduce()方法)

3、大量小檔案的優化政策

問題:預設情況下,TextInputFormat(兩個功能一個是讀資料一個是切片)對任務進行切片機制是根據檔案規劃切片(比如hdfs預設block是128M那麼切片也是128M),不管檔案多小,都會是一個單獨的切片,都會交給一個map task這樣,如果有大量小檔案,就會産生大量的map task,導緻處理效率及其低下。

優化:

1、最好是在執行程式之前将多個小檔案自己先合成個大檔案再交給map task去處理。

2、如果已經是很多小檔案存在于hdfs中了,那麼可以使用另外一種InputFormat(CombinerInputFormat)來進行切片,它的切片邏輯跟FileInputFormat不同,它會将多個小檔案從邏輯上規劃到一個切片中,這樣就能将多個小檔案交給一個maptask去處理了。

繼續閱讀