天天看點

2,MapReduce原理及源碼解讀

MapReduce原理及源碼解讀

目錄

  • MapReduce原理及源碼解讀
    • 一、分片
      • 靈魂拷問:為什麼要分片?
      • 1.1 對誰分片
      • 1.2 長度是否為0
      • 1.3 是否可以分片
      • 1.4 分片的大小
      • 1.5 開始分片
      • 1.6 分片後讀取會不會斷行
    • 二、Map階段
      • 2.1 執行個體化Mapper
      • 2.2 調用map()方法
    • 三、Shuffle階段
      • 靈魂拷問:哪來的Shuffle?
      • 3.1 shuffle的概念
      • 3.2 Map端Shuffle
      • 3.2.1 分區(partition)
      • 3.2.2 寫入環形緩沖區
      • 3.2.3 排序并溢寫(sortAndSpill):
      • 3.2.4 合并(merge):
      • 3.3 Reduce端Shuffle
      • 3.3.1 拉取(Copy)
      • 3.3.2 排序合并(Merge Sort)
      • 3.3.3 歸并分組(reduce)
    • 四、Reduce階段
      • 4.1 執行reduce()方法
      • 4.2 輸出最終結果

一、分片

靈魂拷問:為什麼要分片?

  • 分而治之:MapReduce(MR)的核心思想就是分而治之;何時分,如何分就要從原理和源碼來入手。做為碼農大家都知道,不管一個程式多麼複雜,在寫代碼和學習代碼之前最重要的就是搞懂輸入和輸出,而MR的輸入其實就是一個目錄。而所謂的分而治之其實也是在把大檔案分成小檔案,然後一個機器處理一個小檔案,最後再合并。是以MR的第一步就是對輸入的檔案進行分片。

1.1 對誰分片

  • 對每個檔案分片:分片是對輸入目錄中的每一個檔案進行分片。後面的分片都是針對單個檔案分片。
  • 源碼解讀(對誰分片):
// 分片的源碼位置
package org.apache.hadoop.mapreduce.lib.input;
abstract class FileInputFormat.java;

// 下面代碼所在方法
method getSplits();

// InputStatus表示一個切片類
List<InputSplit> splits = new ArrayList<InputSplit>();
// 得到所有輸入檔案
List<FileStatus> files = listStatus(job);
// 周遊每個檔案。 根據每個檔案來切片,而不是整個檔案夾
for (FileStatus file : files) {
      // 分片1
}
           

1.2 長度是否為0

  • 檔案長度:當檔案長度不為0時才會進行下面的分片操作;如果檔案長度為0,則會向分片清單中添加一個空的hosts檔案數組和空長度的檔案。也就是說,空檔案也會建立一個空的分片。
  • 源碼解讀(長度是否為0):
for (FileStatus file : files) {
       Path path = file.getPath();
       // 擷取檔案大小
       long length = file.getLen();
       if (length != 0) {
              // 分片2
        } else {// 如果文大小為空,預設就建立一個空的hosts檔案數組和空長度的檔案
           //Create empty hosts array for zero length files
              splits.add(makeSplit(path, 0, length, new String[0]));
        }
}
           

1.3 是否可以分片

  • 壓縮格式:并不是所有的檔案都可以分片,有一些壓縮格式的檔案是不可以分片的。是以隻會對可以分片的檔案進行分片,而不可以分片的檔案即使再大也會作為一個整體來處理,相當于一個片。
  • 源碼解讀(是否可以分片):
// 如果可以分片
if (isSplitable(job, path)) {
    // 分片3
} else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
        blkLocations[0].getCachedHosts()));
}

// 判斷一個檔案是否可以切片
// FileInputFormat抽象類中預設傳回true,子類TextInputFormat中實作如下
@Override
protected boolean isSplitable(JobContext context, Path file) {
     final CompressionCodec codec =
           new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
     if (null == codec) {// 如果一個檔案的壓縮編碼為null,那麼表示可以切片
           return true;
     }// 如果一個檔案的壓縮編碼是SplittableCompressionCodec的子類,那麼表示目前檔案也可以切片
     return codec instanceof SplittableCompressionCodec;
}
           

