天天看点

Spark Shuffle机制解析Shuffle简介与其发展Shuffle机制的实现Shuffle Writer过程解析Shuffle Reader过程解析

文章目录

  • Shuffle简介与其发展
  • Shuffle机制的实现
    • ShuffleManager接口简介
    • ShuffleManager的创建
    • ShuffleManager实现类SortShuffleManager
      • registerShuffle方法
      • getWriter方法
      • getReader方法
  • Shuffle Writer过程解析
    • 数据聚合与排序
      • 聚合排序
      • 溢写磁盘
    • 数据合并与持久化
    • 生成索引文件
  • Shuffle Reader过程解析
    • 获取map任务输出的位置信息
    • 获取blocks数据
      • 划分本地与远程Block
      • 获取远程Block
      • 获取本地Block
    • reduce端聚合数据
    • reduce端排序数据

Shuffle简介与其发展

Shuffle,即"洗牌",所有采用map-reduce思想的大数据计算框架的必经及最重要的阶段,用于打通map任务的输出与reduce任务的输入。Spark的早期版本hash shuffle中ShuffleMapTask任务会为每一个reduce任务创建一个bucket。假设有M个map任务,R个reduce任务,则map阶段一共会创建M*R个bucket。这种方式不仅文件数量很多,造成频繁的磁盘和网络I/O,而且内存开销也很大,GC频繁,经常出现OOM。Spark从2.0版本之后,hash shuffle机制被删除,只保留sort shuffle机制,其做了多种性能优化:

  1. 将map任务给每个partition的reduce任务输出的bucket合并到同一个文件中。
  2. map任务逐条输出计算结果,而不是一次性输出到内存,并使用AppendOnlyMap缓存及其聚合算法对中间结果进行聚合。同时进行溢出判断,当超出myMemoryThreshold的大小时,将数据溢写到磁盘,防止内存溢出。
  3. reduce任务对拉取到的map任务中间结果逐条读取,而不是一次性读入内存,并在内存中进行聚合和排序,这也大大减小了数据占用的内存。
  4. reduce任务将要拉取的block按照BlockManager地址划分,然后将同一BlockManager地址中的Block聚合为少量网络请求,减少网络I/O。

Shuffle机制的实现

Spark Shuffle机制的主要接口是ShuffleManager,而Spark从2.0版本之后,其默认实现为SortShuffleManager。

ShuffleManager接口简介

ShuffleManager是一个接口,定义了Spark的Shuffle机制的主要接口方法。

/**
 * Shuffle系统的可插拔接口。ShuffleManager的创建基于spark.shuffle.manager设置,
 * 且在SparkEnv中的driver和每个executor上创建。driver在该shuffle manager上注册shuffle,
 * executor(或者driver中本地运行的任务)能够请求读取或写入数据。
 * 
 * 注意:shuffle manager会在SparkEnv中实例化,因此其构造函数可将SparkConf和boolean isDriver作为参数
 */
private[spark] trait ShuffleManager {
  /**
   * 使用ShuffleManager注册一个shuffle,获取一个句柄,回传给任务
   * 用于注册一种shuffle机制,并返回对应的ShuffleHandle,handle内会存储shuffle依赖信息。
   * 根据该handle类型可以进一步确定采用ShuffleWriter类型。
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** 为一个指定的分区提供ShuffleWriter,为executor中执行map任务时调用 */
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

  /**
   * 为一个范围(开始分区到结束分区-1, 闭区间)内的reduce分区提供ShuffleReader。
   * 在executor中执行reduce任务时调用
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  /**
   * 从ShuffleManager移除shuffle的元数据
   * @return 移除成功返回true, 否则为false.*/
  def unregisterShuffle(shuffleId: Int): Boolean

  /**
   * 返回一个能够基于block坐标位置获取shuffle块数据的解析器
   */
  def shuffleBlockResolver: ShuffleBlockResolver

  /** Shut down this ShuffleManager. */
  def stop(): Unit
}
           

ShuffleManager的简易类图如下:

