天天看點

MapReduce源碼分析之MapTask分析(二)

SpillThread分析

為什麼需要Spill

         記憶體大小總是有效,是以在Mapper在處理過程中,資料持續輸出到記憶體中時,必然需要有機制能将記憶體中的資料換出,合理的刷出到磁盤上。SpillThread就是用來完成這部分工作。

         SpillThread的線程處理函數隻是做一層封裝,當索引表中的kvstart和kvend指向一樣的索引位置時,會持續處于等待過程,等待外部通知需要觸發spill動作,當有spill請求時,會觸發StartSpill來喚醒SpillThread線程,進入到sortAndSpill。

    下面就是SpillThread線程體函數。

protected class SpillThread extends Thread {
      @Override
      public void run() {
        spillLock.lock();
        spillThreadRunning = true;
        try {
          while (true) {
            spillDone.signal();
            while (kvstart == kvend) {
			// 等待被喚醒
              spillReady.await();
            }
            try {
              spillLock.unlock();
			// spill處理
              sortAndSpill();
            } catch (...) {
			...
            } finally {
              spillLock.lock();
			// 重置索引區,更新buf緩沖區的尾部位置資訊
              if (bufend < bufindex && bufindex < bufstart) {
                bufvoid = kvbuffer.length;
              }
              kvstart = kvend;
              bufstart = bufend;
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          spillLock.unlock();
          spillThreadRunning = false;
        }
      }
    }
           

線程函數内的處理邏輯比較簡單,主要分為三個步驟:

1.等待喚醒

2.對記憶體中的資料進行排序并将資料溢出寫入到磁盤,這部分内部分析見下文。

3.重置索引區和緩存區的end标記

sortAndSpill

記憶體資料的溢出處理是有此函數進行封裝,下面我們将該函數按塊進行詳細分析。

private void sortAndSpill() throws IOException, ClassNotFoundException,
                                       InterruptedException {
      //approximate the length of the output file to be the length of the
      //buffer + header lengths for the partitions
      long size = (bufend >= bufstart
          ? bufend - bufstart
          : (bufvoid - bufend) + bufstart) +
                  partitions * APPROX_HEADER_LENGTH;
      FSDataOutputStream out = null;
      try {
		// part1
        // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
        final Path filename =
            mapOutputFile.getSpillFileForWrite(numSpills, size);
        out = rfs.create(filename);
		
		// part2
        final int endPosition = (kvend > kvstart)
          ? kvend
          : kvoffsets.length + kvend;
        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
	
		
        int spindex = kvstart;
        IndexRecord rec = new IndexRecord();
        InMemValBytes value = new InMemValBytes();
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          try {
			// part3
            long segmentStart = out.getPos();
            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                      spilledRecordsCounter);
			// part4
            if (combinerRunner == null) {
              // spill directly
              DataInputBuffer key = new DataInputBuffer();
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                final int kvoff = kvoffsets[spindex % kvoffsets.length];
                getVBytesForOffset(kvoff, value);
                key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                          (kvindices[kvoff + VALSTART] - 
                           kvindices[kvoff + KEYSTART]));
                writer.append(key, value);
                ++spindex;
              }
            } else {
			// part5
              int spstart = spindex;
              while (spindex < endPosition &&
                  kvindices[kvoffsets[spindex % kvoffsets.length]
                            + PARTITION] == i) {
                ++spindex;
              }
              // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
            }
		
			// part6
            // close the writer
            writer.close();
			
            // record offsets
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            spillRec.putIndex(rec, i);

            writer = null;
          } finally {
            if (null != writer) writer.close();
          }
        }
		
		// part7
        if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
          // create spill index file
          Path indexFilename =
              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
          spillRec.writeToFile(indexFilename, job);
        } else {
          indexCacheList.add(spillRec);
          totalIndexCacheMemory +=
            spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
        }
        LOG.info("Finished spill " + numSpills);
        ++numSpills;
      } finally {
        if (out != null) out.close();
      }
    }
           

part1:建立SpillRecord,建立檔案流

    SpillRecord是一個記錄集,用于記錄分區在資料檔案中的檔案起始位置,原始長度,壓縮後的長度資訊。

    SpillRecord的成員隻有兩個。一個是buf,長度為分區個數*每條分區索引資訊占用的長度,另一個是為記錄友善轉換成的LogBuffer。

    每條分區索引資訊占用的長度由MAP_OUTPUT_INDEX_RECORD_LENGTH來表示,占用24個位元組,即3個Long。

