天天看点

Spark源码解读(4)——RDD

1,RDD的转换共分为Transformation和Action两类

Transformation和Action的区别在于:Action会触发作业的提交,而Transformation不会触发作业的提交

如map()和collect()

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
           
def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
           

2,RDD之间的依赖分为两类NarrowDependency和ShuffleDependency

RDD之间的依赖关系影响着Stage的划分,每遇到一个ShuffleDependency就会产生一个新的Stage

private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }
           

以下面的转换为例分析Stage的划分

sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).collect()
           

依照代码的逻辑,从最后一个RDD分析起:

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
           
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }
           

首先判断当前RDD是否有partitioner,如果没有则用默认的HashPartitioner

override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
           

从MapPartitionsRdd代码可知,此处partitioner为None,因此defaultPartitioner返回的是HashPartitioner

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }
           

从上文分析知,self.partitioner为None,partitioner为HashPartitioner,因此这里返回的是一个ShuffledRDD,从上面的代码可知Stage的划分主要是依据RDD的Dependency,ShuffleRDD的dependency是一个ShuffleDependency

override def getDependencies: Seq[Dependency[_]] = {
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }
           

而MapPartitionsRDD的dependency为OneToOneDependency

def this(@transient oneParent: RDD[_]) =
    this(oneParent.context , List(new OneToOneDependency(oneParent)))
           

ParallelCollectionRDD的dependency则为Nil

private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
    extends RDD[T](sc, Nil) {
           

因此这里将会被划分成2个Stage

Spark源码解读(4)——RDD

下面对上面的转换过程进行简单的修改,看看对Stage的划分有什么影响:

sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).reduceByKey(_+_)collect()
           
Spark源码解读(4)——RDD
sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).map(i=>(i._1,i._2*2)).reduceByKey(_+_).collect()
           
Spark源码解读(4)——RDD
sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).filter(i=>i._2>10).reduceByKey(_+_).collect()
           
Spark源码解读(4)——RDD

这里有一个非常有意思的现象,reduce + reduce 没有产生新的stage,但是reduce + map + reduce 产生了新的stage,而reduce + filter + reduce又没有产生新的stage,这里主要涉及到Dependency的传递问题。首先,对于reduce操作并不是必然产生一个ShuffleRDD,也有可能是一个MapPartitionsRDD,主要看当前的partitioner和self.partitioner是否相等

if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
           

对于reduce + reduce,因为前后两个partitioner相等,因此不需要产生新的ShuffleDependency,而对于MapPartitionsRDD是否采用parent的partitioner取决于preserversPartitioning参数的值

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
           

而map()操作和filter()在preserversPartitioning参数的取值上是不同的:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
           
def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
           

3,RDD的执行

上文分析了RDD之间的依赖,以及依赖对Stage划分的影响,当完成DAG的Stage划分之后,就需要将各Stage以TaskSet的形式发送到各个Executor执行,下面主要分析这部分逻辑

Driver在完成资源调度为Task分配完Executor之后,向Executor发送LaunchTask信息

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
           

Executor在收到Driver的LaunchTask信息之后,创建了一个TaskRunner对象,并调用threadPool.execute()方法执行Task

def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }
           

需要注意的是这里的threadPool是一个守护线程池

private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
           

最终调用Task的runTask()方法

(runTask(context), context.collectAccumulators())
           

Task主要有两个子类ShuffleMapTask和ResultTask

对于ShuffleMapTask主要是通过调用rdd.iterator()来进行计算

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
           

最终会调用RDD.compute()方法,下面是MapPartitionsRDD的compute方法的实现:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
           

通过firstParent.iterator()的方式,调用父RDD的interator,实现逐层向上调用

ShuffledRDD过程相对复杂,后续博文会做详细的讨论。

对于ResultTask:

func(context, rdd.iterator(partition, context))
           

这里的func是在调用sc.runJob()时传递的func参数:

def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length)
  }
           

对于collect()操作这里的func为:

(iter: Iterator[T]) => iter.toArray
           
(ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it)
           

对于saveAsTextFile()操作这里的func为:

val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

      val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

      writer.setup(context.stageId, context.partitionId, taskAttemptId)
      writer.open()
      var recordsWritten = 0L

      Utils.tryWithSafeFinallyAndFailureCallbacks {
        while (iter.hasNext) {
          val record = iter.next()
          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

          // Update bytes written metric every few records
          maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
          recordsWritten += 1
        }
      }(finallyBlock = writer.close())
      writer.commit()
      bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
      outputMetrics.setRecordsWritten(recordsWritten)
    }
           

Task启动的流程大致如下:

Spark源码解读(4)——RDD

继续阅读