1.4 分片的大小

  • 分片大小:分片太大就失去了分片的意義;如果分片很小,則管理和建構map任務的時間就會增多,效率變低。并且如果分片跨越兩個資料塊,那麼分片的部分資料需要通過網絡傳輸到map任務運作的節點上,效率會更低。是以分片的最佳大小應該和HDFS的分塊大小一緻。Hadoop2預設128M。
  • 源碼解讀(分片大小):
// FormatMinSplitSize是 1, MinSplitSize如果沒配置預設是 1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// 如果沒配置,則預設是 Long類型的最大值
long maxSize = getMaxSplitSize(job);
// 塊大小,Hadoop2是128M,本地模式為32M
long blockSize = file.getBlockSize();
// 分片大小計算公式。預設就是blockSize的大小
long splitSize=Math.max(minSize, Math.min(maxSize, blockSize));

           
  • 自定義分片大小:由上面的公式可知,預設的分片大小就是blockSize的大小。如果要自定義大于blockSize,比如改為200M,就把minSize改為200;小于blockSize,比如20M,就把maxSize改為20
  • 1.1倍:最常見的問題就是:一個大小為130M的檔案,在分片大小為128M的叢集上會分成幾片?答案是1片;因為 128*1.1>130,準确來說應該是130 / 128 < 1.1 (源碼的公式)。也就是說,如果剩下的檔案大小在分片大小的1.1倍以内,就不會再分片了。要這個1.1倍,是為了優化性能;試想如果不這樣,當還剩下130M大小的時候,就會分成一塊128M,一塊2M,後面還要為這個2M的塊單獨開一個map任務,不劃算。至于為什麼是1.1,這個1.1是專家們通過反複試驗得出來的結果。
  • 源碼解讀(1.1倍):
// 當剩餘檔案的大小,大于分片大小的1.1倍時,才會分片
private static final double SPLIT_SLOP = 1.1;   // 10% slop
// bytesRemaining為檔案剩餘大小,splitSize為上面計算出的分片大小
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
        // 分片4
}
           

1.5 開始分片

  • 終于分片了:經過上面的層層條件,下面就是// 分片4中的分片代碼。與HDFS的實體分塊不同的是,MapReduce的分片隻是邏輯上的分片,即按照偏移量分片。
// 封裝一個分片資訊(包含檔案的路徑,分片的起始偏移量,要處理的大小,分片包含的塊的資訊,分片中包含的塊存在哪兒些機器上)
int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
// makeSplit進行切片操作,傳回值是一個切片,并且加入到切片清單中
splits.add(makeSplit(path, length - bytesRemaining, splitSize,
               blkLocations[blkIndex].getHosts(),
               blkLocations[blkIndex].getCachedHosts()));
// 剩餘檔案大小
bytesRemaining -= splitSize;
           

1.6 分片後讀取會不會斷行

  • 不會:由于分片時是按照長度進行分片的,那就有很大可能會把一行資料分在兩個片裡面,是以分片的時候确實會斷行。如果讀取并處理斷行的資料,就會導緻結果不正确,那是肯定不行的。是以LineRecordReader類就充當了讀取記錄的角色,保證讀取不斷行;其中nextKeyValue()方法裡是真正給Mapper中的key指派的地方,并且調用了父類LineReader類中的readLine()方法來給value指派。
  • 源碼解讀(讀取時不斷行):
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        // 行分隔符
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
         // 傳回LineRecordReader對象
        return new LineRecordReader(recordDelimiterBytes);
    }
}

// 行記錄讀取類,提供讀取片中資料的功能,并且保證不斷行
public class LineRecordReader extends RecordReader<LongWritable, Text> {
    // ......其他代碼
    