public SpillRecord(int numPartitions) {
    buf = ByteBuffer.allocate(
        numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
    entries = buf.asLongBuffer();
  }
           

    建立檔案流,檔案的路徑是根據numspill來産生的,第一個溢出的檔案就是spill0.out,以此類推後續的溢出的檔案就是spill0.out,spill1.out ...

    在每次滿足溢出的時候,都會産生一個溢出的檔案,這些溢出的檔案最後會在處理完Mapper在最後的flush階段觸發merge動作,将所有溢出的檔案進行合并為一個檔案。

part2:資料排序

    擷取溢出的處理的索引區間的尾部位置,這個索引區間是有kvstart,kvend所辨別出來,kvstart記錄了索引區開始使用的起始位置,kvend記錄了索引區使用的結束位置。這一段索引區所指向的資料緩沖區就是需要被處理刷入到檔案的。在上文,我們提到了因為是循環緩沖區,索引在沒有到緩沖區尾部時是kvstart<kvend,當kvend走到尾循環回來,kvstart>kvend。

在排序時,為處理簡單,指定出一個統一的區間,使用endpostion表示出尾部位置。當kvend在前,endposition為kvoffsets的長度+kvend。

    MapReduce的核心是對資料排序,在MapTask需要對每次溢出的資料按分區進行排序,保證分區内的資料是有序的,分區從小到大遞增。排序的工作是由sorter完成,排序在記憶體中排列完成。

    sorter是一個IndexedSorter類型,在MapOutputBuffer初始化時從conf中擷取map.sort.class所指定的sort類,預設是使用QuickSort。我們截取部分排序函數的部分代碼,來分析排序過程。

private static void sortInternal(final IndexedSortable s, int p, int r,
      final Progressable rep, int depth) {
    if (null != rep) {
      rep.progress();
    }
    while (true) {
    if (r-p < 13) {
	//p為其實位置,r為結束位置,s為MapOutputBuffer
      for (int i = p; i < r; ++i) {
        for (int j = i; j > p && s.compare(j-1, j) > 0; --j) {
          s.swap(j, j-1);
        }
      }
      return;
    }
	....
}
           

    sort的關鍵兩步就是key之間比較,和交換。compare使用的和swap調用的都是MapOutputBuffer中的兩個函數,先看compare函數,comapre傳入的是兩個kvoffsets索引區的兩個index,因為endposition有可能是大于kevoffsets的長度,是以在取真實index的時候,需要對kvoffsets的長度進行取餘。比較會先取出kvoffsets中的值,再通過該值定位到k,v在二級索引區kvindices中記錄的k,v所屬的分區,在kvbuffer的位置,長度。排序優先級為,低分區->高分區,分區一樣則根據key排序。

    當符合條件,使用swap函數,交換kvoffsets中記錄kvindices的索引值,是以排序的開銷很小,不需要每次移動key,僅通過kvoffsets就完成比較排序。

public int compare(int i, int j) {
      final int ii = kvoffsets[i % kvoffsets.length];
      final int ij = kvoffsets[j % kvoffsets.length];
      // sort by partition
      if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
        return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
      }
      // sort by key
      return comparator.compare(kvbuffer,
          kvindices[ii + KEYSTART],
          kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
          kvbuffer,
          kvindices[ij + KEYSTART],
          kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
    }
	
    public void swap(int i, int j) {
      i %= kvoffsets.length;
      j %= kvoffsets.length;
      int tmp = kvoffsets[i];
      kvoffsets[i] = kvoffsets[j];
      kvoffsets[j] = tmp;
    }
           
MapReduce源碼分析之MapTask分析(二)

上圖就是排序前後的變化過程。排序前kvbuffer中有key為字元串,value為int值。

第一個item的key為"ba",第二個item的key為"aa",所屬分區都為分區1,按照字典序排序"aa","ba"。排序後,二級索引kvindices和kvbuffer都沒有變動,隻是在一級索引kvoffsets中交換指向,在kvoffsets[0]=1指向"aa",kvoffsets[1]=0指向"ba"。

part3: IFile資料格式

    IFile是一種存儲格式,用于表示MapTask在處理資料溢出到磁盤檔案,資料在磁盤檔案中以什麼形式組織。存儲形式為如下

KeyLength valueLength key Value
KeyLength valueLength key Value
EOF_MARKER EOF_MARKER 4位元組CRC

每個key,value輸出到檔案中,都會以上述keylength,valuelength,key,value的形式逐個排列,在close時,會輸出兩個标記,非别是key,value長度的标記,标記為-1表示key,value輸出結束,在尾部會後一個針對整個檔案的crc校驗碼。

IFile類内有兩個子類,分别是Reader,Writer用于讀取和寫入IFile檔案。 

Writer的内部成員:

//用于io操作的輸出流,基于checksum的流産生
    FSDataOutputStream out;
	//記錄原始的輸出流,也就是第一部分中産生的檔案流
    FSDataOutputStream rawOut;
	//基于檔案流産生的checksum輸出流,特點是write時内部會做crc
    IFileOutputStream checksumOut;

	//key,value的序列化,和"核心成員變量中的key,value序列化類一樣的功能"
    Class<K> keyClass;
    Class<V> valueClass;
    Serializer<K> keySerializer;
    Serializer<V> valueSerializer;

    public Writer(Configuration conf, FSDataOutputStream out, 
        Class<K> keyClass, Class<V> valueClass,
        CompressionCodec codec, Counters.Counter writesCounter)
        throws IOException {
	 //根據檔案流封了一層可以在輸出時做crc
      this.checksumOut = new IFileOutputStream(out);
      this.rawOut = out;
      this.start = this.rawOut.getPos();
      
      if (codec != null) {
		...
      } else {
		//writer内部用于io輸出的流是基于checksumOut産生的。
        this.out = new FSDataOutputStream(checksumOut,null);
      }
      
	// key,value序列化類,是輸出key,value到buffer中,真正寫的時候從buffer中取出
      this.keyClass = keyClass;
      this.valueClass = valueClass;
      SerializationFactory serializationFactory = new SerializationFactory(conf);
      this.keySerializer = serializationFactory.getSerializer(keyClass);
      this.keySerializer.open(buffer);
      this.valueSerializer = serializationFactory.getSerializer(valueClass);
      this.valueSerializer.open(buffer);
    }
           

part4:無Combiner處理

         當使用者沒有指定commbiner,就不需要做combiner處理,可以通過IFile.Writer直接将已排序好的資料逐個按分區輸出到磁盤檔案。區分是否是同一個分區的資料,是根據目前spindex所指向的一級索引kvoffsets所辨別的資料是否屬于目前分區号,如果是同一個分區,就使用writer進行輸出,否則切換到處理下一個分區。

         這裡有一點需要注意的是,二級索引kvindices中每一項(分區号,keyOffset,valOffset)辨別一對key,value,key的長度可以根據valOffset-keyOffset擷取到key的長度,而value的長度需要通過先取得kvindices中的下一項,通過下一個項中的key的偏移-目前的val的偏移擷取到val的長度。這部分的代碼會封裝在getVBytesForOffset

         writer的輸出比較簡單,輸出key,value之前,先輸出key長度,value長度。

public void append(DataInputBuffer key, DataInputBuffer value)
    throws IOException {
      int keyLength = key.getLength() - key.getPosition();
      if (keyLength < 0) {
        throw new IOException("Negative key-length not allowed: " + keyLength + 
                              " for " + key);
      }
      
      int valueLength = value.getLength() - value.getPosition();
      if (valueLength < 0) {
        throw new IOException("Negative value-length not allowed: " + 
                              valueLength + " for " + value);
      }

      WritableUtils.writeVInt(out, keyLength);
      WritableUtils.writeVInt(out, valueLength);
      out.write(key.getData(), key.getPosition(), keyLength); 
      out.write(value.getData(), value.getPosition(), valueLength); 

      // Update bytes written
      decompressedBytesWritten += keyLength + valueLength + 
                      WritableUtils.getVIntSize(keyLength) + 
                      WritableUtils.getVIntSize(valueLength);
      ++numRecordsWritten;
    }
           

part5: Combiner處理

    如果使用者指定過Combiner,那麼處理和無Combiner有一點小差别。需要在輸出的時候,針對同一分區内的資料做一次過濾。同一分區的資料區間通過spstart,spindex辨別出來。

    combineCollector.setWriter(writer);這裡将IFile.Writer設定進去,在combiner進行中調用collect将會調用到CombineOutputCollector.collect,這一步就是和無Combiner一樣将資料輸出到IFile.Writer中。

public synchronized void collect(K key, V value)
        throws IOException {
      outCounter.increment(1);
      writer.append(key, value);
      if ((outCounter.getValue() % progressBar) == 0) {
        progressable.progress();
      }
    }
           

    combinerRunner.combine(kvIter, combineCollector);是如何執行的呢,這裡會因為combinerRunner的不同而不同,我們關注的是舊的MR處理,是以我們跟蹤到OldCombinerRunner.combiner,可以看到流程實際上非常簡單,整個疊代的過程是判斷是否還有資料沒有被處理掉,有則一直循環,依次調用reduce函數,每處理一次相同的key的資料後,通過nextKey切換到下一個不同的key再次重複。在使用者的reduce函數内,因為collector是CombineOutputCollector,是以使用者在collector.collect輸出key,value,實際上是輸出到IFile.Writer流中。

protected void combine(RawKeyValueIterator kvIter,
                           OutputCollector<K,V> combineCollector
                           ) throws IOException {
	//combiner是一個Reduer
      Reducer<K,V,K,V> combiner = 
        ReflectionUtils.newInstance(combinerClass, job);
      try {
		//取得value的疊代器
        CombineValuesIterator<K,V> values = 
          new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, 
                                         valueClass, job, Reporter.NULL,
                                         inputCounter);
		//判斷依據是spstart是否走到spindex
        while (values.more()) {
          combiner.reduce(values.getKey(), values, combineCollector,
              Reporter.NULL);
		//跳過相同key直到讀取到下一個不相同的key
          values.nextKey();
        }
      } finally {
        combiner.close();
      }
    }
           

    Reducer的處理函數reduce大家都知道入參是key和一串value,這裡的一串value通過Iterator來表示。void reduce(K2 key,Iterator<V2> values, OutputCollector<K3, V3> output, Reporterreporter) throws IOException;

    那這裡的Iterator<V2>values是如何從kvoffsets中将已經排序過的,相鄰的相同的key的value放在一起的呢,這部分功能是有CombineValuesIterator的父類ValuesIterator來實作的,ValuesIterator的基類是Iterator,Iterator的接口hasNext和next都有實作。

    ValuesIterator有一個一直被調用到的方法,是readNextKey用來擷取下一個key并判斷是否後續還有資料(more辨別)以及是否還有相同的key(hasNext辨別)。

private void readNextKey() throws IOException {
      //根據spstart是否到達spindex指向的區間尾部
	  more = in.next();
      if (more) {
        DataInputBuffer nextKeyBytes = in.getKey();
        keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
            nextKeyBytes.getLength() - nextKeyBytes.getPosition());
		//将keyIn中的key反序列化到nextKey變量中
        nextKey = keyDeserializer.deserialize(nextKey);
        //判斷是否還有相同的key存在
        hasNext = key != null && (comparator.compare(key, nextKey) == 0);
      } else {
        hasNext = false;
      }
    }

    public boolean hasNext() { return hasNext; }

    private int ctr = 0;
	public VALUE next() {
      if (!hasNext) {
        throw new NoSuchElementException("iterate past last value");
      }
      try {
		//傳回相同的key的value,讀取下一個key,如果key相同,
		//下一次仍會進入到next函數中,讀取到相同key的value
        readNextValue();
        readNextKey();
      } catch (IOException ie) {
        throw new RuntimeException("problem advancing post rec#"+ctr, ie);
      }
      reporter.progress();
      return value;
    }
	
	//在每調用一次reduce處理完相同key所對應的一串value,
	//會通過nextKey函數取得下一個不同的key,重新進入到reduce函數。
    /** Start processing next unique key. */
    void nextKey() throws IOException {
     //讀取到下一個不同的key,實際上combiner的處理,是不會進入到while循環内
      while (hasNext) { 
        readNextKey();
      }
      ++ctr;
      
      // move the next key to the current one
      KEY tmpKey = key;
      key = nextKey;
      nextKey = tmpKey;
      hasNext = more;
    }
           

