天天看點

Spark源碼分析之HashShuffle讀寫流程一HashShuffle寫資料的機制二HashShuffle讀資料機制

一HashShuffle寫資料的機制

1.1HashWriter#write

# 判斷map端是否需要聚合,比如<a,1>和<a,1>都要寫入的話,那麼先生成<a,2>然後再進行後續的寫入工作判斷map端是否允許進行combine操作,如果允許則進行combine操作,否則直接傳回records

# 周遊記錄,并且對資料進行partitioner操作,進行分區,獲得一個分區号bucketIds,根據bucketId取得ShuffleWriterGroup裡的對應的writer将資料寫入檔案

# 通過ShuffleWriterGroup将資料<key,value>寫入

override defwrite(records:Iterator[Product2[K,V]]): Unit = {

  // 判斷map端是否需要聚合,比如<a,1>和<a,1>都要寫入的話,那麼先生成<a,2>然後再進行後續的寫入工作

  val iter= if (dep.aggregator.isDefined) {

    // 判斷map端是否允許進行combine操作,如果允許則進行combine操作,否則直接傳回records

    if (dep.mapSideCombine) {

      dep.aggregator.get.combineValuesByKey(records,context)

    } else {

      records

    }

  } else {

    require(!dep.mapSideCombine,"Map-side combine withoutAggregator specified!")

    records

  }

  // 周遊記錄,并且對資料進行partitioner操作,進行分區,獲得一個分區号bucketIds

  // 根據bucketId取得ShuffleWriterGroup裡的對應的writer将資料寫入檔案

  for (elem <- iter) {

    val bucketId = dep.partitioner.getPartition(elem._1)

    // 通過ShuffleWriterGroup将資料<key,value>寫入

    shuffle.writers(bucketId).write(elem._1, elem._2)

  }

}

1.2 FileShuffleBlockResolver#forMapTask

FileShuffleBlockResolver主要用于管理block的writer,每一個Reducer任務對應着一個檔案。

forMapTask:對于指定的map task擷取一個ShuffleWriterGroup,裡面一個reducer對應着一個writer

def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
    writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
  new ShuffleWriterGroup {
    shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
    private val shuffleState = shuffleStates(shuffleId)
    private var fileGroup: ShuffleFileGroup = null

    val openStartTime = System.nanoTime
    val serializerInstance = serializer.newInstance()
    // 如果啟動了consolidation機制,spark.shuffle.consolidateFiles置為true
    val writers: Array[DiskBlockObjectWriter] = if (consolidateShuffleFiles) {
      // 擷取那些還沒有使用的檔案組
      fileGroup = getUnusedFileGroup()
      // 傳回reducer個數的DiskBlockObjectWriter對象,比如reducer個數為10則傳回10個,每一個reducer對應着
      // 每一個ShuffleMapTask裡的一個bucketId.即對于每一個bucket,都會擷取一個針對ShuffleFileGroup的
      // writer,而不是一個獨立的ShuffleBlockFile,這樣就實作多個MapTask輸出資料的合并
      Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
        val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
        blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize,
          writeMetrics)
      }
    } else {//沒有開啟consolidation機制
      // 傳回reducer個數的DiskBlockObjectWriter對象
      Array.tabulate[DiskBlockObjectWriter](numBuckets) { bucketId =>
        // 建立blockId
        val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
        // 根據blockId擷取Block File
        val blockFile = blockManager.diskBlockManager.getFile(blockId)
        // 如果該檔案已經存在,則删除,因為可能以前失敗的task,已經建立過了
        if (blockFile.exists) {
          if (blockFile.delete()) {
            logInfo(s"Removed existing shuffle file $blockFile")
          } else {
            logWarning(s"Failed to remove existing shuffle file $blockFile")
          }
        }
        // 針對每一個blockFile都會生成一個writer
        blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
          writeMetrics)
      }
    }

    writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
    // 釋放writers
    override def releaseWriters(success: Boolean) {
      // 如果開啟了consolidation機制,則如果成功的話,則記錄FileSegment的offset和length
      if (consolidateShuffleFiles) {
        if (success) {
          val offsets = writers.map(_.fileSegment().offset)
          val lengths = writers.map(_.fileSegment().length)
          fileGroup.recordMapOutput(mapId, offsets, lengths)
        }
        // 回收檔案組
        recycleFileGroup(fileGroup)
      } else {
        // 如果沒有開啟consolidation機制,則直接将完成的map 任務的id放入completedMapTasks
        shuffleState.completedMapTasks.add(mapId)
      }
    }
    // 擷取未使用的檔案組
    private def getUnusedFileGroup(): ShuffleFileGroup = {
      val fileGroup = shuffleState.unusedFileGroups.poll()
      if (fileGroup != null) fileGroup else newFileGroup()
    }
    // 産生一個新的檔案組
    private def newFileGroup(): ShuffleFileGroup = {
      val fileId = shuffleState.nextFileId.getAndIncrement()
      val files = Array.tabulate[File](numBuckets) { bucketId =>
        val filename = physicalFileName(shuffleId, bucketId, fileId)
        blockManager.diskBlockManager.getFile(filename)
      }
      val fileGroup = new ShuffleFileGroup(shuffleId, fileId, files)
      shuffleState.allFileGroups.add(fileGroup)
      fileGroup
    }
    // 回收檔案組
    private def recycleFileGroup(group: ShuffleFileGroup) {
      shuffleState.unusedFileGroups.add(group)
    }
  }
}      

