天天看点

spark源码学习(十一)---cacheManager分析

cacheManager的分析要从RDD说起,当读取数据的时候,调用RDD的iterator方法的时候,如果storeRageLevel不是none,那么说明之前进行过RDD的持久化,就不需要从从父RDD执行以获取数据,优先使用cacheManager获取持久化的数据。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }
           

调用的cacheManager的getOrCompute方法,首先会使用blockManager的get方法获取数据,如果没有获取到数据,则后面调用acquireLockForPartition,实际也是调用的blockManager的get方法,如果还是没能获取到数据,使用checkpoint来获取数据,如果还是不能获取数据,那么只能重新计算该RDD。

def getOrCompute[T](
      rdd: RDD[T],
      partition: Partition,
      context: TaskContext,
      storageLevel: StorageLevel): Iterator[T] = {

    val key = RDDBlockId(rdd.id, partition.index)
    logDebug(s"Looking for partition $key")
    // 使用blockManager获取数据,直接调用blockManager的get方法,
    //get方法通过getLocal或者getRemote方法获取本地或者远程的数据,优先从本地获取
    //getLocal调用doGetLocal方法,上篇已经分析过
    blockManager.get(key) match {
      case Some(blockResult) =>
        // Partition is already materialized, so just return its values
        val inputMetrics = blockResult.inputMetrics
        val existingMetrics = context.taskMetrics
          .getInputMetricsForReadMethod(inputMetrics.readMethod)
        existingMetrics.incBytesRead(inputMetrics.bytesRead)

        val iter = blockResult.data.asInstanceOf[Iterator[T]]
        new InterruptibleIterator[T](context, iter) {
          override def next(): T = {
            existingMetrics.incRecordsRead(1)
            delegate.next()
          }
        }
        //如果没有能从blockManager获取到数据,虽然RDD持久化过,
        //但是可能数据既不在本地磁盘也不在远程blockManager的本地或者磁盘
      case None =>
        // Acquire a lock for loading this partition
        // If another thread already holds the lock, wait for it to finish return its results
        //acquireLockForPartition再次调用blockManager的get方法
        val storedValues = acquireLockForPartition[T](key)
        if (storedValues.isDefined) {
          return new InterruptibleIterator[T](context, storedValues.get)
        }

        // Otherwise, we have to load the partition ourselves
        try {
          logInfo(s"Partition $key not found, computing it")
          //如果RDD checkPoint过,那么执行checkpoint,否则只能从父RDD重新计算来获取数据
          val computedValues = rdd.computeOrReadCheckpoint(partition, context)

          // If the task is running locally, do not persist the result
          if (context.isRunningLocally) {
            return computedValues
          }

          // Otherwise, cache the values and keep track of any updates in block statuses
          val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
          //由于持久化过该数据,但是没有找到该数据,因此需要交给blockManager重新持久化,
          //并通知BlockManagerMaster
          val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
          val metrics = context.taskMetrics
          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
          new InterruptibleIterator(context, cachedValues)

        } finally {
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll()
          }
        }
    }
  }
           

BlockManager的get方法优先获取local数据,如果不能获取local,则获取远程数据,而getLocal使用的就是上篇文章分析的doGetLocal方法,远程获取数据同理:

def get(blockId: BlockId): Option[BlockResult] = {
    val local = getLocal(blockId)
    if (local.isDefined) {
      logInfo(s"Found block $blockId locally")
      return local
    }
    val remote = getRemote(blockId)
    if (remote.isDefined) {
      logInfo(s"Found block $blockId remotely")
      return remote
    }
    None
  }
           

下面分析上面将数据通过BlockManager重新缓存的情况,在上面的val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)中,根据存储级别来划分存入磁盘还是放入内存中,如果存入磁盘,那么直接写入,否则,会调用BlockManager的方法,不断的保证有内存可用的情况下放入内存,如果最后还是没能放到内存的数据,如果允许放入磁盘,那么写入磁盘:

private def putInBlockManager[T](
      key: BlockId,
      values: Iterator[T],
      level: StorageLevel,
      updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
      effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

    val putLevel = effectiveStorageLevel.getOrElse(level)
    if (!putLevel.useMemory) {
      /*
       * This RDD is not to be cached in memory, so we can just pass the computed values as an
       * iterator directly to the BlockManager rather than first fully unrolling it in memory.
       * 如果不是使用内存来持久化,那么直接写磁盘
       */
      updatedBlocks ++=
        blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
      blockManager.get(key) match {
        case Some(v) => v.data.asInstanceOf[Iterator[T]]
        case None =>
          logInfo(s"Failure to store $key")
          throw new BlockException(key, s"Block manager failed to return cached value for $key!")
      }
    } else {
      /*
       * This RDD is to be cached in memory. In this case we cannot pass the computed values
       * to the BlockManager as an iterator and expect to read it back later. This is because
       * we may end up dropping a partition from memory store before getting it back.
       *
       * In addition, we must be careful to not unroll the entire partition in memory at once.
       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
       * single partition. Instead, we unroll the values cautiously, potentially aborting and
       * dropping the partition to disk if applicable.
       * 如果使用内存来存储数据,那么使用memoryStore的unrollSafely方法来存储数据
       * 如果能写入内存,那么写入,但是如果unrollSafely方法不能写入内存,那么写入磁盘
       */
      blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
        case Left(arr) =>
          // We have successfully unrolled the entire partition, so cache it in memory
          updatedBlocks ++=
            blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
          arr.iterator.asInstanceOf[Iterator[T]]
        case Right(it) =>
          // There is not enough space to cache this partition in memory
          val returnValues = it.asInstanceOf[Iterator[T]]
          //无法写入内存的数据而且可以写入磁盘的话那么写入磁盘
          if (putLevel.useDisk) {
            logWarning(s"Persisting partition $key to disk instead.")
            val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
              useOffHeap = false, deserialized = false, putLevel.replication)
            putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
          } else {
            returnValues
          }
      }
    }
  }
           

上面写入内存的方法调用了memoryStore的unrollSafely方法来写入内存,主要的代码如下:

try {
      while (values.hasNext && keepUnrolling) {
        vector += values.next()
        if (elementsUnrolled % memoryCheckPeriod == 0) {
          // If our vector's size has exceeded the threshold, request more memory
          val currentSize = vector.estimateSize()
          if (currentSize >= memoryThreshold) {
            val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
            // Hold the accounting lock, in case another thread concurrently puts a block that
            // takes up the unrolling space we just ensured here
            accountingLock.synchronized {
              if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
                // If the first request is not granted, try again after ensuring free space
                // If there is still not enough space, give up and drop the partition
                val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
                if (spaceToEnsure > 0) {
                  val result = ensureFreeSpace(blockId, spaceToEnsure)
                  //反复循环判断只要数据还没写入内存,并且还有空间那么就会写入内存,
                  //如果没有内存空间,就会调用ensureFreeSpace清空一些内存
                  
                  droppedBlocks ++= result.droppedBlocks
                }
                keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
              }
            }
            // New threshold is currentSize * memoryGrowthFactor
            memoryThreshold += amountToRequest
          }
        }
        elementsUnrolled += 1
      }
           

继续阅读