part6:關閉流,記錄索引資訊

         在輸出輸出完成後,會調用IFile.Writer的close函數,插入兩個EOF_MARKER,并寫入checksum.

public void close() throws IOException {

      // Close the serializers
      keySerializer.close();
      valueSerializer.close();

      // Write EOF_MARKER for key/value length
      WritableUtils.writeVInt(out, EOF_MARKER);
      WritableUtils.writeVInt(out, EOF_MARKER);
      decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
      
      //Flush the stream
      out.flush();
  
      if (compressOutput) {
        // Flush
        compressedOut.finish();
        compressedOut.resetState();
      }
      
      // Close the underlying stream iff we own it...
      if (ownOutputStream) {
        out.close();
      }
      else {
        //寫入checksum
        checksumOut.finish();
      }

      compressedBytesWritten = rawOut.getPos() - start;

      if (compressOutput) {
        // Return back the compressor
        CodecPool.returnCompressor(compressor);
        compressor = null;
      }

      out = null;
      if(writtenRecordsCounter != null) {
        writtenRecordsCounter.increment(numRecordsWritten);
      }
    }
           

    checkSum的寫入可以看到IFileOutputStream的finish函數,會從DataChecksum取出4個位元組的checksum值寫入到檔案尾部。

public void finish() throws IOException {
    if (finished) {
      return;
    }
    finished = true;
    sum.writeValue(barray, 0, false);
    out.write (barray, 0, sum.getChecksumSize());
    out.flush(); 
  }
           

    關閉掉輸出流後,會将目前分區在磁盤檔案中的起始位置,結束位置資訊記錄到索引資訊中。索引資訊在記憶體中是存放在SpillRecord中。

