天天看點

Spark源碼分析之:ShuffleCoarseGrainedExecutorBackendShufleWriter

這一篇我們來分析Spark2.1的Shuffle流程。

其實ShuffleDependency從SparkContext初始化就已經被DAGScheduler劃分好了,本文主要探讨在Task運作過程中的ShufleWrite和ShuffleRead。

要從Task運作開始說起,就要知道Task在哪裡運作的。我們普遍認為Executor是負責執行Task的,但是我們發現Executor其實就是一個類

private[spark] class Executor(){}
           

而在一個Application送出後,用JPS指令檢視,會發現有AplicationMaster程序和CoarseGrainedExecutorBackend程序。沒錯,後者就是真實管理運作Task的程序。

CoarseGrainedExecutorBackend

做為一個程序,CoarseGrainedExecutorBackend負責與ApplicationMaster進行RPC通信,其中有一個receive方法重寫于RpcEndpoint,負責接收消息,當收到啟動task的指令時,就會調用Executor的launchTask方法,此方法負責啟動task。

Executor類中還有一個TaskRunner内部類,是一個實作了Runnable接口的線程。啟動task也就是将序列化task的描述資訊封裝成一個TaskRunner,将其放線上程池中運作。

//Executor類
def launchTask(
                  context: ExecutorBackend,
                  taskId: Long,
                  attemptNumber: Int,
                  taskName: String,
                  serializedTask: ByteBuffer): Unit = {
    // Runnable 接口的對象.
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
        serializedTask)
    runningTasks.put(taskId, tr)
    // 線上程池中執行 task
    threadPool.execute(tr)
}
           

由此我們看出,其實負責執行task的還是Executor,而與ApplicationMaster通信的卻是CoarseGrainedExecutorBackend,也就是CoarseGrainedExecutorBackend是一個中間代理商。

TaskRunner的run方法是task的具體實作邏輯。

@volatile var task: Task[Any] = _
override def run(): Unit = {
    //...
    // 1.更新 task 的狀态
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    // 2.把任務相關的資料反序列化出來
    val (taskFiles, taskJars, taskProps, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    //3.反序列化确定task屬于哪個類型
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    // 4.開始運作 task
    val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
           

 上述代碼第3步,确定task是哪個類型,Task是個抽象類,有以下實作類:

Spark源碼分析之:ShuffleCoarseGrainedExecutorBackendShufleWriter

 主要實作為:ShuffleMapTask和ResultTask。

Spark中的Stage分為兩種,ShuffleMapStage和ResultStage,ResultStage就是finallStage,也就是action算子所在的Stage,前面的所有Stage都是ShuffleMapStage。ShuffleMapStage對應的Task就是ShuffleMapTask,而ResultStage對應的Task就是ResultTask。

Spark源碼分析之:ShuffleCoarseGrainedExecutorBackendShufleWriter

ShufleWriter

也就是說,首先運作的task是ShuffleMapTask。那麼上面第4步的task.run會調用ShuffleMapTask的runtask方法。

//ShuffleMapTask類
override def runTask(context: TaskContext): MapStatus = {
    //...
    var writer: ShuffleWriter[Any, Any] = null
    try {
        val manager = SparkEnv.get.shuffleManager
        //1.擷取ShuffleWriter,傳入一個shuffleHandle
        writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
        //2.寫資料
        writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
        writer.stop(success = true).get
    } catch {
        //...
    }
           

 如上代碼中所知,ShuffleWriter為ShuffleManager.getWriter所得。

getWriter

ShuffleManager為一個trait,其實作類隻有1個,為SortShuffleManager。

Spark源碼分析之:ShuffleCoarseGrainedExecutorBackendShufleWriter

 并且傳入了一個shuffleHandle,點進去發現在shuffleDependency方法下被初始化:

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)
           

點開此SortShuffleManager的registerShuffle方法:

override def registerShuffle[K, V, C](
                                         shuffleId: Int,
                                         numMaps: Int,
                                         dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
        // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
        // need map-side aggregation, then write numPartitions files directly and just concatenate
        // them at the end. This avoids doing serialization and deserialization twice to merge
        // together the spilled files, which would happen with the normal code path. The downside is
        // having multiple files open at a time and thus more memory allocated to buffers.
        new BypassMergeSortShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
        // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
        new SerializedShuffleHandle[K, V](
            shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
        // Otherwise, buffer map outputs in a deserialized form:
        new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
}
           

 此方法用于注冊shuffle,也就是選擇使用何種shuffle。

先看第一種:BypassMergeSortShuffleHandle

private[spark] object SortShuffleWriter {
    def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
        // We cannot bypass sorting if we need to do map-side aggregation.
        //如果需要在map端聚合,如reduceByKey算子,則不能使用。
        if (dep.mapSideCombine) {
            require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
            false
        } else {//擷取"spark.shuffle.sort.bypassMergeThreshold"參數的值,預設為200
            val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
            //分區數 <= 200使用
            dep.partitioner.numPartitions <= bypassMergeThreshold
        }
    }
}
           

 使用此Shuffle方式有兩個條件:

  1. 在map端不能有聚合;
  2. 分區數<=200;

第二種:canUseSerializedShuffle

def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
        val shufId = dependency.shuffleId
        val numPartitions = dependency.partitioner.numPartitions
        //需要支援序列化重定向
        if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
            log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
                s"${dependency.serializer.getClass.getName}, does not support object relocation")
            false
        } 
        //不能在map端聚合
        else if (dependency.aggregator.isDefined) {
            log.debug(
                s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
            false
        } 
        //分區數大于16777216(2^24)
        else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
            log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
                s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
            false
        } else {
            log.debug(s"Can use serialized shuffle for shuffle $shufId")
            true
        }
    }
           

 使用此Shuffle方式有三個條件: 

  1. 支援序列化重定向,Serializer可以對已經序列化的對象進行排序,這種排序起到的效果和先對資料排序再序列化一緻。支援relocation的Serializer是KryoSerializer和SparkSQL的custom serializers。
  2. 不能在map端進行聚合。
  3. 分區數不能大于最大分區數2^24個。