Spark Shuffle机制解析Shuffle简介与其发展Shuffle机制的实现Shuffle Writer过程解析Shuffle Reader过程解析

ShuffleManager的创建

ShuffleManager的初始化是在SparkEnv初始化时执行。在创建driver端环境(SparkContext)和executor端环境(ExecutorBackend)时,会进行创建。主要调用org.apache.spark.SparkEnv的create方法,来构建ShuffleManager。可以看到,其默认实现类是SortShuffleManager,当然也可以通过

spark.shuffle.manager

参数指定其他实现类。

// SparkEnv.scala

// Let the user specify short names for shuffle managers
// 注意sort, tungsten-sort对应的value均是SortShuffleManager
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
// 可以通过spark.shuffle.manager参数来指定第三方或其他方式实现的shuffle机制
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
           

ShuffleManager实现类SortShuffleManager

SortShuffleManager主要包含对registerShuffle、getWriter和getReader三个方法的覆盖实现。

registerShuffle方法

  1. 根据不同的条件,返回不同的ShuffleHandle。首先检查是否满足SortShuffleWriter.shouldBypassMergeSort条件,如果同时满足以下两个条件,那么会返回ByPassMergeSortShuffleHandle,将启用bypass merge-sort机制:
    • 该shuffle依赖中没有map端聚合操作(如groupByKey)。
    • 依赖分区数不大于spark.shuffle.sort.bypassMergeThreshold规定的值,默认值为200。
  2. 然后检查是否符合SortShuffleManager.canUseSerializedShuffle方法,如果同时满足以下三个条件,就会返回SerializedShuffleHandle,启用序列化Sort shuffle机制(也就是tungsten-sort):
    • 使用的序列化器支持序列化对象的重定位(如KyroSerializer)。
    • shuffle依赖中map端无聚合操作。
    • 分区数不大于MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE的值,即2^24。
  3. 最后如果前面两个检查都不满足,那么就返回默认的BaseShuffleHandle,采用基本的sort handle。
    /**
       * 获取一个ShuffleHandle传入到tasks.
       */
     override def registerShuffle[K, V, C](
                                            shuffleId: Int,
                                            numMaps: Int,
                                            dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
       if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
         // 如果分区数小于spark.shuffle.sort.bypassMergeThreshold(默认为200)并且不需要map端的聚合,
     	// 则可以直接写对应分区个数的文件,且在结束时仅仅合并它们。
     	// 可以避免2次序列化和反序列化操作以将溢写的文件合并在一起(正常的代码方法会发生这种情况)。
         // 其缺点是一次打开多个文件,因此需要分配缓冲区更多的内存。
         new BypassMergeSortShuffleHandle[K, V](
           shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
       } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
         // 否则,尝试以序列化形式缓存map输出,且这样做更高效(需要满足一定条件)
         new SerializedShuffleHandle[K, V](
           shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
       } else {
         // 否则,以反序列化形式缓存map输出
         new BaseShuffleHandle(shuffleId, numMaps, dependency)
       }
     }
               

getWriter方法

其实现比较简单,基于不同的ShuffleHandle类型,匹配获取相应的ShuffleWriter。对于tungsten-sort会使用UnsafeShuffleWriter,对于bypass会使用BypassMergeSortShuffleWriter,普通的sort则使用SortShuffleWriter。其都继承于ShuffleWriter抽象类,且实现了write方法。

getReader方法

ShuffleReader与ShuffleWriter不同,只有一种实现类,即BlockStoreShuffleReader。它继承自ShuffleReader接口,并实现了read方法。

Shuffle Writer过程解析

Shuffle Writer在ShuffleMapTask的runTask里面被调用,将map任务计算的结果写出成磁盘上的一个文件供reduce任务拉取使用。

// ShuffleMapTask.scala

