
The Circular Buffer: A Key Data Structure in the Hadoop Shuffle

This article grew out of a question a reader was asked in an interview: during the shuffle phase, Hadoop relies on a particular data structure, the circular (ring) buffer.

The ring queue is an extremely useful data structure in practice. It is a FIFO whose head and tail are logically joined, backed by the linear space of an array, so its data layout is simple. Whether the queue is full or empty can be determined instantly, and elements can be stored and fetched very quickly. Because it is so simple and efficient, ring queues are even implemented directly in hardware.

Ring queues are widely used for sending and receiving network data and for exchanging data between different programs, for example when the kernel and an application exchange large amounts of data, or when large volumes of data arrive from hardware.
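
As a refresher, the whole idea fits in a few lines of code. Below is a minimal, self-contained sketch of an array-backed ring FIFO (illustrative only, not Hadoop code): the head and tail indices wrap around with modulo arithmetic, and the full/empty checks are O(1).

// A minimal array-backed ring FIFO (illustrative sketch, not Hadoop code).
public class RingBuffer {
  private final byte[] buf;
  private int head = 0;   // next position to read
  private int tail = 0;   // next position to write
  private int size = 0;   // number of bytes currently stored

  public RingBuffer(int capacity) { buf = new byte[capacity]; }

  public boolean isEmpty() { return size == 0; }          // O(1) emptiness check
  public boolean isFull()  { return size == buf.length; } // O(1) fullness check

  // Write one byte; returns false if the buffer is full.
  public boolean put(byte b) {
    if (isFull()) return false;
    buf[tail] = b;
    tail = (tail + 1) % buf.length; // wrap around the end of the array
    size++;
    return true;
  }

  // Read one byte in FIFO order; throws if empty.
  public byte take() {
    if (isEmpty()) throw new IllegalStateException("empty");
    byte b = buf[head];
    head = (head + 1) % buf.length;
    size--;
    return b;
  }
}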

The circular buffer data structure

During the map phase, the circular buffer is the region of memory into which records are placed after being processed by the map function; that in-memory region is the circular buffer.

The circular buffer is defined in MapTask.MapOutputBuffer; the relevant fields are as follows:

// k/v accounting
// IntBuffer view holding the metadata; each entry is an int (4 bytes)
private IntBuffer kvmeta; // metadata overlay on backing store
int kvstart;            // marks origin of spill metadata
int kvend;              // marks end of spill metadata
int kvindex;            // marks end of fully serialized records
// boundary between the metadata and the key/value contents:
// both live in the same circular buffer, so they must be kept apart
int equator;            // marks origin of meta/serialization
int bufstart;           // marks beginning of spill
int bufend;             // marks beginning of collectable
int bufmark;            // marks end of record
int bufindex;           // marks end of collected
int bufvoid;            // marks the point where we should stop
                        // reading at the end of the buffer
// byte array holding the serialized key/value data, addressed in bytes
// (note the contrast with kvmeta, which is addressed in ints)
byte[] kvbuffer;        // main output buffer
private final byte[] b0 = new byte[0];

// offsets (within a meta entry, relative to kvindex) of where the value's
// and the key's positions in kvbuffer are stored
private static final int VALSTART = 0;         // val offset in acct
private static final int KEYSTART = 1;         // key offset in acct
// offset (within a meta entry, relative to kvindex) of the partition info
private static final int PARTITION = 2;        // partition offset in acct
private static final int VALLEN = 3;           // length of value
// number of ints one key/value pair's metadata occupies in kvmeta
private static final int NMETA = 4;            // num meta ints
// number of bytes one key/value pair's metadata occupies in kvmeta
private static final int METASIZE = NMETA * 4; // size in bytes

The circular buffer is in fact an array that holds both the serialized key/value data and the key/value metadata. Each key/value pair has one metadata entry, stored as ints: the entry consists of 4 ints, where the first holds the start position of the value, the second the start position of the key, the third the partition, and the last the length of the value.

The serialized key/value data and the metadata are separated in the buffer by the equator: key/value data is stored in the direction of increasing indices, while metadata is stored in the direction of decreasing indices. If the array is pictured as a ring, then, with the equator as the boundary, key/value data grows clockwise and metadata grows counter-clockwise.
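
To make the layout concrete, here is a small, self-contained sketch (illustrative, not Hadoop source) that writes one 4-int meta entry into an IntBuffer view of a byte array and reads it back using the field offsets defined above; the same accessor pattern (kvmeta.get(kvoff + KEYSTART), etc.) appears later in sortAndSpill.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;

// Illustrative sketch (not Hadoop source): how a 4-int meta entry is written
// and read back, using the same field offsets as MapOutputBuffer.
public class MetaEntryDemo {
  static final int VALSTART = 0, KEYSTART = 1, PARTITION = 2, VALLEN = 3;
  static final int NMETA = 4, METASIZE = NMETA * 4;

  public static void main(String[] args) {
    byte[] kvbuffer = new byte[1 << 10];                 // tiny stand-in for the real buffer
    IntBuffer kvmeta = ByteBuffer.wrap(kvbuffer)
        .order(ByteOrder.nativeOrder()).asIntBuffer();

    // Suppose a record was serialized with its key at byte 100 (8 bytes)
    // and its value at byte 108 (20 bytes), destined for partition 3.
    int kvoff = kvmeta.capacity() - NMETA;               // last meta slot, as after setEquator(0)
    kvmeta.put(kvoff + PARTITION, 3);
    kvmeta.put(kvoff + KEYSTART, 100);
    kvmeta.put(kvoff + VALSTART, 108);
    kvmeta.put(kvoff + VALLEN, 20);

    int keylen = kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART);
    System.out.println("partition=" + kvmeta.get(kvoff + PARTITION)
        + " keylen=" + keylen + " vallen=" + kvmeta.get(kvoff + VALLEN));
  }
}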

Initialization

The circular buffer is constructed in MapOutputBuffer.init.

public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
...
  //MAP_SORT_SPILL_PERCENT = mapreduce.map.sort.spill.percent
  // fraction of the map-side buffer at which spilling starts
  //sanity checks
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  //IO_SORT_MB = "mapreduce.task.io.sort.mb"
  // size of the map-side buffer
  // mapreduce.task.io.sort.mb * mapreduce.map.sort.spill.percent is ideally a multiple of 16
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  // threshold on the total in-memory size of all spill indexes
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  ...
  // the sort implementation; pluggable, here a modified QuickSort is used
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);
  // buffers and accounting
  // IO_SORT_MB is in MB; shifting left by 20 bits converts it to bytes
  int maxMemUsage = sortmb << 20;
  // METASIZE is the length of one metadata entry: 4 int fields
  // (VALSTART, KEYSTART, PARTITION, VALLEN) of 4 bytes each, so METASIZE
  // is 16. Round the buffer down to a multiple of METASIZE.
  maxMemUsage -= maxMemUsage % METASIZE;
  // the backing byte array (holds both serialized data and metadata)
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  // wrap kvbuffer as the int-addressed kvmeta view (1 int = 4 bytes)
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  // set the boundary between the serialized data and kvmeta
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;
  // maximum number of metadata entries kvmeta can hold
  maxRec = kvmeta.capacity() / NMETA;
  // the spill threshold of the buffer (not simply sortmb*spillper;
  // more precisely it is kvbuffer.length*spillper)
  softLimit = (int)(kvbuffer.length * spillper);
  // this variable matters: it is the running measure that triggers spills
  bufferRemaining = softLimit;
  ...
  // k/v serialization
  comparator = job.getOutputKeyComparator();
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  // use bb as the output that key serialization writes into
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  // use bb as the output that value serialization writes into
  valSerializer.open(bb);
  ...
  // combiner
  ...
  spillInProgress = false;
  // during the final merge, with a combiner present, the combiner only
  // runs if the number of spills exceeds this threshold
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}

init builds and initializes the circular buffer. mapreduce.task.io.sort.mb determines the size of the map-side buffer, sortmb, which defaults to 100 MB.

This buffer also holds the metadata, each meta entry occupying METASIZE (16 bytes). The usable size is maxMemUsage = sortmb << 20, rounded down by maxMemUsage -= maxMemUsage % METASIZE (so it is best if sortmb, converted to bytes, is a multiple of 16). maxMemUsage is then used to allocate the kvbuffer byte array and the kvmeta int view, after which the bookkeeping indices are set. setEquator(0) establishes the boundary between kvbuffer and kvmeta; at initialization the boundary is 0, and kvindex becomes ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4, i.e. its position on the ring is one METASIZE counter-clockwise from the equator. kvstart = kvend = kvindex are set from kvindex, bufstart = bufend = bufindex = equator are set from the equator, and bufvoid = kvbuffer.length, where bufvoid marks the furthest position usable for serialized data.
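
A worked example with the assumed defaults mapreduce.task.io.sort.mb = 100 and mapreduce.map.sort.spill.percent = 0.8 makes these numbers concrete:

// Worked example of the init() arithmetic, assuming the defaults
// sortmb = 100 and spillper = 0.8f.
public class InitMath {
  public static void main(String[] args) {
    final int METASIZE = 16;
    int sortmb = 100;
    float spillper = 0.8f;

    int maxMemUsage = sortmb << 20;            // 100 MB = 104,857,600 bytes
    maxMemUsage -= maxMemUsage % METASIZE;     // already a multiple of 16, unchanged
    int kvbufferLen = maxMemUsage;             // kvbuffer.length
    int kvmetaCap = kvbufferLen / 4;           // kvmeta.capacity() = 26,214,400 ints

    // setEquator(0): aligned = 0, kvindex sits one meta entry "behind" the equator
    int kvindex = (int) (((long) 0 - METASIZE + kvbufferLen) % kvbufferLen) / 4; // 26,214,396
    int maxRec = kvmetaCap / 4;                // 6,553,600 meta entries at most
    int softLimit = (int) (kvbufferLen * spillper); // 83,886,080 bytes

    System.out.println("kvbuffer.length = " + kvbufferLen);
    System.out.println("kvindex (ints)  = " + kvindex);
    System.out.println("maxRec          = " + maxRec);
    System.out.println("softLimit       = " + softLimit);
  }
}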

For efficiency, a spill is performed once buffer usage reaches a threshold, and that threshold is tracked through bufferRemaining, which is initialized by softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit;. Note that softLimit is not sortmb * spillper but kvbuffer.length * spillper; only when sortmb << 20 is a multiple of 16 are the two equal.

The code of setEquator is shown below.

// setEquator(0) looks like this
private void setEquator(int pos) {
  equator = pos;
  // set index prior to first entry, aligned at meta boundary
  // aligned is the end of the first entry, i.e. the boundary (in bytes)
  // between the metadata and the key/value data
  final int aligned = pos - (pos % METASIZE);
  // Cast one of the operands to long to avoid integer overflow
  // starting position (as an int index) for storing metadata
  kvindex = (int)
    (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
  LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
      "(" + (kvindex * 4) + ")");
}

The abstract structure of the buffer after initialization is shown in the figure below:

[Figure: the circular buffer data structure after initialization]

Writing into the buffer

The map task writes data into the buffer through NewOutputCollector.write, which calls collector.collect; before the write, NewOutputCollector.write has already computed the partition for each record. The sketch below shows where this path starts on the user side; collect itself follows.
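
For context, a record enters this path from the user's map function: each context.write(key, value) eventually reaches NewOutputCollector.write, which computes the record's partition and then calls MapOutputBuffer.collect. A minimal, purely illustrative Mapper showing the entry point:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Minimal illustrative Mapper: each context.write() ends up in
// NewOutputCollector.write(), which partitions the record and then
// calls MapOutputBuffer.collect(key, value, partition).
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    for (String token : line.toString().split("\\s+")) {
      if (token.isEmpty()) continue;
      word.set(token);
      context.write(word, ONE);   // -> NewOutputCollector.write -> collect
    }
  }
}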

// MapOutputBuffer.collect
public synchronized void collect(K key, V value, final int partition
                                 ) throws IOException {
  ...
  // when a new record is collected, first subtract the metadata length from the remaining space, then check
  bufferRemaining -= METASIZE;
  if (bufferRemaining <= 0) {
    // start spill if the thread is not running and the soft limit has been
    // reached
    spillLock.lock();
    try {
      do {
        // on the first spill, spillInProgress is false
        if (!spillInProgress) {
          // byte position of kvindex
          final int kvbidx = 4 * kvindex;
          // byte position of kvend
          final int kvbend = 4 * kvend;
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);
          final boolean bufsoftlimit = bUsed >= softLimit;
          if ((kvbend + METASIZE) % kvbuffer.length !=
              equator - (equator % METASIZE)) {
            // spill finished, reclaim space
            resetSpill();
            bufferRemaining = Math.min(
                distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                softLimit - bUsed) - METASIZE;
            continue;
          } else if (bufsoftlimit && kvindex != kvend) {
            // spill records, if any collected; check latter, as it may
            // be possible for metadata alignment to hit spill pcnt
            startSpill();
            final int avgRec = (int)
              (mapOutputByteCounter.getCounter() /
              mapOutputRecordCounter.getCounter());
            // leave at least half the split buffer for serialization data
            // ensure that kvindex >= bufindex
            final int distkvi = distanceTo(bufindex, kvbidx);
            final int newPos = (bufindex +
              Math.max(2 * METASIZE - 1,
                      Math.min(distkvi / 2,
                               distkvi / (METASIZE + avgRec) * METASIZE)))
              % kvbuffer.length;
            setEquator(newPos);
            bufmark = bufindex = newPos;
            final int serBound = 4 * kvend;
            // bytes remaining before the lock must be held and limits
            // checked is the minimum of three arcs: the metadata space, the
            // serialization space, and the soft limit
            bufferRemaining = Math.min(
                // metadata max
                distanceTo(bufend, newPos),
                Math.min(
                  // serialization max
                  distanceTo(newPos, serBound),
                  // soft limit
                  softLimit)) - 2 * METASIZE;
          }
        }
      } while (false);
    } finally {
      spillLock.unlock();
    }
  }
  // write the key, the value, and the metadata into the buffer
  try {
    // serialize key bytes into buffer
    int keystart = bufindex;
    // serialize the key into kvbuffer and advance bufindex
    keySerializer.serialize(key);
    // if the key's bytes were split across bufvoid, move the key so that
    // it sits in contiguous space, which makes key comparison during sort possible
    if (bufindex < keystart) {
      // wrapped the key; must make contiguous
      bb.shiftBufferedKey();
      keystart = 0;
    }
    // serialize value bytes into buffer
    final int valstart = bufindex;
    valSerializer.serialize(value);
    // It's possible for records to have zero length, i.e. the serializer
    // will perform no writes. To ensure that the boundary conditions are
    // checked and that the kvindex invariant is maintained, perform a
    // zero-length write into the buffer. The logic monitoring this could be
    // moved into collect, but this is cleaner and inexpensive. For now, it
    // is acceptable.
    bb.write(b0, 0, 0);

    // the record must be marked after the preceding write, as the metadata
    // for this record are not yet written
    int valend = bb.markRecord();

    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(
        distanceTo(keystart, valend, bufvoid));

    // write accounting info
    kvmeta.put(kvindex + PARTITION, partition);
    kvmeta.put(kvindex + KEYSTART, keystart);
    kvmeta.put(kvindex + VALSTART, valstart);
    kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
    // advance kvindex
    kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value, partition);
    mapOutputRecordCounter.increment(1);
    return;
  }
}           

Each time data is written, bufferRemaining -= METASIZE is executed and then bufferRemaining is checked.

If it is greater than 0, the serialized key/value pair and the corresponding metadata are written into the buffer directly. The key and value are serialized before being written, through a chain of calls: Serializer.serialize(key/value) -> WritableSerializer.serialize(key/value) -> BytesWritable.write(dataOut) -> DataOutputStream.write(bytes, 0, size) -> MapOutputBuffer.Buffer.write(b, off, len). In the end, MapOutputBuffer.Buffer.write(b, off, len) writes the bytes into kvbuffer. The write method is shown below:

public void write(byte b[], int off, int len)
    throws IOException {
  // must always verify the invariant that at least METASIZE bytes are
  // available beyond kvindex, even when len == 0
  bufferRemaining -= len;
  if (bufferRemaining <= 0) {
    // writing these bytes could exhaust available buffer space or fill
    // the buffer to soft limit. check if spill or blocking are necessary
    boolean blockwrite = false;
    spillLock.lock();
    try {
      do {
        checkSpillException();

        final int kvbidx = 4 * kvindex;
        final int kvbend = 4 * kvend;
        // ser distance to key index
        final int distkvi = distanceTo(bufindex, kvbidx);
        // ser distance to spill end index
        final int distkve = distanceTo(bufindex, kvbend);

        // if kvindex is closer than kvend, then a spill is neither in
        // progress nor complete and reset since the lock was held. The
        // write should block only if there is insufficient space to
        // complete the current write, write the metadata for this record,
        // and write the metadata for the next record. If kvend is closer,
        // then the write should block if there is too little space for
        // either the metadata or the current write. Note that collect
        // ensures its metadata requirement with a zero-length write
        blockwrite = distkvi <= distkve
          ? distkvi <= len + 2 * METASIZE
          : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;

        if (!spillInProgress) {
          if (blockwrite) {
            if ((kvbend + METASIZE) % kvbuffer.length !=
                equator - (equator % METASIZE)) {
              // spill finished, reclaim space
              // need to use meta exclusively; zero-len rec & 100% spill
              // pcnt would fail
              resetSpill(); // resetSpill doesn't move bufindex, kvindex
              bufferRemaining = Math.min(
                  distkvi - 2 * METASIZE,
                  softLimit - distanceTo(kvbidx, bufindex)) - len;
              continue;
            }
            // we have records we can spill; only spill if blocked
            if (kvindex != kvend) {
              startSpill();
              // Blocked on this write, waiting for the spill just
              // initiated to finish. Instead of repositioning the marker
              // and copying the partial record, we set the record start
              // to be the new equator
              setEquator(bufmark);
            } else {
              // We have no buffered records, and this record is too large
              // to write into kvbuffer. We must spill it directly from
              // collect
              final int size = distanceTo(bufstart, bufindex) + len;
              setEquator(0);
              bufstart = bufend = bufindex = equator;
              kvstart = kvend = kvindex;
              bufvoid = kvbuffer.length;
              throw new MapBufferTooSmallException(size + " bytes");
            }
          }
        }

        if (blockwrite) {
          // wait for spill
          try {
            while (spillInProgress) {
              reporter.progress();
              spillDone.await();
            }
          } catch (InterruptedException e) {
              throw new IOException(
                  "Buffer interrupted while waiting for the writer", e);
          }
        }
      } while (blockwrite);
    } finally {
      spillLock.unlock();
    }
  }
  // here, we know that we have sufficient space to write
  if (bufindex + len > bufvoid) {
    final int gaplen = bufvoid - bufindex;
    System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
    len -= gaplen;
    off += gaplen;
    bufindex = 0;
  }
  System.arraycopy(b, off, kvbuffer, bufindex, len);
  bufindex += len;
}           

The write method writes key/value bytes into kvbuffer. If bufindex + len exceeds bufvoid, the content is stored in two pieces: one part is written between bufindex and bufvoid, bufindex is then reset to 0, and the remainder is written from the start of the array. write itself does not distinguish keys from values. After a key is written, collect checks whether bufindex < keystart; if bufindex is smaller, the key has been split, and bb.shiftBufferedKey() is executed. Values are written directly with no split check, because only keys must be stored contiguously: they have to be compared during sorting.
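
The wrap-around logic at the end of write can be exercised in isolation. The following small, self-contained demo (assumed toy values, not Hadoop code) copies an 8-byte record into a 16-byte array using the same two-step System.arraycopy pattern:

import java.util.Arrays;

// Illustrative demo of the wrap-around write at the end of Buffer.write():
// if the record does not fit before bufvoid, the head goes at the end of the
// array and the rest restarts at index 0.
public class WrapWriteDemo {
  static byte[] kvbuffer = new byte[16];
  static int bufindex = 12;          // pretend 12 bytes are already used
  static int bufvoid  = kvbuffer.length;

  static void write(byte[] b, int off, int len) {
    if (bufindex + len > bufvoid) {
      final int gaplen = bufvoid - bufindex;           // room left before the end
      System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
      len -= gaplen;
      off += gaplen;
      bufindex = 0;                                    // continue from the front
    }
    System.arraycopy(b, off, kvbuffer, bufindex, len);
    bufindex += len;
  }

  public static void main(String[] args) {
    byte[] record = "ABCDEFGH".getBytes();             // 8 bytes, only 4 fit at the end
    write(record, 0, record.length);
    System.out.println(Arrays.toString(kvbuffer));     // EFGH at 0..3, ABCD at 12..15
    System.out.println("bufindex = " + bufindex);      // 4
  }
}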

Note that if the data to be written is too long and kvindex == kvend, a MapBufferTooSmallException is thrown. It is caught in collect, which spills that record directly to disk via spillSingleRecord; in other words, when a single record is too large, it bypasses the buffer and goes straight to disk.

The code of bb.shiftBufferedKey() is shown below.

// BlockingBuffer.shiftBufferedKey
protected void shiftBufferedKey() throws IOException {
  // spillLock unnecessary; both kvend and kvindex are current
  int headbytelen = bufvoid - bufmark;
  bufvoid = bufmark;
  final int kvbidx = 4 * kvindex;
  final int kvbend = 4 * kvend;
  final int avail =
    Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
  if (bufindex + headbytelen < avail) {
    System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
    System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
    bufindex += headbytelen;
    bufferRemaining -= kvbuffer.length - bufvoid;
  } else {
    byte[] keytmp = new byte[bufindex];
    System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
    bufindex = 0;
    out.write(kvbuffer, bufmark, headbytelen);
    out.write(keytmp);
  }
}           

In shiftBufferedKey, the method checks whether there is enough room at the head of the array to hold the whole key. If there is not enough room, the portion of the key already at the head is first copied into keytmp and the key is then re-written in two calls back through Buffer.write. If there is enough room, two copies are performed in place: the head portion of the key is shifted right to position headbytelen, the tail portion at the end of the array is copied to the front, bufindex is advanced, and bufferRemaining is adjusted.
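
A small, self-contained trace of the in-place branch (assumed toy values, not Hadoop code): with a 16-byte buffer, a key that started at bufmark = 12 and wrapped around ends up contiguous at offsets 0..7 after the two copies.

import java.util.Arrays;

// Illustrative trace of BlockingBuffer.shiftBufferedKey()'s in-place branch,
// using a tiny 16-byte buffer (assumed values, not Hadoop code).
public class ShiftKeyDemo {
  public static void main(String[] args) {
    byte[] kvbuffer = new byte[16];
    int bufvoid = kvbuffer.length;
    int bufmark = 12;                       // the record (key) started here
    int bufindex = 4;                       // key wrapped: 4 bytes at 12..15, 4 bytes at 0..3
    byte[] key = "KEYDATA!".getBytes();     // 8-byte key
    System.arraycopy(key, 0, kvbuffer, 12, 4);
    System.arraycopy(key, 4, kvbuffer, 0, 4);

    // shiftBufferedKey, in-place branch:
    int headbytelen = bufvoid - bufmark;    // 4 bytes of the key sit at the end
    bufvoid = bufmark;                      // truncate the usable area at the old record start
    System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex); // move front part right
    System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);  // move tail part to the front
    bufindex += headbytelen;                // the key is now contiguous at 0..7

    System.out.println(new String(kvbuffer, 0, bufindex)); // prints KEYDATA!
    System.out.println("bufindex = " + bufindex + ", bufvoid = " + bufvoid);
    System.out.println(Arrays.toString(kvbuffer));
  }
}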

After the key and value have been written, the metadata entry is written and kvindex is updated.

Spill

Once enough data has been written and bufferRemaining drops to 0 or below, a spill is prepared. On the first spill, spillInProgress is false; bUsed = distanceTo(kvbidx, bufindex) is examined, and if bUsed >= softLimit and (kvbend + METASIZE) % kvbuffer.length == equator - (equator % METASIZE), the spill goes ahead and startSpill is called.

private void startSpill() {
  // fix the metadata boundary for this spill
  kvend = (kvindex + NMETA) % kvmeta.capacity();
  // fix the key/value boundary for this spill
  bufend = bufmark;
  // mark the spill as in progress
  spillInProgress = true;
  ...
  // wake up the spill thread via the reentrant lock's condition
  spillReady.signal();
}

After startSpill wakes the spill thread, the spill proceeds, but the map's writes into the buffer are not blocked, so the equator and bufferRemaining must be re-established. Let's first look at how their new values are chosen:

// average record length, derived from the k/v pairs written so far
final int avgRec = (int) (mapOutputByteCounter.getCounter() /
  mapOutputRecordCounter.getCounter());
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
// size of the free space between bufindex and kvindex
final int distkvi = distanceTo(bufindex, kvbidx);
// compute the position of the new equator
final int newPos = (bufindex +
  Math.max(2 * METASIZE - 1,
          Math.min(distkvi / 2,
                   distkvi / (METASIZE + avgRec) * METASIZE)))
  % kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
// bytes remaining before the lock must be held and limits
// checked is the minimum of three arcs: the metadata space, the
// serialization space, and the soft limit
bufferRemaining = Math.min(
    // metadata max
    distanceTo(bufend, newPos),
    Math.min(
      // serialization max
      distanceTo(newPos, serBound),
      // soft limit
      softLimit)) - 2 * METASIZE;           

Since the equator is the boundary between kvbuffer and kvmeta, at most half of distkvi is given to metadata so that more space remains for key/value data. avgRec is used to estimate how many record/meta pairs fit into distkvi, and from that the space the metadata would occupy: distkvi / (METASIZE + avgRec) * METASIZE. The smaller of distkvi / 2 and that estimate is taken. Because at least one meta entry (METASIZE bytes) must fit, and because computing kvindex aligns down by up to METASIZE - 1 bytes, the reserved amount is never less than 2 * METASIZE - 1. Putting this together, the new equator is (bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE))) % kvbuffer.length. Once the equator is chosen, bufmark = bufindex = newPos and kvindex are set, but bufstart/bufend and kvstart/kvend are left alone, because those values mark the boundaries of the data being spilled.
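
A short worked example (assumed values) of the reservation that determines the new equator:

// Worked example (assumed values) of how the new equator position is chosen
// inside collect() when a spill starts.
public class NewEquatorMath {
  public static void main(String[] args) {
    final int METASIZE = 16;
    int distkvi = 10_000;   // free bytes between bufindex and kvindex
    int avgRec  = 84;       // average serialized record length so far

    int metaReserve = Math.max(2 * METASIZE - 1,                 // at least ~one meta slot: 31
        Math.min(distkvi / 2,                                    // at most half of the gap: 5,000
                 distkvi / (METASIZE + avgRec) * METASIZE));     // estimated meta demand: 100 * 16 = 1,600

    // newPos = (bufindex + metaReserve) % kvbuffer.length
    System.out.println("bytes reserved for metadata = " + metaReserve); // 1600
  }
}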

After the spill starts, less usable space remains, so bufferRemaining, which governs the next spill, must also be reset. It is set to the minimum of three values minus 2 * METASIZE: the space available for metadata, distanceTo(bufend, newPos); the space available for key/value data, distanceTo(newPos, serBound); and softLimit. Why subtract 2 * METASIZE? One METASIZE accounts for the current record, which has already entered collect and had METASIZE subtracted from bufferRemaining, so its metadata still has to fit; the other METASIZE accounts for the stretch between kvindex and kvend (before the spill) that was not included when the new equator was computed.

Next let's look at the SpillThread and its run method:

public void run() {
  spillLock.lock();
  spillThreadRunning = true;
  try {
    while (true) {
      spillDone.signal();
      // if no spill is in progress, park the SpillThread and wait to be woken up
      while (!spillInProgress) {
        spillReady.await();
      }
      try {
        spillLock.unlock();
        // once woken up, sort and spill to disk
        sortAndSpill();
      } catch (Throwable t) {
        sortSpillException = t;
      } finally {
        spillLock.lock();
        if (bufend < bufstart) {
          bufvoid = kvbuffer.length;
        }
        kvstart = kvend;
        bufstart = bufend;
        spillInProgress = false;
      }
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  } finally {
    spillLock.unlock();
    spillThreadRunning = false;
  }
}

The core of run is sortAndSpill:

private void sortAndSpill() throws IOException, ClassNotFoundException,
                                   InterruptedException {
  //approximate the length of the output file to be the length of the
  //buffer + header lengths for the partitions
  final long size = distanceTo(bufstart, bufend, bufvoid) +
              partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    // holds the index entries for this spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    // create the spill file that will be written to disk
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    // open the output stream
    out = rfs.create(filename);
    // mstart: the meta-entry index corresponding to kvend, i.e. where the
    // metadata to spill begins
    final int mstart = kvend / NMETA;
    // mend: the meta-entry index corresponding to kvstart; the metadata to
    // spill lies between mstart and mend, so (mend - mstart) is the number
    // of records in this spill
    final int mend = 1 + // kvend is a valid record
      (kvstart >= kvend
      ? kvstart
      : kvmeta.capacity() + kvstart) / NMETA;
    // sort: only the metadata is sorted, i.e. only the order of the meta
    // entries in kvmeta is adjusted. The comparison rule is
    // MapOutputBuffer.compare: first by partition, then by key
    sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
    int spindex = mstart;
    // rec holds, for each partition, its location within the data file
    final IndexRecord rec = new IndexRecord();
    final InMemValBytes value = new InMemValBytes();
    for (int i = 0; i < partitions; ++i) {
      // the spill file is written in the IFile format
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
        writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                  spilledRecordsCounter);
        // before writing to disk, check whether a combiner is configured
        if (combinerRunner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          // write all records belonging to the same partition
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
            final int kvoff = offsetFor(spindex % maxRec);
            int keystart = kvmeta.get(kvoff + KEYSTART);
            int valstart = kvmeta.get(kvoff + VALSTART);
            key.reset(kvbuffer, keystart, valstart - keystart);
            getVBytesForOffset(kvoff, value);
            writer.append(key, value);
            ++spindex;
          }
        } else {
          int spstart = spindex;
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec)
                        + PARTITION) == i) {
            ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          if (spstart != spindex) {
            combineCollector.setWriter(writer);
            RawKeyValueIterator kvIter =
              new MRResultIterator(spstart, spindex);
            combinerRunner.combine(kvIter, combineCollector);
          }
        }

        // close the writer
        writer.close();

        // record offsets
        // record partition i's offsets into the index record rec
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
        rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
        // spillRec holds the per-partition info for this spill, so that later merge passes can locate each partition's data
        spillRec.putIndex(rec, i);

        writer = null;
      } finally {
        if (null != writer) writer.close();
      }
    }
    // if the in-memory index data exceeds the threshold, write the index to disk;
    // once over the threshold, only the current and subsequent indexes go to disk
    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      // create the spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}           

In sortAndSpill, mstart and mend determine how many records need to be spilled to disk. sorter.sort is then called to sort the metadata: first by partition, then by key. The sort only rearranges the order of the meta entries.
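
The comparison rule can be sketched as an IndexedSortable.compare over two meta-entry indices. The following is a hedged reconstruction based on the field layout shown earlier, not a verbatim copy of the Hadoop source; it lives inside MapOutputBuffer, so it uses the kvmeta, kvbuffer, maxRec and comparator fields introduced above:

// Hedged sketch of the compare() used by sorter.sort(): order by partition
// first, then by raw key bytes via the job's RawComparator.
// (Illustrative reconstruction, not a verbatim copy of MapOutputBuffer.)
public int compare(final int mi, final int mj) {
  final int kvi = offsetFor(mi % maxRec);      // int offset of record mi's meta entry
  final int kvj = offsetFor(mj % maxRec);
  final int kvip = kvmeta.get(kvi + PARTITION);
  final int kvjp = kvmeta.get(kvj + PARTITION);
  // primary order: partition
  if (kvip != kvjp) {
    return kvip - kvjp;
  }
  // secondary order: key bytes, compared directly inside kvbuffer
  return comparator.compare(kvbuffer,
      kvmeta.get(kvi + KEYSTART),
      kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
      kvbuffer,
      kvmeta.get(kvj + KEYSTART),
      kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
}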

After sorting, the code checks whether a combiner is configured. If not, the records are written straight to disk, one IndexRecord per partition. If there is a combiner, the partition's records are exposed through kvIter and combinerRunner.combine is invoked to run the combiner before writing.
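
For reference, a combiner is just a Reducer run on the map side. Below is a minimal, illustrative example (hypothetical class name) of the kind of class that combinerRunner.combine would drive for a word-count style job:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Minimal illustrative combiner: during sortAndSpill, combinerRunner.combine()
// feeds each partition's sorted records through a class like this before they
// are written to the spill file, shrinking the data spilled and later shuffled.
public class IntSumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable sum = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int total = 0;
    for (IntWritable v : values) {
      total += v.get();
    }
    sum.set(total);
    context.write(key, sum);   // one (key, partial sum) replaces many (key, 1) records
  }
}

Such a class would be registered with job.setCombinerClass(IntSumCombiner.class).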

After writing to disk, the spillRec corresponding to spillx.out is kept in memory via indexCacheList.add(spillRec); if the memory used, totalIndexCacheMemory, exceeds indexCacheMemoryLimit, an index file is created and this and all subsequent spillRecs are written to index files on disk.

Finally the spill counter numSpills is incremented. When sortAndSpill returns, control goes back to run, whose finally block assigns kvstart = kvend and bufstart = bufend and sets spillInProgress back to false.

While the spill is running, the map task keeps writing into the buffer; collect is still being called. Returning to collect:

// MapOutputBuffer.collect
public synchronized void collect(K key, V value, final int partition
                                 ) throws IOException {
  ...
  // when a new record is collected, first subtract the metadata length from the remaining space, then check
  bufferRemaining -= METASIZE;
  if (bufferRemaining <= 0) {
    // start spill if the thread is not running and the soft limit has been
    // reached
    spillLock.lock();
    try {
      do {
        // on the first spill, spillInProgress is false
        if (!spillInProgress) {
          // byte position of kvindex
          final int kvbidx = 4 * kvindex;
          // byte position of kvend
          final int kvbend = 4 * kvend;
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);
          final boolean bufsoftlimit = bUsed >= softLimit;
          if ((kvbend + METASIZE) % kvbuffer.length !=
              equator - (equator % METASIZE)) {
            // spill finished, reclaim space
            resetSpill();
            bufferRemaining = Math.min(
                distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                softLimit - bUsed) - METASIZE;
            continue;
          } else if (bufsoftlimit && kvindex != kvend) {
            ...
          }
        }
      } while (false);
    } finally {
      spillLock.unlock();
    }
  }
  ...
}           

When a new record needs to be written into the buffer, bufferRemaining -= METASIZE is evaluated again. This bufferRemaining is the value that was reset when the spill started (and should be smaller than the initial softLimit). When it drops to 0 or below, the code enters the if block; assuming the spill has finished by now, spillInProgress is false, so it enters if (!spillInProgress). Because startSpill reset kvend and bufend, the condition (kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE) holds, so resetSpill() is called, which moves kvstart/kvend and bufstart/bufend back to the positions recorded at the last startSpill. By this point part of the buffer has been spilled to disk and a large amount of space has been freed, so bufferRemaining is simply reset and no new spill is triggered this time.

bufferRemaining is set to Math.min(distanceTo(bufindex, kvbidx) - 2 * METASIZE, softLimit - bUsed) - METASIZE; the final METASIZE is the one that was subtracted from bufferRemaining when the current record entered collect.

private void resetSpill() {
  final int e = equator;
  bufstart = bufend = e;
  final int aligned = e - (e % METASIZE);
  // set start/end to point to first meta record
  // Cast one of the operands to long to avoid integer overflow
  kvstart = kvend = (int)
    (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
  LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
    (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
}           

When bufferRemaining next drops to 0 or below, a spill is performed, and from then on the same pattern repeats. This concludes the analysis of the circular buffer.

This article is reproduced from the WeChat public account 大資料技術與架構.