二HashShuffle讀資料機制

當ResultTask或者ShuffleMapTask在執行到ShuffledRDD的時候,肯定會調用compute的時候進行計算,就會通過ShuffleReader讀取資料

2.1HashShuffleReader#read

# 建立ShuffleBlockFetcherIterator,去拉取資料

# 對讀取到到的資料進行流處理

# 對讀取的資料進行聚合處理

# 對基于排序的shuffle機制,處理分區資料的二次排序

在基于排序的shuffle實作過程中,預設僅僅是基于Partitionid進行排序在分區的内部資料是沒有排序的,是以添加了keyOrdering變量,提供是否需要針對分區内部的資料進行排序

為了減少記憶體的壓力,避免GC開銷,引入了外部排序器對資料進行排序;當記憶體不足以容納排序的資料量時,會根據配置的spark.shuffle.spill屬性來決定是否需要spill到磁盤,預設情況下是打開的,如果不打開,在資料量比較大的時候會引發記憶體溢出問題

override def read(): Iterator[Product2[K, C]] = {
  // 建立ShuffleBlockFetcherIterator
  val blockFetcherItr = new ShuffleBlockFetcherIterator(
    context,
    blockManager.shuffleClient,
    blockManager,
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition),
    // 最多允許理請求總位元組數預設是48M
    SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)

  // 對讀取到到的資料進行流處理
  val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
    blockManager.wrapForCompression(blockId, inputStream)
  }

  val ser = Serializer.getSerializer(dep.serializer)
  val serializerInstance = ser.newInstance()

  val recordIter = wrappedStreams.flatMap { wrappedStream =>
    serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
  }

  val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
  val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
    recordIter.map(record => {
      readMetrics.incRecordsRead(1)
      record
    }),
    context.taskMetrics().updateShuffleReadMetrics())

  // An interruptible iterator must be used here in order to support task cancellation
  val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
  // 對讀取的資料進行聚合處理
  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    // 如果要求combine,則進行combine,如果map端已經做了聚合處理,那麼這個地方對讀取到的聚合結果進行處理
    if (dep.mapSideCombine) {
      // 針對各個map端各分區對key進行合并的結果再次聚合,map的合并可以大大減少網絡傳輸的資料量
      val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
      dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
    } else {
      // 針對未合并的key-value的值進行合并
      val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
      dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
    }
  } else {
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
  }

  // 在基于排序的shuffle實作過程中,預設僅僅是基于Partitionid進行排序
  // 在分區的内部資料是沒有排序的,是以添加了keyOrdering變量,提供是否需要
  // 針對分區内部的資料進行排序
  dep.keyOrdering match {
    /*
     *  為了減少記憶體的壓力,避免GC開銷,引入了外部排序器對資料進行排序;當記憶體不足以容納排序
     *  的資料量時,會根據配置的spark.shuffle.spill屬性來決定是否需要spill到磁盤,預設情況下
     *  是打開的,如果不打開,在資料量比較大的時候會引發記憶體溢出問題
     */
    case Some(keyOrd: Ordering[K]) =>
      val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
      sorter.insertAll(aggregatedIter)
      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
      context.internalMetricsToAccumulators(
        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
      sorter.iterator
    // 不需要排序的時候直接傳回
    case None =>
      aggregatedIter
  }
}      

2.2ShuffleBlockFetcherIterator# initialize

ShuffleBlockFetcherIterator:從多個block上拉取資料

# 劃分本地和遠端block,确定資料讀取政策,傳回需要在遠端拉取block的請求集合

# 添加遠端請求到隊列

# 向block發送遠端請求,直到達到閥值

# 開始從本地block拉取資料