override def runTask(context: TaskContext): MapStatus = {
  ...

  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
           

可以看到shuffle writer主要通过调用write和stop两个方法实现,其中stop方法主要进行内存的释放与spill临时文件的删除工作,write方法的实现是其关键,下面以普通的sort机制SortShuffleWriter为例对其write方法的实现进行详细分析。

// SortShuffleWriter.scala
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  // 1. 数据聚合与排序
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
	// 在这种情况下,我们既不传递aggregator也不传递ordering给排序器,
	// 因为我们不在乎键是否在每个分区中排序。 
	// 如果正在运行的操作是sortByKey,则将在reduce侧完成聚合和排序。
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  // 不要在shuffle写出时就打开合并的输出文件,因为它只是打开单个文件,因此通常非常快而无法准确测量
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
	// 2. 数据合并与持久化
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
	// 3. 生成索引文件
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
           

可以看到其利用ExternalSorter进行实现,主要包含三大步骤:

  1. 数据聚合与排序
  2. 数据合并与持久化
  3. 生成索引文件

数据聚合与排序

map端计算结果分区数据聚合排序主要由ExternalSorter.insertAll的实现。判断在Map端是否需要本地进行combine操作,从而决定是否传入aggregator和keyOrdering参数。将写入数据全部放入外部排序器ExternalSorter,并且根据是否需要spill进行spill操作。

// ExternalSorter.scala
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
	  // @volatile private var map = new PartitionedAppendOnlyMap[K, C]
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
	  // 如果没有定义aggregator,那么shouldCombine为false。
	  // 这时会调用PartitionedPairBuffer的insert方法,它只不过是把计算结果简单地缓存到数组中。
	  // @volatile private var buffer = new PartitionedPairBuffer[K, C]
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
           

如果需要聚合,则使用PartitionedAppendOnlyMap的changeValue方法来实现数据聚合排序;如果不进行combine操作,则使用PartitionedPairBuffer简单地添加数据存放于内存中。然后无论哪一种情况都需要判断内存是否足够,如果内存不够而且又申请不到内存,则需要进行本地磁盘溢写操作,把相关的数据溢写到临时文件。

聚合排序

一个任务的分区数量通常很多,通过在map端对计算结果在缓存中执行聚合和排序,能够节省I/O操作,进而提升系统性能。这种情况下,必须要定义聚合器(aggregator)函数,以便于对计算结果按照partitionID和key聚合后排序。ExternalSorter的insertAll方法中,如果定义了aggregator,则shouldCombine为true。此分支执行过程如下:

  1. 由于设置了聚合函数aggregator,则从聚合函数获取mergeValue、createCombiner等函数。
  2. 定义update偏函数,此函数用于操作mergeValue和createCombiner。
  3. 迭代之前创建的iterator,每读取一条Product2[K, V]。
  4. 以(分区索引, Product2[K, V]._1)为参数调用SizeTrackingAppendOnlyMap(PartitionedAppendOnlyMap的父类)的changeValue函数,与update函数配合,按照key值叠加value。
  5. 调用maybeSpillCollection方法,来处理SizeTrackingAppendOnlyMap溢出(当SizeTrackingAppendOnlyMap的大小超过myMemoryThreshold时,将集合中的数据写入磁盘并新建SizeTrackingAppendOnlyMap)。

溢写磁盘

Spark为了防止内存撑爆问题的发生,提供了函数maybeSpillCollection,通过maybeSpill决定集合数据是否要溢出,需要的话则溢写到磁盘上。

// ExternalSorter.scala
/**
 * 如果需要,将当前的内存集合数据溢写到磁盘上。
 *
 * @param usingMap 我们是使用map还是buffer来作为当前内存中的数据集合
 */
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}
           

maybeSpillCollection判定集合是否溢出主要由maybeSpill函数来决定:

  • 为当前线程尝试获取amountToRequest大小的内存(amountToRequest = 2*currentMemory - myMemoryThreshold)。
  • 如果获得的内存依然不足(myMemoryThreshold <= currentMemory),则调用spill,执行溢出操作。内存不足可能是申请到的内存为0或者已经申请得到的内存大小超过了myMemoryThreshold。
  • 溢出后续处理,如elementRead归零,已溢出内存字节数(memoryBytesSpilled)增加线程当前内存大小(currentMemory),释放当前线程占用的内存。

数据合并与持久化

