文章目录
- Shuffle简介与其发展
- Shuffle机制的实现
-
- ShuffleManager接口简介
- ShuffleManager的创建
- ShuffleManager实现类SortShuffleManager
-
- registerShuffle方法
- getWriter方法
- getReader方法
- Shuffle Writer过程解析
-
- 数据聚合与排序
-
- 聚合排序
- 溢写磁盘
- 数据合并与持久化
- 生成索引文件
- Shuffle Reader过程解析
-
- 获取map任务输出的位置信息
- 获取blocks数据
-
- 划分本地与远程Block
- 获取远程Block
- 获取本地Block
- reduce端聚合数据
- reduce端排序数据
Shuffle简介与其发展
Shuffle,即"洗牌",所有采用map-reduce思想的大数据计算框架的必经及最重要的阶段,用于打通map任务的输出与reduce任务的输入。Spark的早期版本hash shuffle中ShuffleMapTask任务会为每一个reduce任务创建一个bucket。假设有M个map任务,R个reduce任务,则map阶段一共会创建M*R个bucket。这种方式不仅文件数量很多,造成频繁的磁盘和网络I/O,而且内存开销也很大,GC频繁,经常出现OOM。Spark从2.0版本之后,hash shuffle机制被删除,只保留sort shuffle机制,其做了多种性能优化:
- 将map任务给每个partition的reduce任务输出的bucket合并到同一个文件中。
- map任务逐条输出计算结果,而不是一次性输出到内存,并使用AppendOnlyMap缓存及其聚合算法对中间结果进行聚合。同时进行溢出判断,当超出myMemoryThreshold的大小时,将数据溢写到磁盘,防止内存溢出。
- reduce任务对拉取到的map任务中间结果逐条读取,而不是一次性读入内存,并在内存中进行聚合和排序,这也大大减小了数据占用的内存。
- reduce任务将要拉取的block按照BlockManager地址划分,然后将同一BlockManager地址中的Block聚合为少量网络请求,减少网络I/O。
Shuffle机制的实现
Spark Shuffle机制的主要接口是ShuffleManager,而Spark从2.0版本之后,其默认实现为SortShuffleManager。
ShuffleManager接口简介
ShuffleManager是一个接口,定义了Spark的Shuffle机制的主要接口方法。
/**
* Shuffle系统的可插拔接口。ShuffleManager的创建基于spark.shuffle.manager设置,
* 且在SparkEnv中的driver和每个executor上创建。driver在该shuffle manager上注册shuffle,
* executor(或者driver中本地运行的任务)能够请求读取或写入数据。
*
* 注意:shuffle manager会在SparkEnv中实例化,因此其构造函数可将SparkConf和boolean isDriver作为参数
*/
private[spark] trait ShuffleManager {
/**
* 使用ShuffleManager注册一个shuffle,获取一个句柄,回传给任务
* 用于注册一种shuffle机制,并返回对应的ShuffleHandle,handle内会存储shuffle依赖信息。
* 根据该handle类型可以进一步确定采用ShuffleWriter类型。
*/
def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
/** 为一个指定的分区提供ShuffleWriter,为executor中执行map任务时调用 */
def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]
/**
* 为一个范围(开始分区到结束分区-1, 闭区间)内的reduce分区提供ShuffleReader。
* 在executor中执行reduce任务时调用
*/
def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C]
/**
* 从ShuffleManager移除shuffle的元数据
* @return 移除成功返回true, 否则为false.*/
def unregisterShuffle(shuffleId: Int): Boolean
/**
* 返回一个能够基于block坐标位置获取shuffle块数据的解析器
*/
def shuffleBlockResolver: ShuffleBlockResolver
/** Shut down this ShuffleManager. */
def stop(): Unit
}
ShuffleManager的简易类图如下:
ShuffleManager的创建
ShuffleManager的初始化是在SparkEnv初始化时执行。在创建driver端环境(SparkContext)和executor端环境(ExecutorBackend)时,会进行创建。主要调用org.apache.spark.SparkEnv的create方法,来构建ShuffleManager。可以看到,其默认实现类是SortShuffleManager,当然也可以通过
spark.shuffle.manager
参数指定其他实现类。
// SparkEnv.scala
// Let the user specify short names for shuffle managers
// 注意sort, tungsten-sort对应的value均是SortShuffleManager
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
// 可以通过spark.shuffle.manager参数来指定第三方或其他方式实现的shuffle机制
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
ShuffleManager实现类SortShuffleManager
SortShuffleManager主要包含对registerShuffle、getWriter和getReader三个方法的覆盖实现。
registerShuffle方法
- 根据不同的条件,返回不同的ShuffleHandle。首先检查是否满足SortShuffleWriter.shouldBypassMergeSort条件,如果同时满足以下两个条件,那么会返回ByPassMergeSortShuffleHandle,将启用bypass merge-sort机制:
- 该shuffle依赖中没有map端聚合操作(如groupByKey)。
- 依赖分区数不大于spark.shuffle.sort.bypassMergeThreshold规定的值,默认值为200。
- 然后检查是否符合SortShuffleManager.canUseSerializedShuffle方法,如果同时满足以下三个条件,就会返回SerializedShuffleHandle,启用序列化Sort shuffle机制(也就是tungsten-sort):
- 使用的序列化器支持序列化对象的重定位(如KyroSerializer)。
- shuffle依赖中map端无聚合操作。
- 分区数不大于MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE的值,即2^24。
- 最后如果前面两个检查都不满足,那么就返回默认的BaseShuffleHandle,采用基本的sort handle。
/** * 获取一个ShuffleHandle传入到tasks. */ override def registerShuffle[K, V, C]( shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) { // 如果分区数小于spark.shuffle.sort.bypassMergeThreshold(默认为200)并且不需要map端的聚合, // 则可以直接写对应分区个数的文件,且在结束时仅仅合并它们。 // 可以避免2次序列化和反序列化操作以将溢写的文件合并在一起(正常的代码方法会发生这种情况)。 // 其缺点是一次打开多个文件,因此需要分配缓冲区更多的内存。 new BypassMergeSortShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { // 否则,尝试以序列化形式缓存map输出,且这样做更高效(需要满足一定条件) new SerializedShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else { // 否则,以反序列化形式缓存map输出 new BaseShuffleHandle(shuffleId, numMaps, dependency) } }
getWriter方法
其实现比较简单,基于不同的ShuffleHandle类型,匹配获取相应的ShuffleWriter。对于tungsten-sort会使用UnsafeShuffleWriter,对于bypass会使用BypassMergeSortShuffleWriter,普通的sort则使用SortShuffleWriter。其都继承于ShuffleWriter抽象类,且实现了write方法。
getReader方法
ShuffleReader与ShuffleWriter不同,只有一种实现类,即BlockStoreShuffleReader。它继承自ShuffleReader接口,并实现了read方法。
Shuffle Writer过程解析
Shuffle Writer在ShuffleMapTask的runTask里面被调用,将map任务计算的结果写出成磁盘上的一个文件供reduce任务拉取使用。
// ShuffleMapTask.scala
override def runTask(context: TaskContext): MapStatus = {
...
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
可以看到shuffle writer主要通过调用write和stop两个方法实现,其中stop方法主要进行内存的释放与spill临时文件的删除工作,write方法的实现是其关键,下面以普通的sort机制SortShuffleWriter为例对其write方法的实现进行详细分析。
// SortShuffleWriter.scala
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
// 1. 数据聚合与排序
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// 在这种情况下,我们既不传递aggregator也不传递ordering给排序器,
// 因为我们不在乎键是否在每个分区中排序。
// 如果正在运行的操作是sortByKey,则将在reduce侧完成聚合和排序。
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)
// 不要在shuffle写出时就打开合并的输出文件,因为它只是打开单个文件,因此通常非常快而无法准确测量
// (see SPARK-3570).
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
// 2. 数据合并与持久化
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
// 3. 生成索引文件
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
可以看到其利用ExternalSorter进行实现,主要包含三大步骤:
- 数据聚合与排序
- 数据合并与持久化
- 生成索引文件
数据聚合与排序
map端计算结果分区数据聚合排序主要由ExternalSorter.insertAll的实现。判断在Map端是否需要本地进行combine操作,从而决定是否传入aggregator和keyOrdering参数。将写入数据全部放入外部排序器ExternalSorter,并且根据是否需要spill进行spill操作。
// ExternalSorter.scala
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined
if (shouldCombine) {
// Combine values in-memory first using our AppendOnlyMap
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
addElementsRead()
kv = records.next()
// @volatile private var map = new PartitionedAppendOnlyMap[K, C]
map.changeValue((getPartition(kv._1), kv._1), update)
maybeSpillCollection(usingMap = true)
}
} else {
// Stick values into our buffer
while (records.hasNext) {
addElementsRead()
val kv = records.next()
// 如果没有定义aggregator,那么shouldCombine为false。
// 这时会调用PartitionedPairBuffer的insert方法,它只不过是把计算结果简单地缓存到数组中。
// @volatile private var buffer = new PartitionedPairBuffer[K, C]
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
}
}
}
如果需要聚合,则使用PartitionedAppendOnlyMap的changeValue方法来实现数据聚合排序;如果不进行combine操作,则使用PartitionedPairBuffer简单地添加数据存放于内存中。然后无论哪一种情况都需要判断内存是否足够,如果内存不够而且又申请不到内存,则需要进行本地磁盘溢写操作,把相关的数据溢写到临时文件。
聚合排序
一个任务的分区数量通常很多,通过在map端对计算结果在缓存中执行聚合和排序,能够节省I/O操作,进而提升系统性能。这种情况下,必须要定义聚合器(aggregator)函数,以便于对计算结果按照partitionID和key聚合后排序。ExternalSorter的insertAll方法中,如果定义了aggregator,则shouldCombine为true。此分支执行过程如下:
- 由于设置了聚合函数aggregator,则从聚合函数获取mergeValue、createCombiner等函数。
- 定义update偏函数,此函数用于操作mergeValue和createCombiner。
- 迭代之前创建的iterator,每读取一条Product2[K, V]。
- 以(分区索引, Product2[K, V]._1)为参数调用SizeTrackingAppendOnlyMap(PartitionedAppendOnlyMap的父类)的changeValue函数,与update函数配合,按照key值叠加value。
- 调用maybeSpillCollection方法,来处理SizeTrackingAppendOnlyMap溢出(当SizeTrackingAppendOnlyMap的大小超过myMemoryThreshold时,将集合中的数据写入磁盘并新建SizeTrackingAppendOnlyMap)。
溢写磁盘
Spark为了防止内存撑爆问题的发生,提供了函数maybeSpillCollection,通过maybeSpill决定集合数据是否要溢出,需要的话则溢写到磁盘上。
// ExternalSorter.scala
/**
* 如果需要,将当前的内存集合数据溢写到磁盘上。
*
* @param usingMap 我们是使用map还是buffer来作为当前内存中的数据集合
*/
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
estimatedSize = buffer.estimateSize()
if (maybeSpill(buffer, estimatedSize)) {
buffer = new PartitionedPairBuffer[K, C]
}
}
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
maybeSpillCollection判定集合是否溢出主要由maybeSpill函数来决定:
- 为当前线程尝试获取amountToRequest大小的内存(amountToRequest = 2*currentMemory - myMemoryThreshold)。
- 如果获得的内存依然不足(myMemoryThreshold <= currentMemory),则调用spill,执行溢出操作。内存不足可能是申请到的内存为0或者已经申请得到的内存大小超过了myMemoryThreshold。
- 溢出后续处理,如elementRead归零,已溢出内存字节数(memoryBytesSpilled)增加线程当前内存大小(currentMemory),释放当前线程占用的内存。
数据合并与持久化
主要通过ExternalSorter类的writePartitionedFile方法来合并并持久化计算结果。如果溢写文件为空,则表示内存足够,没有溢写数据到磁盘,则返回一个对数据排序的迭代器,然后遍历数据写入磁盘文件。反之如果溢写文件不为空,则需要将溢写的文件和内存中的数据合并,合并之后则需要进行归并排序(merge-sort),再将数据刷到磁盘文件。最后返回一个每个分区数据的字节长度的数组。
// ExternalSorter.scala
/**
* 将添加到此ExternalSorter中的所有数据写入磁盘存储中的文件中。这由SortShuffleWriter调用。
*
* @param blockId 要写出的block ID。索引文件名会是blockId.name + ".index"。
* @return 文件中每个分区数据的字节长度的数组 (被map output tracker所使用)
*/
def writePartitionedFile(
blockId: BlockId,
outputFile: File): Array[Long] = {
// 创建每一个分区对应的文件长度的数组
val lengths = new Array[Long](numPartitions)
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
context.taskMetrics().shuffleWriteMetrics)
// 判断是否有进行了spill的文件
if (spills.isEmpty) {
// 如果是空的表示我们只有内存数据,内存足够,不需要溢写结果到磁盘
// 如果指定aggregator,就返回PartitionedAppendOnlyMap里的数据,否则返回
// PartitionedPairBuffer里的数据
val collection = if (aggregator.isDefined) map else buffer
// 返回一个对结果排序的迭代器
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext) {
val partitionId = it.nextPartition()
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
}
// 数据刷到磁盘,并且创建FileSegment数组
val segment = writer.commitAndGet()
lengths(partitionId) = segment.length
}
} else {
// 否则,表示有溢写文件,则需要进行归并排序(merge-sort)
// 每一个分区的数据都写入到data文件的临时文件
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
// 数据刷到磁盘,并且创建FileSegment数组
val segment = writer.commitAndGet()
// 构造一个分区文件长度的数组
lengths(id) = segment.length
}
}
}
writer.close()
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
lengths
}
当溢写文件不为空时,partitionedIterator通过对集合按照指定的比较器进行排序,并且按照partition id分组,生成迭代器,从而返回遍历所有数据的迭代器。如果没有溢写,则判断是否需要对key排序,如果不需要则只是将数据按照partitionId排序,否则首先按照partitionId排序,然后partition内部再按照key排序。如果发生溢写,则需要将磁盘上溢写文件和内存里的数据进行合并。
// ExternalSorter.scala
/**
* 返回对写入此对象的所有数据进行迭代的迭代器,该数据按分区分组并由请求的aggregator聚合。
* 然后,对于每个分区,我们在其内容上都有一个迭代器,并且这些迭代器应按顺序进行访问(您不能在不读取前一个分区的情况下“跳到”一个分区)。
* 确保按分区ID的顺序为每个分区返回key-value对。
*
* 目前,我们只是一次性合并所有溢出的文件,但是可以对其进行修改以支持分层合并。
*/
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val usingMap = aggregator.isDefined
val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
// 如果没有发生磁盘溢写
if (spills.isEmpty) {
// 特殊情况: 如果我们只有内存中的数据,则不需要合并流,也许我们甚至不需要除了按分区ID外的其他任何东西排序
if (!ordering.isDefined) {
// 用户并没有要求排序后的keys,因此数据只是按照partitionId排序,并不会对key进行排序
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
} else {
// 否则我们需要先按照partitionId排序,然后分区内部对key进行排序
groupByPartition(destructiveIterator(
collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
}
} else {
// 如果发生了溢写操作,则需要将磁盘上溢写文件和内存里的数据进行合并排序
merge(spills, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator)))
}
}
生成索引文件
map任务输出结果数据在持久化的时候会被写入同一文件,那么reduce任务如何从此文件中按照分区读取数据呢?IndexShuffleBlockManager的writeIndexFileAndCommit方法生成的分区索引文件,此文件使用偏移量来区分各个分区的计算结果,偏移量来自合并排序过程中记录的各个partition的长度。
// IndexShuffleBlockManager.scala
/**
* 写出一个索引文件,其中包含每个块的偏移量,并在输出文件末尾添加一个最终偏移量。
* getBlockData将使用它来确定每个块的开始和结束位置。
*
* 它将把数据和索引文件作为一个原子操作提交,使用现有的或用新的替换。
*
* Note: 如果使用现有索引文件,则将更新“lengths”信息以匹配现有索引文件。
*/
def writeIndexFileAndCommit(
shuffleId: Int,
mapId: Int,
lengths: Array[Long],
dataTmp: File): Unit = {...}
Shuffle Reader过程解析
下游的reduce任务可能是ShuffleMapTask也有可能是ResultTask,首先会去Driver获取parent stage中ShuffleMapTask输出的位置信息,根据位置信息获取index文件,然后解析index文件,从index文件中获取相关的位置等信息,然后读data文件获取属于自己那部分内容。这都是在ShuffleRDD的compute方法中实现的。
// ShuffleRDD.scala
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
// ResultTask或者ShuffleMapTask在执行到ShuffleRDD时,
// 肯定会调用ShuffleRDD的compute方法来计算当前这个RDD的partition数据
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
// 获取ShuffleManager的reader去拉取需要聚合的数据
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
前面已经提到shuffle的reader的实现类为BlockStoreShuffleReader,通过其read方法实现数据读取。
- 获取map任务输出的位置信息。
- 获取blocks数据:创建一个ShuffleBlockFetcherIterator,它获取多个块,对于本地块从本地读取,对于远程块则通过远程方法读取。
- reduce端聚合数据:如果map端已经聚合过了,则对读取到的聚合结果进行聚合。如果map端没有聚合,则针对未合并的<k,v>进行聚合。
- reduce端排序数据:如果需要对key排序,则进行排序。基于sort的shuffle实现过程中,默认只是按照partitionId排序。在每一个partition内部并没有排序,因此添加了keyOrdering变量,提供是否需要对分区内部的key排序。
/** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { // 构造ShuffleBlockFetcherIterator,一个迭代器,它获取多个块,对于本地块,从本地读取 // 对于远程块,通过远程方法读取 // 基于配置文件对于流进行包装 val wrappedStreams = new ShuffleBlockFetcherIterator( context, blockManager.shuffleClient, blockManager, // MapOutputTracker在SparkEnv启动的时候实例化 mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), serializerManager.wrapStream, // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024, SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue), SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS), SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM), SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true)) val serializerInstance = dep.serializer.newInstance() // 对于每一个流创建一个key/value迭代器 val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) => // Note: 下面的asKeyValueIterator将key/value迭代器包装在NextIterator内部。 // NextIterator确保在读取所有记录后对底层的InputStream调用close()方法。 serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator } // 为每个读取的记录更新上下文任务度量。 val readMetrics = context.taskMetrics.createTempShuffleReadMetrics() val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( recordIter.map { record => readMetrics.incRecordsRead(1) record }, context.taskMetrics().mergeShuffleReadMetrics()) // 为了支持任务取消,必须在此处使用可中断的迭代器 val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter) // 如果reduce端需要聚合 val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { // 如果map端已经聚合过了,则对读取到的聚合结果进行聚合 if (dep.mapSideCombine) { // 我们正在读取已经合并的值 val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]] // 针对map端各个partition对key进行聚合后的结果再次聚合 dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context) } else { // 如果map端没有聚合,则针对未合并的<k,v>进行聚合 // 我们不知道值的类型,但也不在乎--依赖*应该*确保与该聚合器兼容,这将把值类型转换为合并了的类型C val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]] dep.aggregator.get.combineValuesByKey(keyValuesIterator, context) } } else { require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]] } // 如果定义了key的排序器,则进行排序。基于sort的shuffle实现过程中,默认只是按照partitionId排序 // 在每一个partition内部并没有排序,因此添加了keyOrdering变量,提供是否需要对分区内部的key排序 dep.keyOrdering match { case Some(keyOrd: Ordering[K]) => // 为了减少内存压力和避免GC开销,引入了外部排序器,当内存不足时会根据配置文件 // spark.shuffle.spill决定是否进行spill操作 val sorter = new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer) sorter.insertAll(aggregatedIter) context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop()) case None => // 不需要排序直接返回 aggregatedIter } }
获取map任务输出的位置信息
在executor上执行的任务通过MapOutputTracker(确切地说是其子类MapOutputTrackerWorker)的getMapSizesByExecutorId去获取MapStatus。
- 首先查看executor本地的mapStatuses中是否已经有了该shuffleId的信息,如果没有则同步查看是否有其他线程进行获取该shuffleId的map任务状态。
- 如果没有则向MapOutputTrackerMasterEndpoint发送GetMapOutputStatuses消息。注意executor端发送请求是同步请求的。
- MapOutputTrackerMasterEndpoint收到消息之后,MapOutputTrackerMaster会添加这个请求到队列,并且它有一个后台线程一直不断从该队列获取请求,获取请求之后返回。
- 等获取到driver返回的map任务信息后,通过convertMapStatuses将其转换为map任务所在的地址及其数据大小后添加到本地的mapStatuses映射中。
// MapOutputTracker.scala # MapOutputTrackerWorker override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") // 根据shuffleId获取MapStatus val statuses = getStatuses(shuffleId) try { // 将得到MapStatus数组进行转换为map任务所在的地址(即BlockManagerId)和map任务输出中分配给当前reduce任务的Block大小 MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: mapStatuses.clear() throw e } }
获取blocks数据
ShuffleBlockFetcherIterator是读取中间结果的关键。构造ShuffleBlockFetcherIterator的时候会调用到initialize方法,它的初始化过程如下:
- 使用splitLocalRemoteBlocks方法划分本地读取和远程读取的Block的请求。将FetchRequest随机排序后存入fetchRequests: newQueue[FetchRequest]。
- 遍历fetchRequests中的所有FetchRequest,远程请求Block中间结果。
- 调用fetchLocalBlocks获取本地Block。
// ShuffleBlockFetcherIterator.scala private[this] def initialize(): Unit = { // Add a task completion callback (called in both success case and failure case) to cleanup. context.addTaskCompletionListener(_ => cleanup()) // 切分本地和远程的block val remoteRequests = splitLocalRemoteBlocks() // 然后进行随机排序 fetchRequests ++= Utils.randomize(remoteRequests) assert ((0 == reqsInFlight) == (0 == bytesInFlight), "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight + ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight) // Send out initial requests for blocks, up to our maxBytesInFlight // 发送请求到远程获取数据 fetchUpToMaxBytes() val numFetches = remoteRequests.size - fetchRequests.size logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime)) // 拉取本地的数据 fetchLocalBlocks() logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime)) }
划分本地与远程Block
splitLocalRemoteBlocks方法用于划分哪些Block从本地获取,哪些需要远程拉取,是获取中间计算结果的关键。遍历已经按照BlockManagerId分组的blockInfo,如果blockInfo所在的Executor与当前Executor相同,则将它的BlockId存入localBlocks;否则,将blockInfo的BlockId和size累加到curBlocks,将blockId存入remoteBlocks,curRequestSize增加size的大小,每当curRequestSize >= targetRequestSize,则新建FetchRequest放入remoteRequests,并且为生成下一个FetchRequest做一些准备(如新建curBlocks,curRequestSize置为0)。遍历结束,curBlocks中如果仍然有缓存的(BlockId, Long),新建FetchRequest放入remoteRequests。此次请求不受maxBytesInFlight和targetRequestSize的影响。
// ShuffleBlockFetcherIterator.scala
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
// 远端请求从最多5个node去获取数据,每一个节点拉取的数据取决于spark.reducer.maxMbInFlight即maxBytesInFlight参数
// 加入整个集群只允许每次在5台拉取5G的数据,那么每一节点只允许拉取1G数据,这样就可以允许他们并行从5个节点获取,
// 而不是主动从一个节点获取
val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)
// 创建FetchRequest队列,用于存放拉取的数据的请求,每一个请求可能包含多个block,
// 具体多少取决于总的请求block大小是否超过目标阀值
val remoteRequests = new ArrayBuffer[FetchRequest]
var totalBlocks = 0
for ((address, blockInfos) <- blocksByAddress) {
// 获取block的大小,并更新总的block数量信息
totalBlocks += blockInfos.size
// 要获取的数据在本地
if (address.executorId == blockManager.blockManagerId.executorId) {
// 更新要从本地block拉取的集合
localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
// 更新要拉取的block数量
numBlocksToFetch += localBlocks.size
} else {
//数据不在本地时
val iterator = blockInfos.iterator
var curRequestSize = 0L // 当前请求的大小
// 存放当前的远端请求
var curBlocks = new ArrayBuffer[(BlockId, Long)]
// 遍历每一个block
while (iterator.hasNext) {
val (blockId, size) = iterator.next()
// 过滤掉空的block
if (size > 0) {
curBlocks += ((blockId, size))
// 更新要拉取的远端的blockId的集合列表
remoteBlocks += blockId
// 更新要拉取的block数量
numBlocksToFetch += 1
curRequestSize += size
} else if (size < 0) {
throw new BlockException(blockId, "Negative block size " + size)
}
// 如果当前请求的大小已经超过了阀值
if (curRequestSize >= targetRequestSize) {
// 创建一个新的FetchRequest,放到请求队列
remoteRequests += new FetchRequest(address, curBlocks)
// 重置当前block列表
curBlocks = new ArrayBuffer[(BlockId, Long)]
logDebug(s"Creating fetch request of $curRequestSize at $address")
// 重置当前请求数量为0
curRequestSize = 0
}
}
// 添加最终的请求
if (curBlocks.nonEmpty) {
remoteRequests += new FetchRequest(address, curBlocks)
}
}
}
logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
remoteRequests
}
获取远程Block
fetchUpToMaxBytes用于请求远程结果,其通过send方法向某个远程节点发送中间结果拉取请求。利用FetchRequest里封装的blockId、size、address等信息,调用ShuffleClient的fetchBlocks方法获取其他节点上的中间计算结果。
// ShuffleBlockFetcherIterator.scala
private def fetchUpToMaxBytes(): Unit = {
// 发送拉取请求,直到达到maxBytesInFlight大小。
// 如果您不能立即从远程主机拉取,则将请求推迟到下一次可以处理的时间。
// 如果可能,处理所有未完成的延迟获取请求。
if (deferredFetchRequests.nonEmpty) {
for ((remoteAddress, defReqQueue) <- deferredFetchRequests) {
while (isRemoteBlockFetchable(defReqQueue) &&
!isRemoteAddressMaxedOut(remoteAddress, defReqQueue.front)) {
val request = defReqQueue.dequeue()
logDebug(s"Processing deferred fetch request for $remoteAddress with "
+ s"${request.blocks.length} blocks")
send(remoteAddress, request)
if (defReqQueue.isEmpty) {
deferredFetchRequests -= remoteAddress
}
}
}
}
// 如果可能,处理任何常规的拉取请求。
while (isRemoteBlockFetchable(fetchRequests)) {
val request = fetchRequests.dequeue()
val remoteAddress = request.address
if (isRemoteAddressMaxedOut(remoteAddress, request)) {
logDebug(s"Deferring fetch request for $remoteAddress with ${request.blocks.size} blocks")
val defReqQueue = deferredFetchRequests.getOrElse(remoteAddress, new Queue[FetchRequest]())
defReqQueue.enqueue(request)
deferredFetchRequests(remoteAddress) = defReqQueue
} else {
send(remoteAddress, request)
}
}
def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
sendRequest(request)
numBlocksInFlightPerAddress(remoteAddress) =
numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size
}
def isRemoteBlockFetchable(fetchReqQueue: Queue[FetchRequest]): Boolean = {
fetchReqQueue.nonEmpty &&
(bytesInFlight == 0 ||
(reqsInFlight + 1 <= maxReqsInFlight &&
bytesInFlight + fetchReqQueue.front.size <= maxBytesInFlight))
}
// 检查发送新的获取请求是否会超过从给定远程地址获取的最大块数。
def isRemoteAddressMaxedOut(remoteAddress: BlockManagerId, request: FetchRequest): Boolean = {
numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size >
maxBlocksInFlightPerAddress
}
}
获取本地Block
fetchLocalBlocks用于对本地中间计算结果的获取。其利用熟悉的BlockManager的getBlockData方法获取本地Block,最后将取到的中间结果存入results = new LinkedBlockingQueue[FetchResult]中。
/**
* 当我们获取远程块的同时,获取本地块。
* 这是可以的,因为在创建输入流时会延迟分配`ManagedBuffer`的内存,
* 因此我们在内存中跟踪的所有内容都是ManagedBuffer引用本身。
*/
private[this] def fetchLocalBlocks() {
val iter = localBlocks.iterator
while (iter.hasNext) {
val blockId = iter.next()
try {
val buf = blockManager.getBlockData(blockId)
shuffleMetrics.incLocalBlocksFetched(1)
shuffleMetrics.incLocalBytesRead(buf.size)
buf.retain()
results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf, false))
} catch {
case e: Exception =>
// 如果发现异常,则立即停止。
logError(s"Error occurred while fetching local blocks", e)
results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))
return
}
}
}
reduce端聚合数据
如果map端已经聚合过了,则对读取到的聚合结果进行聚合。如果map端没有聚合,则针对未合并的<k,v>进行聚合。对于map端已经聚合过的情况,聚合操作主要依赖aggregator的combineCombinersByKey方法,其主要依赖ExternalAppendOnlyMap的insertAll方法完成的,其实质也是使用SizeTrackingAppendOnlyMap。
// Aggregator.scala
def combineCombinersByKey(
iter: Iterator[_ <: Product2[K, C]],
context: TaskContext): Iterator[(K, C)] = {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
combiners.insertAll(iter)
updateMetrics(context, combiners)
combiners.iterator
}
// ExternalAppendOnlyMap.scala
/**
* 将给定的键和值的迭代器插入到map中。
*
* 当底层的map需要增长时,检查shuffle内存的全局池是否有足够的空间来发生这种情况。
* 如果有,则分配增长该map所需的内存;否则,将内存中的map数据溢写出到磁盘。
*
* 不会跟踪第一个trackMemoryThreshold条目的shuffle内存使用情况。
*/
def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
if (currentMap == null) {
throw new IllegalStateException(
"Cannot insert new elements into a map after calling iterator")
}
// 我们跨数据条目地重复使用该map的update函数,以避免每次都分配新的闭包
var curEntry: Product2[K, V] = null
val update: (Boolean, C) => C = (hadVal, oldVal) => {
if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
}
while (entries.hasNext) {
curEntry = entries.next()
val estimatedSize = currentMap.estimateSize()
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
if (maybeSpill(currentMap, estimatedSize)) {
currentMap = new SizeTrackingAppendOnlyMap[K, C]
}
currentMap.changeValue(curEntry._1, update)
addElementsRead()
}
}
reduce端排序数据
基于sort的shuffle实现过程中,默认只是按照partitionId排序。在每一个partition内部并没有排序,因此添加了keyOrdering变量,提供是否需要对分区内部的key排序。如果在RDD的shuffle中指定了key排序,则使用ExternalSorter的insertAll方法进行排序,详细内容可参考Shuffle Writer中的数据聚合与排序小节。