part7: IFile的索引檔案

    每個IFile資料檔案就有一個對應的索引檔案和它一一對應,這個索引檔案有可能在記憶體,也有可能在磁盤上真實存在的索引檔案。IFile檔案對應的的索引資訊會在滿足條件的情況下記憶體中緩存着,一個IFile對應的索引資訊封裝在SpillRecord中,這個索引資訊SpillRecord存儲在indexCacheList中,當索引的cache超過1M大小後,那麼會将後來産生的索引資訊輸出到磁盤上形成一個索引檔案。這個索引檔案的檔案名為"spill"+ spillNumber +".out.index",spillNumber就是:numSpills變量所記錄的目前進行到第幾次spill。

         以每個檔案使用者設定了兩個ReduceTask那麼paritition個數為2,那麼IFile的索引檔案在磁盤中的形式為:

索引對應的資料 索引檔案存儲内容
Spill0的partition0 startOffset rawLength partLength
Spill0的partition1 startOffset rawLength partLength
Spill1的partition0 startOffset rawLength partLength
Spill1的partition1 startOffset rawLength partLength
8位元組的crc

Merge

         sortAndSpill已經将記憶體中的資料寫成一個個IFile資料檔案,這些檔案最終會被合并為一個資料檔案以及該資料檔案對應的索引檔案。Merge這部分将會分析資料檔案是如何被merge成單個檔案。

         先回到runOldMapper中,在前面我們介紹過這部分代碼了,再次重新看看這部分。collector.flush将會觸發将MapOutputBuffer中的剩餘資料flush到磁盤上,并最終将已經存在磁盤上的資料檔案合并為一個檔案。