主要通过ExternalSorter类的writePartitionedFile方法来合并并持久化计算结果。如果溢写文件为空,则表示内存足够,没有溢写数据到磁盘,则返回一个对数据排序的迭代器,然后遍历数据写入磁盘文件。反之如果溢写文件不为空,则需要将溢写的文件和内存中的数据合并,合并之后则需要进行归并排序(merge-sort),再将数据刷到磁盘文件。最后返回一个每个分区数据的字节长度的数组。

// ExternalSorter.scala
/**
  * 将添加到此ExternalSorter中的所有数据写入磁盘存储中的文件中。这由SortShuffleWriter调用。
  *
  * @param blockId 要写出的block ID。索引文件名会是blockId.name + ".index"。
  * @return 文件中每个分区数据的字节长度的数组 (被map output tracker所使用)
  */
def writePartitionedFile(
    blockId: BlockId,
    outputFile: File): Array[Long] = {

  // 创建每一个分区对应的文件长度的数组
  val lengths = new Array[Long](numPartitions)
  val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
    context.taskMetrics().shuffleWriteMetrics)
  // 判断是否有进行了spill的文件
  if (spills.isEmpty) {
    // 如果是空的表示我们只有内存数据,内存足够,不需要溢写结果到磁盘
	// 如果指定aggregator,就返回PartitionedAppendOnlyMap里的数据,否则返回
	// PartitionedPairBuffer里的数据
    val collection = if (aggregator.isDefined) map else buffer
	// 返回一个对结果排序的迭代器
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext) {
      val partitionId = it.nextPartition()
      while (it.hasNext && it.nextPartition() == partitionId) {
        it.writeNext(writer)
      }
	  // 数据刷到磁盘,并且创建FileSegment数组
      val segment = writer.commitAndGet()
      lengths(partitionId) = segment.length
    }
  } else {
    // 否则,表示有溢写文件,则需要进行归并排序(merge-sort)
	// 每一个分区的数据都写入到data文件的临时文件
    for ((id, elements) <- this.partitionedIterator) {
      if (elements.hasNext) {
        for (elem <- elements) {
          writer.write(elem._1, elem._2)
        }
		// 数据刷到磁盘,并且创建FileSegment数组
        val segment = writer.commitAndGet()
		// 构造一个分区文件长度的数组
        lengths(id) = segment.length
      }
    }
  }

  writer.close()
  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)

  lengths
}
           

当溢写文件不为空时,partitionedIterator通过对集合按照指定的比较器进行排序,并且按照partition id分组,生成迭代器,从而返回遍历所有数据的迭代器。如果没有溢写,则判断是否需要对key排序,如果不需要则只是将数据按照partitionId排序,否则首先按照partitionId排序,然后partition内部再按照key排序。如果发生溢写,则需要将磁盘上溢写文件和内存里的数据进行合并。