    public void initialize(InputSplit genericSplit,
                           TaskAttemptContext context) throws IOException {
        // ......
        
        // 如果不是第一個分片,則開始位置退到下一行記錄的開始位置
        // 因為為了保證讀取時不斷行,每個塊都會向後多讀一行(最後一個除外)
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
    }
    
    public boolean nextKeyValue() throws IOException {
        // 給Mapper中輸入的key指派
        key.set(pos);
        // 執行個體化Mapper中輸入的value
        if (value == null) {
            value = new Text();
        }
        // 注意是<=end,在等于end時還會執行一次,多讀了一行,是以不會斷行
        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
            if (pos == 0) {
                newSize = skipUtfByteOrderMark();
            } else {
                // 給Mapper中輸入的value指派。
                // readLine方法會根據是否自定義行分隔符來調用不同的方法。
                newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
                pos += newSize;
            }
        }
    }
}
           

二、Map階段

2.1 執行個體化Mapper

  • 各種執行個體化:上面費了很大的勁來編寫分片TextInputFormat,和讀取類LineRecordReader;而這一切都是為了把輸入資料很好的傳給map()方法來運算,是以首先就要執行個體化我們自定義的Mapper類。
  • 源碼解讀(各種執行個體化):
package org.apache.hadoop.mapred;
class MapTask.java;

method runNewMapper();

// 通過反射來擷取Mapper。在Job中設定的Mapper,也就是自己定義的繼承自Mapper的類
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
  (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
    ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// 通過反射來得到 InputFormat。預設是TextInputFormat
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
  (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
    ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// 獲得目前MapTask要處理的split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
    splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
// 根據InputFormat對象建立RecordReader對象。預設是LineRecordReader
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
  new NewTrackingRecordReader<INKEY,INVALUE>
    (split, inputFormat, reporter, taskContext);

// 初始化。用來打開檔案,并且調整檔案的頭指針
input.initialize(split, mapperContext);
// MapTask中調用Mapper的run()方法
mapper.run(mapperContext);
           

2.2 調用map()方法

  • 每行資料調用一次:從上面的代碼中我們知道,MapTask中會調用Mapper類的run()方法;而run()方法會在while循環中調用map()方法,由退出條件可知,是每一行資料調用一次map()方法。
  • 源碼解讀(怎麼調用map()方法):
public void run(Context context) throws IOException, InterruptedException {
    // 在所有map執行之前初始化,也可以根據業務需要來重寫此方法
    setup(context); 
    try {
        // context.nextKeyValue()其實就是LineRecordReader中的nextKeyValue()方法;
        // 在run方法中周遊所有的key,每行資料都執行一次自定義map方法;
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        // 父類Mapper中的setup()和cleanup()方法中什麼都沒做;
        // 隻執行一次,可以根據業務需要來重寫此方法;
        cleanup(context);
    }
}
           

三、Shuffle階段

靈魂拷問:哪來的Shuffle?

  • 理論與實作:看過源碼的都知道,其實源碼中根本就沒有什麼shuffle;shuffle隻是一個過程,确切的來說是連貫Map階段和reduce階段的一個理論過程,而它的實作主要在MapTask和ReduceTask類中。shuffle階段可以說是MapReduce中最核心的一個階段。

3.1 shuffle的概念

  • 作用:shuffle這個單詞的本意是洗牌、打亂的意思,而在這裡則是:将map端的無規則輸出按照指定的規則“打亂”成具有一定規則的資料,以便reduce端接收和處理。
  • 流程:shuffle的範圍是map輸出後到reduce輸入前。它的流程主要包括Map端shuffle和reduce端shuffle。
  • MapReduce大緻流程:
2,MapReduce原理及源碼解讀

3.2 Map端Shuffle

  • 作用:Map端的shuffle過程是對Map的結果進行分區、排序、溢寫、合并分區,最後寫入磁盤;最終會得到一個分區有序的檔案,即先按分區排序,再按key排序。
  • Map端shuffle大緻流程:
2,MapReduce原理及源碼解讀

3.2.1 分區(partition)

  • 概念:對于map的每一個輸出的鍵值對,都會根據key來生成partition再一起寫入環形緩沖區。每一個reduceTask會處理一個partition(第0個reduceTask處理partition為0的分區,以此類推)。
  • 如何分區:預設情況下,分區号是key的hash值對numReduceTask數量取模的結果。也可以自定義分區。
  • 源碼解讀(如何分區):
// 當設定的reduceTask數大于實際分區數時,可以正常執行,多出的分區為空檔案;
// 當設定的reduceTask數小于實際分區數時,會報錯。
job.setNumReduceTasks(4);
// 如果設定的 numReduceTasks大于 1,而又沒有設定自定義的 PartitionerClass
// 則會調用系統預設的 HashPartitioner實作類來計算分區。
job.setPartitionerClass(WordCountPartitioner.class);
           
// 自定義分區
public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
    private static HashMap<String, Integer> map = new HashMap<>();
    static {
        map.put("0734", 0);
        map.put("0561", 1);
        map.put("0428", 2);
    }

    // 當 Mapper的輸出要寫入環形緩沖區時,會調用此方法來計算目前<K,V>的分區号
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        String strText = text.toString();
        return map.getOrDefault(strText.substring(0, 4), 3);
    }
}
           