runOldMapper:
      runner.run(in, new OldOutputCollector(collector, conf), reporter);
      collector.flush();

	// MapOutputBuffer.flush
    public synchronized void flush() throws IOException, ClassNotFoundException,
                                            InterruptedException {
      LOG.info("Starting flush of map output");
      spillLock.lock();
      try {
		//等待正在進行中的spill動作完成
        while (kvstart != kvend) {
          reporter.progress();
          spillDone.await();
        }
        if (sortSpillException != null) {
          throw (IOException)new IOException("Spill failed"
              ).initCause(sortSpillException);
        }
		//緩存中剩餘的資料,需要觸發一次spill動作将剩餘資料spill到磁盤上
        if (kvend != kvindex) {
          kvend = kvindex;
          bufend = bufmark;
          sortAndSpill();
        }
      } catch (InterruptedException e) {
        throw (IOException)new IOException(
            "Buffer interrupted while waiting for the writer"
            ).initCause(e);
      } finally {
        spillLock.unlock();
      }
      assert !spillLock.isHeldByCurrentThread();
      // shut down spill thread and wait for it to exit. Since the preceding
      // ensures that it is finished with its work (and sortAndSpill did not
      // throw), we elect to use an interrupt instead of setting a flag.
      // Spilling simultaneously from this thread while the spill thread
      // finishes its work might be both a useful way to extend this and also
      // sufficient motivation for the latter approach.
      try {
        spillThread.interrupt();
        spillThread.join();
      } catch (InterruptedException e) {
        throw (IOException)new IOException("Spill failed"
            ).initCause(e);
      }
      // release sort buffer before the merge
      kvbuffer = null;
	//觸發将小的spill檔案合并為大的spill檔案。
      mergeParts();
      Path outputPath = mapOutputFile.getOutputFile();
      fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
    }

	merge的動作會有mergeParts函數觸發,先看該函數的實作:
    private void mergeParts() throws IOException, InterruptedException, 
                                     ClassNotFoundException {
      // get the approximate size of the final output/index files
      long finalOutFileSize = 0;
      long finalIndexFileSize = 0;
      final Path[] filename = new Path[numSpills];
      final TaskAttemptID mapId = getTaskID();
	//擷取磁盤上的所有spill檔案的檔案名
      for(int i = 0; i < numSpills; i++) {
        filename[i] = mapOutputFile.getSpillFile(i);
        finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
      }
	//隻有一個spill檔案,那麼隻需要将資料檔案和索引檔案rename即可
      if (numSpills == 1) { //the spill is the final output
        rfs.rename(filename[0],
            new Path(filename[0].getParent(), "file.out"));
        if (indexCacheList.size() == 0) {
          rfs.rename(mapOutputFile.getSpillIndexFile(0),
              new Path(filename[0].getParent(),"file.out.index"));
        } else {
          indexCacheList.get(0).writeToFile(
                new Path(filename[0].getParent(),"file.out.index"), job);
        }
        return;
      }
	
	
      // read in paged indices
	//加載因記憶體中緩存不下而刷出到磁盤上的索引檔案到記憶體中
      for (int i = indexCacheList.size(); i < numSpills; ++i) {
        Path indexFileName = mapOutputFile.getSpillIndexFile(i);
        indexCacheList.add(new SpillRecord(indexFileName, job, null));
      }

      //make correction in the length to include the sequence file header
      //lengths for each partition
      finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
      finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
	//擷取最終的spill檔案和index檔案的路徑名
      Path finalOutputFile =
          mapOutputFile.getOutputFileForWrite(finalOutFileSize);
      Path finalIndexFile =
          mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);

      //The output stream for the final single output file
      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
	
	//沒有觸發過spill動作,則形成一個空的spill檔案和index檔案。
      if (numSpills == 0) {
        //create dummy files
        IndexRecord rec = new IndexRecord();
        SpillRecord sr = new SpillRecord(partitions);
        try {
          for (int i = 0; i < partitions; i++) {
            long segmentStart = finalOut.getPos();
            Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
            writer.close();
            rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength();
            rec.partLength = writer.getCompressedLength();
            sr.putIndex(rec, i);
          }
          sr.writeToFile(finalIndexFile, job);
        } finally {
          finalOut.close();
        }
        return;
      }
      {
        IndexRecord rec = new IndexRecord();
        final SpillRecord spillRec = new SpillRecord(partitions);
        for (int parts = 0; parts < partitions; parts++) {
          //create the segments to be merged
		//抽取spill檔案中屬于該paritition的索引形成一個segment
		//所有屬于同一個分區的資訊性能一個segment清單
          List<Segment<K,V>> segmentList =
            new ArrayList<Segment<K, V>>(numSpills);
          for(int i = 0; i < numSpills; i++) {
            IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

            Segment<K,V> s =
              new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
                               indexRecord.partLength, codec, true);
            segmentList.add(i, s);

            if (LOG.isDebugEnabled()) {
              LOG.debug("MapId=" + mapId + " Reducer=" + parts +
                  "Spill =" + i + "(" + indexRecord.startOffset + "," +
                  indexRecord.rawLength + ", " + indexRecord.partLength + ")");
            }
          }
		
		//将屬于同一個分區的資料進行merge
          //merge
          @SuppressWarnings("unchecked")
          RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                         keyClass, valClass, codec,
                         segmentList, job.getInt("io.sort.factor", 100),
                         new Path(mapId.toString()),
                         job.getOutputKeyComparator(), reporter,
                         null, spilledRecordsCounter);

          //write merged output to disk
		//将merge後的資料寫入到最終的spill檔案中
          long segmentStart = finalOut.getPos();
          Writer<K, V> writer =
              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                               spilledRecordsCounter);
          if (combinerRunner == null || numSpills < minSpillsForCombine) {
            Merger.writeFile(kvIter, writer, reporter, job);
          } else {
            combineCollector.setWriter(writer);
            combinerRunner.combine(kvIter, combineCollector);
          }

          //close
          writer.close();

          // record offsets
		//記錄目前分區在spill.out檔案中的索引資訊
          rec.startOffset = segmentStart;
          rec.rawLength = writer.getRawLength();
          rec.partLength = writer.getCompressedLength();
          spillRec.putIndex(rec, parts);
        }
		//将索引資訊寫入到spill.index.out        		    
	    spillRec.writeToFile(finalIndexFile, job);
        finalOut.close();
		//将每次觸發spill而産生的spill檔案删除
        for(int i = 0; i < numSpills; i++) {
          rfs.delete(filename[i],true);
        }
      }
    }
           