// ExternalSorter.scala
/**
 * 返回对写入此对象的所有数据进行迭代的迭代器,该数据按分区分组并由请求的aggregator聚合。 
 * 然后,对于每个分区,我们在其内容上都有一个迭代器,并且这些迭代器应按顺序进行访问(您不能在不读取前一个分区的情况下“跳到”一个分区)。 
 * 确保按分区ID的顺序为每个分区返回key-value对。
 * 
 * 目前,我们只是一次性合并所有溢出的文件,但是可以对其进行修改以支持分层合并。
 */
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  // 如果没有发生磁盘溢写
  if (spills.isEmpty) {
    // 特殊情况: 如果我们只有内存中的数据,则不需要合并流,也许我们甚至不需要除了按分区ID外的其他任何东西排序
    if (!ordering.isDefined) {
	  // 用户并没有要求排序后的keys,因此数据只是按照partitionId排序,并不会对key进行排序
      groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
    } else {
	  // 否则我们需要先按照partitionId排序,然后分区内部对key进行排序
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
	// 如果发生了溢写操作,则需要将磁盘上溢写文件和内存里的数据进行合并排序
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}
           

生成索引文件

map任务输出结果数据在持久化的时候会被写入同一文件,那么reduce任务如何从此文件中按照分区读取数据呢?IndexShuffleBlockManager的writeIndexFileAndCommit方法生成的分区索引文件,此文件使用偏移量来区分各个分区的计算结果,偏移量来自合并排序过程中记录的各个partition的长度。

// IndexShuffleBlockManager.scala
/**
 * 写出一个索引文件,其中包含每个块的偏移量,并在输出文件末尾添加一个最终偏移量。 
 * getBlockData将使用它来确定每个块的开始和结束位置。
 *
 * 它将把数据和索引文件作为一个原子操作提交,使用现有的或用新的替换。
 *
 * Note: 如果使用现有索引文件,则将更新“lengths”信息以匹配现有索引文件。
 */
def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {...}
           

Shuffle Reader过程解析

下游的reduce任务可能是ShuffleMapTask也有可能是ResultTask,首先会去Driver获取parent stage中ShuffleMapTask输出的位置信息,根据位置信息获取index文件,然后解析index文件,从index文件中获取相关的位置等信息,然后读data文件获取属于自己那部分内容。这都是在ShuffleRDD的compute方法中实现的。

// ShuffleRDD.scala
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  // ResultTask或者ShuffleMapTask在执行到ShuffleRDD时,
  // 肯定会调用ShuffleRDD的compute方法来计算当前这个RDD的partition数据
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // 获取ShuffleManager的reader去拉取需要聚合的数据
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}
           

前面已经提到shuffle的reader的实现类为BlockStoreShuffleReader,通过其read方法实现数据读取。

  • 获取map任务输出的位置信息。
  • 获取blocks数据:创建一个ShuffleBlockFetcherIterator,它获取多个块,对于本地块从本地读取,对于远程块则通过远程方法读取。
  • reduce端聚合数据:如果map端已经聚合过了,则对读取到的聚合结果进行聚合。如果map端没有聚合,则针对未合并的<k,v>进行聚合。
  • reduce端排序数据:如果需要对key排序,则进行排序。基于sort的shuffle实现过程中,默认只是按照partitionId排序。在每一个partition内部并没有排序,因此添加了keyOrdering变量,提供是否需要对分区内部的key排序。
    /** Read the combined key-values for this reduce task */
      override def read(): Iterator[Product2[K, C]] = {
        // 构造ShuffleBlockFetcherIterator,一个迭代器,它获取多个块,对于本地块,从本地读取
        // 对于远程块,通过远程方法读取
        // 基于配置文件对于流进行包装
        val wrappedStreams = new ShuffleBlockFetcherIterator(
          context,
          blockManager.shuffleClient,
          blockManager,
      	// MapOutputTracker在SparkEnv启动的时候实例化
          mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
          serializerManager.wrapStream,
          // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
          SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
          SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
          SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
          SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM),
          SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
      
        val serializerInstance = dep.serializer.newInstance()
      
        // 对于每一个流创建一个key/value迭代器
        val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
          // Note: 下面的asKeyValueIterator将key/value迭代器包装在NextIterator内部。 
      	// NextIterator确保在读取所有记录后对底层的InputStream调用close()方法。
          serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
        }
      
        // 为每个读取的记录更新上下文任务度量。
        val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
        val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
          recordIter.map { record =>
            readMetrics.incRecordsRead(1)
            record
          },
          context.taskMetrics().mergeShuffleReadMetrics())
      
        // 为了支持任务取消,必须在此处使用可中断的迭代器
        val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
    
        // 如果reduce端需要聚合
        val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      	// 如果map端已经聚合过了,则对读取到的聚合结果进行聚合
          if (dep.mapSideCombine) {
            // 我们正在读取已经合并的值
            val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
      	  // 针对map端各个partition对key进行聚合后的结果再次聚合
            dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
          } else {
      	  // 如果map端没有聚合,则针对未合并的<k,v>进行聚合
      	  // 我们不知道值的类型,但也不在乎--依赖*应该*确保与该聚合器兼容,这将把值类型转换为合并了的类型C
            val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
            dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
          }
        } else {
          require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
          interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
        }
      
        // 如果定义了key的排序器,则进行排序。基于sort的shuffle实现过程中,默认只是按照partitionId排序
        // 在每一个partition内部并没有排序,因此添加了keyOrdering变量,提供是否需要对分区内部的key排序
        dep.keyOrdering match {
          case Some(keyOrd: Ordering[K]) =>
      	  // 为了减少内存压力和避免GC开销,引入了外部排序器,当内存不足时会根据配置文件
        	  // spark.shuffle.spill决定是否进行spill操作
            val sorter =
              new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
            sorter.insertAll(aggregatedIter)
            context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
            context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
            context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
            CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
          case None =>
      	  // 不需要排序直接返回
            aggregatedIter
        }
      }
               