private[this] def initialize(): Unit = {
  // 添加一個任務完成的回到函數用于清理工作
  context.addTaskCompletionListener(_ => cleanup())

  // 劃分本地和遠端block,确定資料讀取政策,傳回需要在遠端拉取block的請求集合
  val remoteRequests = splitLocalRemoteBlocks()
  // 添加遠端請求到隊列
  fetchRequests ++= Utils.randomize(remoteRequests)

  // Send out initial requests for blocks, up to our maxBytesInFlight
  // 向block發送遠端請求,直到達到閥值
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }

  val numFetches = remoteRequests.size - fetchRequests.size
  logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

  // 開始從本地block拉取資料
  fetchLocalBlocks()
  logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
}      

2.3splitLocalRemoteBlocks

劃分本地和遠端block,确定資料讀取政策,傳回需要在遠端拉取block的請求集合

private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
  // 遠端請求從最多5個node去擷取資料,每一個節點拉取的資料取決于spark.reducer.maxMbInFlight即maxBytesInFlight參數
  // 加入整個叢集隻允許每次在5台拉取5G的資料,那麼每一節點隻允許拉取1G資料,這樣就可以允許他們并行從5個節點擷取,
  // 而不是主動從一個節點擷取
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)

  // 建立FetchRequest隊列,用于存放拉取的資料的請求,每一個請求可能包含多個block,
  // 具體多少取決于總的請求block大小是否超過目标閥值
  val remoteRequests = new ArrayBuffer[FetchRequest]

  // Tracks total number of blocks (including zero sized blocks)
  var totalBlocks = 0
  for ((address, blockInfos) <- blocksByAddress) {
    // 擷取block的大小,并更新總的block數量資訊
    totalBlocks += blockInfos.size
    // 要擷取的資料在本地
    if (address.executorId == blockManager.blockManagerId.executorId) {
      // 更新要從本地block拉取的集合
      localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
      // 更新要拉取的block數量
      numBlocksToFetch += localBlocks.size
    } else {//資料不在本地時
      val iterator = blockInfos.iterator
      var curRequestSize = 0L // 目前請求的大小
      // 存放目前的遠端請求
      var curBlocks = new ArrayBuffer[(BlockId, Long)]
      // 周遊每一個block
      while (iterator.hasNext) {
        val (blockId, size) = iterator.next()
        // 過濾掉空的block
        if (size > 0) {
          curBlocks += ((blockId, size))
          // 更新要拉取的遠端的blockId的集合清單
          remoteBlocks += blockId
          // 更新要拉取的block數量
          numBlocksToFetch += 1
          curRequestSize += size
        } else if (size < 0) {
          throw new BlockException(blockId, "Negative block size " + size)
        }
        // 如果目前請求的大小已經超過了閥值
        if (curRequestSize >= targetRequestSize) {
          // 建立一個新的FetchRequest,放到請求隊列
          remoteRequests += new FetchRequest(address, curBlocks)
          // 清空目前block清單
          curBlocks = new ArrayBuffer[(BlockId, Long)]
          logDebug(s"Creating fetch request of $curRequestSize at $address")
          // 重置目前請求數量為0
          curRequestSize = 0
        }
      }
      // 最後添加請求到請求隊列
      if (curBlocks.nonEmpty) {
        remoteRequests += new FetchRequest(address, curBlocks)
      }
    }
  }
  logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
  remoteRequests
}      

2.4sendRequest:發送遠端fetch請求

private[this] def sendRequest(req: FetchRequest) {
  logDebug("Sending request for %d blocks (%s) from %s".format(
    req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
  // 更新正在處理的請求的數量
  bytesInFlight += req.size

  // 将(blockId, size)轉換成map
  val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
  // 擷取每一個請求的block清單的blockId
  val blockIds = req.blocks.map(_._1.toString)
  // 請求的遠端的位址
  val address = req.address
  // 調用ShuffleClient從遠端擷取資料
  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
          shuffleMetrics.incRemoteBytesRead(buf.size)
          shuffleMetrics.incRemoteBlocksFetched(1)
        }
        logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
      }

      override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
        logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
        results.put(new FailureFetchResult(BlockId(blockId), address, e))
      }
    }
  )
}      

2.5 fetchLocalBlocks:從本地fetch資料

private[this] def fetchLocalBlocks() {
  val iter = localBlocks.iterator
  // 開始周遊本地的block
  while (iter.hasNext) {
    val blockId = iter.next()
    try {
      // 擷取本地block資料
      val buf = blockManager.getBlockData(blockId)
      shuffleMetrics.incLocalBlocksFetched(1)
      shuffleMetrics.incLocalBytesRead(buf.size)
      buf.retain()
      // 将結果放入results
      results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf))
    } catch {
      case e: Exception =>
        // If we see an exception, stop immediately.
        logError(s"Error occurred while fetching local blocks", e)
        results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))
        return
    }
  }
}      

繼續閱讀