// MapTask.java$NewOutputCollector
public void write(K key, V value) throws IOException, InterruptedException {
      // 把 K,V以及分區号寫入環形緩沖區
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions)); 
}
           

3.2.2 寫入環形緩沖區

2,MapReduce原理及源碼解讀
  • 概念:環形緩沖區是在記憶體中的一個位元組數組kvbuffer。kvbuffer不僅存放map輸出的<k, v>,還在另一端存放了<k, v>的索引(中繼資料) kvmeta,每個kvmeta包括value的起始位置、key的起始位置、partition值、value的長度,占用4個int長度。上圖中的bufindex和kvindex分别表示kvbuffer和kvmeta的指針。環形緩沖區的預設大小是100M,當寫入資料大小超過80%(80M)就會觸發Spill,溢寫到磁盤。
  • 源碼解讀(Spill):
// SpillThread線程在MapTask$MapOutputBuffer類中初始化,在init()方法中啟動。
// 它會一直監視環形緩沖區,當大小超過80%的時候,就會調用sortAndSpill()方法。
protected class SpillThread extends Thread {
    @Override
      public void run() {
            // ....
            // run方法中調用排序并溢寫方法
          while (true) {
              // ....
              sortAndSpill();
          }
            //.... 
      }
}
           

3.2.3 排序并溢寫(sortAndSpill):

  • 排序:觸發溢寫後,會先排序,再溢寫。排序是根據partition和key的升序排序,移動的隻是索引資料,排序的結果是将kvmeta中到的資料按照partition聚合在一起,同一個partition内再根據key排序。
2,MapReduce原理及源碼解讀
  • 溢寫:Spill線程根據排序後的kvmeta檔案,将一個個partition輸出到檔案,在這次溢寫過程中,會将環形緩沖區中已計算的資料(80M)寫入到一個檔案spill.out,是以引入了索引檔案spill.index,它記錄了partition在spill.out中的位置。

3.2.4 合并(merge):

2,MapReduce原理及源碼解讀
  • 概念:如果Map的資料很大,那麼就會觸發多次Spill,spill.out和spill.index檔案也會很多。是以最後就要把這些檔案合并,友善Reduce讀取。
  • 合并過程:合并過程中,首先會根據spill.index檔案,将spill.out檔案中的partition使用歸并排序分别寫入到相應的segment中,然後再把所有的segment寫入到一個file.out檔案中,并用file.out.index來記錄partition的索引。由于合并時可能有相同的key,是以如果設定了combine,那麼在寫入檔案之前還會調用自定義的combine方法。

3.3 Reduce端Shuffle