获取map任务输出的位置信息

在executor上执行的任务通过MapOutputTracker(确切地说是其子类MapOutputTrackerWorker)的getMapSizesByExecutorId去获取MapStatus。

  • 首先查看executor本地的mapStatuses中是否已经有了该shuffleId的信息,如果没有则同步查看是否有其他线程进行获取该shuffleId的map任务状态。
  • 如果没有则向MapOutputTrackerMasterEndpoint发送GetMapOutputStatuses消息。注意executor端发送请求是同步请求的。
  • MapOutputTrackerMasterEndpoint收到消息之后,MapOutputTrackerMaster会添加这个请求到队列,并且它有一个后台线程一直不断从该队列获取请求,获取请求之后返回。
  • 等获取到driver返回的map任务信息后,通过convertMapStatuses将其转换为map任务所在的地址及其数据大小后添加到本地的mapStatuses映射中。
    // MapOutputTracker.scala # MapOutputTrackerWorker
      override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
          : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
        logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
        // 根据shuffleId获取MapStatus
        val statuses = getStatuses(shuffleId)
        try {
      	// 将得到MapStatus数组进行转换为map任务所在的地址(即BlockManagerId)和map任务输出中分配给当前reduce任务的Block大小
          MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
        } catch {
          case e: MetadataFetchFailedException =>
            // We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
            mapStatuses.clear()
            throw e
        }
      }
               

获取blocks数据

ShuffleBlockFetcherIterator是读取中间结果的关键。构造ShuffleBlockFetcherIterator的时候会调用到initialize方法,它的初始化过程如下:

  1. 使用splitLocalRemoteBlocks方法划分本地读取和远程读取的Block的请求。将FetchRequest随机排序后存入fetchRequests: newQueue[FetchRequest]。
  2. 遍历fetchRequests中的所有FetchRequest,远程请求Block中间结果。
  3. 调用fetchLocalBlocks获取本地Block。
    // ShuffleBlockFetcherIterator.scala
     private[this] def initialize(): Unit = {
       // Add a task completion callback (called in both success case and failure case) to cleanup.
       context.addTaskCompletionListener(_ => cleanup())
     
       // 切分本地和远程的block
       val remoteRequests = splitLocalRemoteBlocks()
       // 然后进行随机排序
       fetchRequests ++= Utils.randomize(remoteRequests)
       assert ((0 == reqsInFlight) == (0 == bytesInFlight),
         "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
         ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight)
     
       // Send out initial requests for blocks, up to our maxBytesInFlight
       // 发送请求到远程获取数据
       fetchUpToMaxBytes()
     
       val numFetches = remoteRequests.size - fetchRequests.size
       logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
     
       // 拉取本地的数据
       fetchLocalBlocks()
       logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
     }
               

划分本地与远程Block

splitLocalRemoteBlocks方法用于划分哪些Block从本地获取,哪些需要远程拉取,是获取中间计算结果的关键。遍历已经按照BlockManagerId分组的blockInfo,如果blockInfo所在的Executor与当前Executor相同,则将它的BlockId存入localBlocks;否则,将blockInfo的BlockId和size累加到curBlocks,将blockId存入remoteBlocks,curRequestSize增加size的大小,每当curRequestSize >= targetRequestSize,则新建FetchRequest放入remoteRequests,并且为生成下一个FetchRequest做一些准备(如新建curBlocks,curRequestSize置为0)。遍历结束,curBlocks中如果仍然有缓存的(BlockId, Long),新建FetchRequest放入remoteRequests。此次请求不受maxBytesInFlight和targetRequestSize的影响。