mergeParts的處理流程主要分為幾個步驟:

1.擷取到磁盤上屬于這個Task的所有spill檔案名

2.整個MapTask運作過程中隻是觸發過一次spill動作,那麼隻需要做一下rename那mergeParts就算完成了。rename的過程是将spill0.out重命名為spill.out,将索引檔案spill0.out.index重命名為spill.out.index。如果spill0.out的索引檔案還在緩存中,則隻要将緩存的索引寫入到spill.out.index。

3.前面觸發産生的spill檔案的索引會緩存在cache中也就是在indexCacheList,因為cache大小有限,是以後面spill産生的索引資訊會落到索引檔案中,這裡需要加載因記憶體中緩存不下而刷出到磁盤上的索引檔案。

4.擷取最終的spill檔案的路徑名:spill.out和索引檔案的路徑名:spill.out.index,并建立spill.out檔案的輸出流。

5.如果傳遞給這個MapTask的一個空的InputSplit,那麼就沒有後續的merge動作,隻要在spill.out檔案中隻是輸出兩個end标記和一個4個位元組的crc,在spill.out.index中記錄下索引資訊。

6.首先,先介紹下兩個變量類型,IndexRecord和Segment。

IndexRecord:是記錄一分區的索引資訊,一個spill檔案的索引資訊是由n個partition的索引IndexRecord組成一個Spill檔案的索引SpillRecord。