2,MapReduce原理及源碼解讀

3.3.1 拉取(Copy)

  • 前期工作:Reduce任務會通過HTTP向各個Map任務拉取它所需的partition資料。當Map任務成功完成之後會通知 TaskTracker狀态已跟新,TaskTracker進而通知JobTracker(都是通過心跳機制實作),是以JobTracker中記錄了Map輸出和TaskTracker的映射關系。
  • 何時拉取:Reduce會定期向JobTracker擷取Map的輸出位置,一旦拿到輸出位置,Reduce任務就會立即從此輸出對應的TaskTracker上複制相應的partition資料到本地,而不是等到所有Map任務結束。

3.3.2 排序合并(Merge Sort)

  • 合并:copy過來的資料會先放入記憶體緩沖區中(大小是 JVM的heap size的70%),如果緩沖區放得下就直接把資料寫入記憶體,即記憶體到記憶體merge。如果緩沖區中的Map資料達到一定大小(緩沖區的66%)的時候,就會開啟記憶體merge,并将merge後的資料寫入磁盤,即記憶體到磁盤merge。當屬于該Reduce任務的map輸出全部拉取完成,則會在reduce任務的磁盤上生成多個檔案(如果所有map輸出的大小沒有超過緩沖區大小,則資料隻存在于記憶體中),這時開始最後的合并操作,即磁盤到磁盤merge。如果設定了combine,合并時也會執行。
  • 排序:由于map輸出的資料已經是有序的,是以reduce在合并時的排序是歸并排序,并且reduce端的copy和sort是同時進行的,最終會得到一個整體有序的資料。

3.3.3 歸并分組(reduce)

  • 歸并分組(reduce):當reduce任務執行完拉取和排序合并後,就會對相同的key進行分組。預設情況下是根據key對象中重寫的compareTo()方法來分組,如果設定了GroupingComparator,則會調用它的compare()方法來分組。reduce會把compareTo(或compare)方法計算傳回為 0 的key分為一組,最終會得到一個組<key, Iterable<value,>>,其中組的key是這一組的第一個資料的key,Iterable<value,>則是相同key的value疊代器。最後再對每一個組調用Reducer的reduce()方法。
  • 源碼解讀(分組):
// org.apache.hadoop.mapreduce.Reducer中的run()方法
while (context.nextKey()) {
     // 調用自定義 reduce方法
     reduce(context.getCurrentKey(), context.getValues(), context);
     // .....
}

// org.apache.hadoop.mapreduce.task.ReduceContextImpl中的方法
public boolean nextKey() throws IOException,InterruptedException {
    // 如果目前key與下一個key相同,則繼續往下走;
    // 這一步就是把相同的key放到一組, 他們的value放到一個疊代器中;當下一個key不同時再調用reduce方法
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        // 計數器
        inputKeyCounter.increment(1);
      }
      // 當nextKeyIsSame為false時,會再調用一次nextKeyValue(),而它的傳回值必為true;
      return nextKeyValue();
    } else {
      return false;
    }
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (hasMore) {
      nextKey = input.getKey();
      // 在執行reduce方法之前調用ReduceContext中定義的GroupComparator
      // 如果key的compareTo方法傳回0則 nextKeyIsSame為true,也就會分到一組
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;
    } else {
      nextKeyIsSame = false;
    }
    inputValueCounter.increment(1);
    return true;
}
           

四、Reduce階段

4.1 執行reduce()方法

  • 歸并:上面的Shuffle階段已經将資料分組成了<key, Iteralble<value,>>格式的資料,是以對于相同的key隻會調用一次reduce()方法。
  • 注意事項:在reduce()方法中,一定要重新建立key對象,不要直接使用參數中的key。

4.2 輸出最終結果

  • 完結:整個MapReduce的輸出和輸入有點類似。輸出是執行個體化TextOutputFormat和LineRecordWrite對象。并由LineRecordWrite判斷是不是NullWriteable,最後輸出到檔案