// ShuffleBlockFetcherIterator.scala
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
  // 远端请求从最多5个node去获取数据,每一个节点拉取的数据取决于spark.reducer.maxMbInFlight即maxBytesInFlight参数
  // 加入整个集群只允许每次在5台拉取5G的数据,那么每一节点只允许拉取1G数据,这样就可以允许他们并行从5个节点获取,
  // 而不是主动从一个节点获取
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)

  // 创建FetchRequest队列,用于存放拉取的数据的请求,每一个请求可能包含多个block,
  // 具体多少取决于总的请求block大小是否超过目标阀值
  val remoteRequests = new ArrayBuffer[FetchRequest]

  var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) {
    // 获取block的大小,并更新总的block数量信息
    totalBlocks += blockInfos.size
    // 要获取的数据在本地
    if (address.executorId == blockManager.blockManagerId.executorId) {
      // 更新要从本地block拉取的集合
      localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
      // 更新要拉取的block数量
      numBlocksToFetch += localBlocks.size
    } else {
	  //数据不在本地时
      val iterator = blockInfos.iterator
      var curRequestSize = 0L // 当前请求的大小
      // 存放当前的远端请求
      var curBlocks = new ArrayBuffer[(BlockId, Long)]
      // 遍历每一个block
      while (iterator.hasNext) {
        val (blockId, size) = iterator.next()
        // 过滤掉空的block
        if (size > 0) {
          curBlocks += ((blockId, size))
          // 更新要拉取的远端的blockId的集合列表
          remoteBlocks += blockId
          // 更新要拉取的block数量
          numBlocksToFetch += 1
          curRequestSize += size
        } else if (size < 0) {
          throw new BlockException(blockId, "Negative block size " + size)
        }
        // 如果当前请求的大小已经超过了阀值
        if (curRequestSize >= targetRequestSize) {
          // 创建一个新的FetchRequest,放到请求队列
          remoteRequests += new FetchRequest(address, curBlocks)
          // 重置当前block列表
          curBlocks = new ArrayBuffer[(BlockId, Long)]
          logDebug(s"Creating fetch request of $curRequestSize at $address")
          // 重置当前请求数量为0
          curRequestSize = 0
        }
      }
      // 添加最终的请求
      if (curBlocks.nonEmpty) {
        remoteRequests += new FetchRequest(address, curBlocks)
      }
    }
  }
  logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
  remoteRequests
}
           

获取远程Block

fetchUpToMaxBytes用于请求远程结果,其通过send方法向某个远程节点发送中间结果拉取请求。利用FetchRequest里封装的blockId、size、address等信息,调用ShuffleClient的fetchBlocks方法获取其他节点上的中间计算结果。