如果不符合以上兩種,則選擇BaseShuffleHandle

接下來回過頭來看getWriter方法:

override def getWriter[K, V](
                                    handle: ShuffleHandle,
                                    mapId: Int,
                                    context: TaskContext): ShuffleWriter[K, V] = {
        numMapsForShuffle.putIfAbsent(
            handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
        val env = SparkEnv.get
        // 根據不同的 Handle, 建立不同的 ShuffleWriter
        handle match {
            case unsafeShuffleHandle: SerializedShuffleHandle[[email protected], [email protected]] =>
                new UnsafeShuffleWriter(
                    env.blockManager,
                    shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
                    context.taskMemoryManager(),
                    unsafeShuffleHandle,
                    mapId,
                    context,
                    env.conf)
            case bypassMergeSortHandle: BypassMergeSortShuffleHandle[[email protected], [email protected]] =>
                new BypassMergeSortShuffleWriter(
                    env.blockManager,
                    shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
                    bypassMergeSortHandle,
                    mapId,
                    context,
                    env.conf)
            case other: BaseShuffleHandle[[email protected], [email protected], _] =>
                new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
        }
    }
           

據上所知,Handle對應的ShuffleWrite為:

  • SerializedShuffleHandle -> UnsafeShuffleWriter
  • BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter
  • BaseShuffleHandle -> SortShuffleWriter

Write

BypassMergeSortShuffleWriter

此ShuffleWriter缺點比較大,僅供不需要聚合排序,分區數小于等于200時使用,其内部原理簡單,但是需要注意其中一行代碼:

public void write(Iterator<Product2<K, V>> records) throws IOException {
    //...
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    //...
}
           

 DiskBlockObjectWriter為緩沖塊block,此方式為每個partition都開辟了一塊32K大小的block緩沖區,當partition數量為10000個時,單一個MapTask就需要約320M記憶體,記憶體消耗過大,這也就是分區數小于200個使用的原因。

最後會将資料封裝成一個MapStatus發送給Driver。

SortShuffleWriter

override def write(records: Iterator[Product2[K, V]]): Unit = {
        // 一:
        sorter = if (dep.mapSideCombine) {
            require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
            new ExternalSorter[K, V, C](
                context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
        } else {
            // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
            // care whether the keys get sorted in each partition; that will be done on the reduce side
            // if the operation being run is sortByKey.
            new ExternalSorter[K, V, V](
                context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
        }
        // 将 Map 任務的輸出記錄插入到緩存中
        sorter.insertAll(records)

        // Don't bother including the time to open the merged output file in the shuffle write time,
        // because it just opens a single file, so is typically too fast to measure accurately
        // (see SPARK-3570).
        // 資料 shuffle 資料檔案
        val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
        val tmp = Utils.tempFileWith(output)
        try { // 将 map 端緩存的資料寫入到磁盤中, 并生成 Block 檔案對應的索引檔案.
            val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
            // 記錄各個分區資料的長度
            val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
            // 生成 Block 檔案對應的索引檔案. 此索引檔案用于記錄各個分區在 Block檔案中的偏移量, 以便于
            // Reduce 任務拉取時使用
            shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
            mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
        } finally {
            if (tmp.exists() && !tmp.delete()) {
                logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
            }
        }
    }
           
  • 根據不同的場景選取不同的方式。如果map端開啟了聚合,則
new ExternalSorter[K, V, C](context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)

傳入了一個聚合器和一個排序器。而如果沒有聚合,則

new ExternalSorter[K, V, V](context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)

是以隻有在map端需要聚合的算子才會排序,像sortByKey這種算子map不會排序,而是放在reduce端排序。

  • 接下來一步 sorter.insertAll(records)

records就是write方法的形參,也就是我們傳過來的資料。那這個insertAll是幹嘛的?

insertAll方法是我們第一步new出來的ExternalSorter類中的方法,

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined

    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      //循環放資料
      while (records.hasNext) {
        //記錄自上次spill以來記憶體中的資料條數
        addElementsRead()
        //資料
        kv = records.next()
        //PartitionedAppendOnlyMap中update資料
        map.changeValue((getPartition(kv._1), kv._1), update)
        //檢查資料是否存放的下,如果存放不下則會擴充記憶體或者spill
        maybeSpillCollection(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      //PartitionedPairBuffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }
           

 為了提高聚合和排序的性能,Spark為ShuffleWriter的聚合排序過程設計了兩種資料結構:

Spark源碼分析之:ShuffleCoarseGrainedExecutorBackendShufleWriter
  •  PartitionedAppendOnlyMap:如果設定了Aggregator,我們可以将對象放入AppendOnlyMap中以将它們組合在一起
  •  PartitionedPairBuffer:如果沒有設定Aggregator,則放入buffer中。

下面我們先來看看這兩個資料結構

PartitionedAppendOnlyMap:是AppendOnlyMap的間接子類,在ShuffleWriter端使用,其内部如下:

Spark源碼分析之:ShuffleCoarseGrainedExecutorBackendShufleWriter

操作資料僅有一個insert方法。調用的是其父類AppendOnlyMap的update方法。

AppendOnlyMap

此類是一個類似于HashMap的資料結構,使用Hash表進行尋址,但不同于HashMap的是,AppendOnlyMap隻有update操作(暫且認為添加也算update),因為Shuffle的資料不需要删除,隻需要聚合,是以這樣設計一個輕量級的HashMap合情合理。另外一點不同的是,AppendOnlyMap解決Hash碰撞的方法為具有2的幂的哈希表大小的開放位址法的二次探測法,而HashMap為連結清單法。另外需要注意,AppendOnlyMap最多可存放375809638(0.7 * 2 ^ 29)個元素,0.7為加載因子,也就是當使用率為70%時擴容為原來的2倍。

至于怎麼實作添加操作,如果讀者感興趣可以搜此類的update方法詳細了解。

PartitionedPairBuffer:本質上類似于一個Array數組,僅供排序使用,可按partitionID或partitionID+Key排序。最多支援1073741823 (2 ^ 30 - 1)個元素。

回過頭來看上面的insertAll方法,如果有Aggregator,則使用PartitionedAppendOnlyMap,否則使用PartitionedPairBuffer。在最後,都有一個maybeSpillCollection操作,此方法是判斷每條資料插入記憶體緩沖區之後,是否需要spill檔案。

private def maybeSpillCollection(usingMap: Boolean): Unit = {
    var estimatedSize = 0L
    if (usingMap) {
      //估計目前集合大小
      estimatedSize = map.estimateSize()
      if (maybeSpill(map, estimatedSize)) {//判斷是否可以spill
        //如果spill了,就新開辟一塊新記憶體
        map = new PartitionedAppendOnlyMap[K, C]
      }
    } else {
      estimatedSize = buffer.estimateSize()
      if (maybeSpill(buffer, estimatedSize)) {
        buffer = new PartitionedPairBuffer[K, C]
      }
    }
    //記錄最大使用記憶體量
    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
  }
           

首先估計了目前緩沖區(集合)的大小,這一步是比較困難的。因為雖然我們知道AppendOnlyMap中持有的資料的長度和大小,但是數組裡面存放的是Key和Value的引用,并不是實際對象的大小,而且Value會不斷更新,實際大小不斷變化。是以想要準确的擷取到其大小相當困難,如果簡單的每次插入一條資料都掃描數組中的record并對其大小相加,那麼時間複雜度太高會極大影響效率。此estimateSize方法是Spark設計的一個增量式的高效估算算法,在每個record插入或更新時根據曆史統計值和目前變化量直接估算目前AppendOnlyMap的大小,算法時間複雜度為O(1),開銷很小。在record插入和聚合過程中會定期對目前AppendOnlyMap中的record進行抽樣,然後精确計算這些record的總大小,總個數,更新個數及平均值等,并作為曆史統計值。進行抽樣是因為record個數過多,難以對每個record進行精确計算。之後,每當有record插入或更新時,會根據曆史統計值和曆史平均的變化值,估算AppendOnlyMap的總大小。

拿到估算大小之後,就會嘗試spill,maybeSpill方法如下:

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    //elementsRead:自上次spill後,讀取的元素個數。myMemoryThreshold:預設5M
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      //申請記憶體,擴容2倍。
      //擴容2倍需要開辟的新記憶體。比如已占用6M,則需要新申請 12-5 = 7M
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      //實際申請到的新記憶體
      val granted = acquireMemory(amountToRequest)
      //現在實際擁有的記憶體。在記憶體足夠用也就是可以全部申請來的時候,myMemoryThreshold=2*currentMemory
      myMemoryThreshold += granted
      //如果擁有的記憶體小于已使用的記憶體,則會spill
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    //如果資料量過大,已經超出了long的最大值,則仍然會spill
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    if (shouldSpill) {// 執行spill
      _spillCount += 1
      logSpillage(currentMemory)
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      releaseMemory()
    }
    shouldSpill
  }
           

 elementsRead為緩沖區元素個數,如果是32的倍數,并且使用記憶體已經大于預設的spill門檻值(預設5M),則擴容兩倍。如果擴容之後的容量還不足以放下目前使用容量,則溢出。舉個例子:假若目前使用了8M記憶體,也就是currentMemory=8M,而myMemoryThreshold=5M,是以會擴容到16M,需要新申請16-5=11M,但是我隻申請到了1M,那麼此時myMemoryThreshold=6M 小于 currentMemory,就需要溢出。另外如果元素個數超過了Long類型的最大值,則也會spill。

在insertAll方法之後,會将資料封裝成一個MapStatus發送給Driver。

UnsafeShuffleWriter

此方式是Spark2.0為提高效率新加入的方式,核心思想是在堆外記憶體中操作序列化的record對象(二進制資料),降低記憶體消耗和GC開銷。正好彌補了分區數超過200時BypassMergeSortShuffleWriter的不足。

Stop

至此stop,Shuffle的Wirte階段結束。

繼續閱讀