Segment:類似IndexRecord,但是還多一些資訊,表示這個分區的索引是對應的那個spill檔案。

1)在這部分進行中,标明了磁盤上必然有有多個spill檔案,需要将這些spill檔案屬于同一個partition的索引資訊封裝在segment清單中。

2)Merge.merge需要根據segment 清單将不同spill檔案中同一個parition的資料進行merge。

3)在merge完成後,如果沒有指定combiner那麼直接通過IFile.Writer将資料寫入到檔案中,如果有則調用使用者指定的Combiner,對同一個key的資料進行過濾,combiner的處理在前面已經分析過了,不再累贅。

4)在IndexRecord中記錄合并後屬于這個partition的索引資訊,将該索引資訊記錄到SpillRecord中。

5)重複1)到4)直至對所有partition處理完畢。

7.将spill.out的索引檔案寫入到spill.out.index中。

8.删除spill檔案:spill0.out,...spilln.out,這裡有一點奇怪的是沒有删除spill檔案對應的索引檔案。我看到在hadoop2.4.0中也沒有删除,這個還不清楚是否故意而為之。

總結

         至此,整個詳細的MapTask的分析就此為完成了,在分析過程中我們知道了MapTask是如何使用循環緩存區管理資料,知道了資料在緩存不下是如何做spill處理的,spill輸出的資料格式,combiner如何處理,如何将多一個檔案merge為一個等等。也希望通過閱讀這部分源碼能學習到部分設計思路,能在未來的設計中提供多一種思路。