// ShuffleBlockFetcherIterator.scala
private def fetchUpToMaxBytes(): Unit = {
  // 发送拉取请求,直到达到maxBytesInFlight大小。 
  // 如果您不能立即从远程主机拉取,则将请求推迟到下一次可以处理的时间。

  // 如果可能,处理所有未完成的延迟获取请求。
  if (deferredFetchRequests.nonEmpty) {
    for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
      while (isRemoteBlockFetchable(defReqQueue) &&
          !isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
        val request = defReqQueue.dequeue()
        logDebug(s"Processing deferred fetch request for $remoteAddress with "
          + s"${request.blocks.length} blocks")
        send(remoteAddress, request)
        if (defReqQueue.isEmpty) {
          deferredFetchRequests -= remoteAddress
        }
      }
    }
  }

  // 如果可能,处理任何常规的拉取请求。
  while (isRemoteBlockFetchable(fetchRequests)) {
    val request = fetchRequests.dequeue()
    val remoteAddress = request.address
    if (isRemoteAddressMaxedOut(remoteAddress, request)) {
      logDebug(s"Deferring fetch request for $remoteAddress with ${request.blocks.size} blocks")
      val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new Queue[FetchRequest]())
      defReqQueue.enqueue(request)
      deferredFetchRequests(remoteAddress) = defReqQueue
    } else {
      send(remoteAddress, request)
    }
  }

  def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
    sendRequest(request)
    numBlocksInFlightPerAddress(remoteAddress) =
      numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size
  }

  def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): Boolean = {
    fetchReqQueue.nonEmpty &&
      (bytesInFlight == 0 ||
        (reqsInFlight + 1 <= maxReqsInFlight &&
          bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
  }

  // 检查发送新的获取请求是否会超过从给定远程地址获取的最大块数。
  def isRemoteAddressMaxedOut(remoteAddress: BlockManagerId, request: FetchRequest): Boolean = {
    numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size >
      maxBlocksInFlightPerAddress
  }
}
           

获取本地Block

fetchLocalBlocks用于对本地中间计算结果的获取。其利用熟悉的BlockManager的getBlockData方法获取本地Block,最后将取到的中间结果存入results = new LinkedBlockingQueue[FetchResult]中。

/**
 * 当我们获取远程块的同时,获取本地块。 
 * 这是可以的,因为在创建输入流时会延迟分配`ManagedBuffer`的内存,
 * 因此我们在内存中跟踪的所有内容都是ManagedBuffer引用本身。
 */
private[this] def fetchLocalBlocks() {
  val iter = localBlocks.iterator
  while (iter.hasNext) {
    val blockId = iter.next()
    try {
      val buf = blockManager.getBlockData(blockId)
      shuffleMetrics.incLocalBlocksFetched(1)
      shuffleMetrics.incLocalBytesRead(buf.size)
      buf.retain()
      results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf, false))
    } catch {
      case e: Exception =>
        // 如果发现异常,则立即停止。
        logError(s"Error occurred while fetching local blocks", e)
        results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))
        return
    }
  }
}
           

reduce端聚合数据

如果map端已经聚合过了,则对读取到的聚合结果进行聚合。如果map端没有聚合,则针对未合并的<k,v>进行聚合。对于map端已经聚合过的情况,聚合操作主要依赖aggregator的combineCombinersByKey方法,其主要依赖ExternalAppendOnlyMap的insertAll方法完成的,其实质也是使用SizeTrackingAppendOnlyMap。

// Aggregator.scala
def combineCombinersByKey(
    iter: Iterator[_ <: Product2[K, C]],
    context: TaskContext): Iterator[(K, C)] = {
  val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
  combiners.insertAll(iter)
  updateMetrics(context, combiners)
  combiners.iterator
}

// ExternalAppendOnlyMap.scala
/**
 * 将给定的键和值的迭代器插入到map中。
 *
 * 当底层的map需要增长时,检查shuffle内存的全局池是否有足够的空间来发生这种情况。
 * 如果有,则分配增长该map所需的内存;否则,将内存中的map数据溢写出到磁盘。
 *
 * 不会跟踪第一个trackMemoryThreshold条目的shuffle内存使用情况。
 */
def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
  if (currentMap == null) {
    throw new IllegalStateException(
      "Cannot insert new elements into a map after calling iterator")
  }
  // 我们跨数据条目地重复使用该map的update函数,以避免每次都分配新的闭包
  var curEntry: Product2[K, V] = null
  val update: (Boolean, C) => C = (hadVal, oldVal) => {
    if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
  }

  while (entries.hasNext) {
    curEntry = entries.next()
    val estimatedSize = currentMap.estimateSize()
    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
    if (maybeSpill(currentMap, estimatedSize)) {
      currentMap = new SizeTrackingAppendOnlyMap[K, C]
    }
    currentMap.changeValue(curEntry._1, update)
    addElementsRead()
  }
}
           

reduce端排序数据

基于sort的shuffle实现过程中,默认只是按照partitionId排序。在每一个partition内部并没有排序,因此添加了keyOrdering变量,提供是否需要对分区内部的key排序。如果在RDD的shuffle中指定了key排序,则使用ExternalSorter的insertAll方法进行排序,详细内容可参考Shuffle Writer中的数据聚合与排序小节。

继续阅读