天天看點

Spark-SparkSQL深入學習系列六(轉自OopsOutOfMemory)

Plan。實體計劃是Spark SQL執行Spark job的前置,也是最後一道計劃。

  如圖:

Spark-SparkSQL深入學習系列六(轉自OopsOutOfMemory)

 話接上回,Optimizer接受輸入的Analyzed Logical Plan後,會有SparkPlanner來對Optimized Logical Plan進行轉換,生成Physical plans。

lazy val optimizedPlan = optimizer(analyzed)  

    // TODO: Don't just pick the first one...  

    lazy val sparkPlan = planner(optimizedPlan).next()  

  SparkPlanner的apply方法,會傳回一個Iterator[PhysicalPlan]。

  SparkPlanner繼承了SparkStrategies,SparkStrategies繼承了QueryPlanner。

  SparkStrategies包含了一系列特定的Strategies,這些Strategies是繼承自QueryPlanner中定義的Strategy,它定義接受一個Logical Plan,生成一系列的Physical Plan

@transient  

protected[sql] val planner = new SparkPlanner  

  protected[sql] class SparkPlanner extends SparkStrategies {  

  val sparkContext: SparkContext = self.sparkContext  

  val sqlContext: SQLContext = self  

  def numPartitions = self.numShufflePartitions //partitions的個數  

  val strategies: Seq[Strategy] =  //政策的集合  

    CommandStrategy(self) ::  

    TakeOrdered ::  

    PartialAggregation ::  

    LeftSemiJoin ::  

    HashJoin ::  

    InMemoryScans ::  

    ParquetOperations ::  

    BasicOperators ::  

    CartesianProduct ::  

    BroadcastNestedLoopJoin :: Nil  

etc......  

}  

QueryPlanner 是SparkPlanner的基類,定義了一系列的關鍵點,如Strategy,planLater和apply。

abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {  

  /** A list of execution strategies that can be used by the planner */  

  def strategies: Seq[Strategy]  

  /** 

   * Given a [[plans.logical.LogicalPlan LogicalPlan]], returns a list of `PhysicalPlan`s that can 

   * be used for execution. If this strategy does not apply to the give logical operation then an 

   * empty list should be returned. 

   */  

  abstract protected class Strategy extends Logging {  

    def apply(plan: LogicalPlan): Seq[PhysicalPlan]  //接受一個logical plan,傳回Seq[PhysicalPlan]  

  }  

   * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be 

   * filled in automatically by the QueryPlanner using the other execution strategies that are 

   * available. 

  protected def planLater(plan: LogicalPlan) = apply(plan).next() //傳回一個占位符,占位符會自動被QueryPlanner用其它的strategies apply  

  def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {  

    // Obviously a lot to do here still...  

    val iter = strategies.view.flatMap(_(plan)).toIterator //整合所有的Strategy,_(plan)每個Strategy應用plan上,得到所有Strategies執行完後生成的所有Physical Plan的集合,一個iter  

    assert(iter.hasNext, s"No plan for $plan")  

    iter //傳回所有實體計劃  

  繼承關系:

Spark-SparkSQL深入學習系列六(轉自OopsOutOfMemory)

 Spark Plan是Catalyst裡經過所有Strategies apply 的最終的實體執行計劃的抽象類,它隻是用來執行spark job的。

lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)  

prepareForExecution其實是一個RuleExecutor[SparkPlan],當然這裡的Rule就是SparkPlan了。

 protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {  

   val batches =  

     Batch("Add exchange", Once, AddExchange(self)) :: //添加shuffler操作如果必要的話  

     Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil //Bind references  

 }  

Spark Plan繼承Query Plan[Spark Plan],裡面定義的partition,requiredChildDistribution以及spark sql啟動執行的execute方法。

abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {  

  self: Product =>  

  // TODO: Move to `DistributedPlan`  

  /** Specifies how data is partitioned across different nodes in the cluster. */  

  def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!  

  /** Specifies any partition requirements on the input data for this operator. */  

  def requiredChildDistribution: Seq[Distribution] =  

    Seq.fill(children.size)(UnspecifiedDistribution)  

   * Runs this query returning the result as an RDD. 

  def execute(): RDD[Row]  //真正執行查詢的方法execute,傳回的是一個RDD  

   * Runs this query returning the result as an array. 

  def executeCollect(): Array[Row] = execute().map(_.copy()).collect() //exe & collect  

  protected def buildRow(values: Seq[Any]): Row =  //根據目前的值,生成Row對象,其實是一個封裝了Array的對象。  

    new GenericRow(values.toArray)  

  關于Spark Plan的繼承關系,如圖:

Spark-SparkSQL深入學習系列六(轉自OopsOutOfMemory)

  Strategy,注意這裡Strategy是在execution包下的,在SparkPlanner裡定義了目前的幾種政策:

  LeftSemiJoin、HashJoin、PartialAggregation、BroadcastNestedLoopJoin、CartesianProduct、TakeOrdered、ParquetOperations、InMemoryScans、BasicOperators、CommandStrategy

Join分為好幾種類型:

case object Inner extends JoinType  

case object LeftOuter extends JoinType  

case object RightOuter extends JoinType  

case object FullOuter extends JoinType  

case object LeftSemi extends JoinType  

  如果Logical Plan裡的Join是joinType為LeftSemi的話,就會執行這種政策,

  這裡ExtractEquiJoinKeys是一個pattern定義在patterns.scala裡,主要是做模式比對用的。

  這裡比對隻要是等值的join操作,都會封裝為ExtractEquiJoinKeys對象,它會解析目前join,最後傳回(joinType, rightKeys, leftKeys, condition, leftChild, rightChild)的格式。

  最後傳回一個execution.LeftSemiJoinHash這個Spark Plan,可見Spark Plan的類圖繼承關系圖。

object LeftSemiJoin extends Strategy with PredicateHelper {  

   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  

     // Find left semi joins where at least some predicates can be evaluated by matching join keys  

     case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) =>  

       val semiJoin = execution.LeftSemiJoinHash(  //根據解析後的Join,執行個體化execution.LeftSemiJoinHash這個Spark Plan 傳回  

         leftKeys, rightKeys, planLater(left), planLater(right))  

       condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil  

     // no predicate can be evaluated by matching hash keys  

     case logical.Join(left, right, LeftSemi, condition) =>  //沒有Join key的,即非等值join連接配接的,傳回LeftSemiJoinBNL這個Spark Plan  

       execution.LeftSemiJoinBNL(   

         planLater(left), planLater(right), condition)(sqlContext) :: Nil  

     case _ => Nil  

   }  

  HashJoin是我們最見的操作,innerJoin類型,裡面提供了2種Spark Plan,BroadcastHashJoin 和 ShuffledHashJoin

  BroadcastHashJoin的實作是一種廣播變量的實作方法,如果設定了spark.sql.join.broadcastTables這個參數的表(表面逗号隔開)

  就會用spark的Broadcast Variables方式先将一張表給查詢出來,然後廣播到各個機器中,相當于Hive中的map join。

  ShuffledHashJoin是一種最傳統的預設的join方式,會根據shuffle key進行shuffle的hash join。

object HashJoin extends Strategy with PredicateHelper {  

   private[this] def broadcastHashJoin(  

       leftKeys: Seq[Expression],  

       rightKeys: Seq[Expression],  

       left: LogicalPlan,  

       right: LogicalPlan,  

       condition: Option[Expression],  

       side: BuildSide) = {  

     val broadcastHashJoin = execution.BroadcastHashJoin(  

       leftKeys, rightKeys, side, planLater(left), planLater(right))(sqlContext)  

     condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil  

   def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer //擷取需要廣播的表  

     case ExtractEquiJoinKeys(  

             Inner,  

             leftKeys,  

             rightKeys,  

             condition,  

             left,  

             right @ PhysicalOperation(_, _, b: BaseRelation))  

       if broadcastTables.contains(b.tableName) => //如果右孩子是廣播的表,則buildSide取BuildRight  

         broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)  

             left @ PhysicalOperation(_, _, b: BaseRelation),  

             right)  

       if broadcastTables.contains(b.tableName) =>//如果左孩子是廣播的表,則buildSide取BuildLeft  

         broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)  

     case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>  

       val hashJoin =  

         execution.ShuffledHashJoin( //根據hash key shuffle的 Hash Join  

           leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))  

       condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil  

  PartialAggregation是一個部分聚合的政策,即有些聚合操作可以在local裡面完成的,就在local data裡完成,而不必要的去shuffle所有的字段。

object PartialAggregation extends Strategy {  

    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  

      case logical.Aggregate(groupingExpressions, aggregateExpressions, child) =>   

        // Collect all aggregate expressions.  

        val allAggregates =  

          aggregateExpressions.flatMap(_ collect { case a: AggregateExpression => a })  

        // Collect all aggregate expressions that can be computed partially.  

        val partialAggregates =  

          aggregateExpressions.flatMap(_ collect { case p: PartialAggregate => p })  

        // Only do partial aggregation if supported by all aggregate expressions.  

        if (allAggregates.size == partialAggregates.size) {  

          // Create a map of expressions to their partial evaluations for all aggregate expressions.  

          val partialEvaluations: Map[Long, SplitEvaluation] =  

            partialAggregates.map(a => (a.id, a.asPartial)).toMap  

          // We need to pass all grouping expressions though so the grouping can happen a second  

          // time. However some of them might be unnamed so we alias them allowing them to be  

          // referenced in the second aggregation.  

          val namedGroupingExpressions: Map[Expression, NamedExpression] = groupingExpressions.map {  

            case n: NamedExpression => (n, n)  

            case other => (other, Alias(other, "PartialGroup")())  

          }.toMap  

          // Replace aggregations with a new expression that computes the result from the already  

          // computed partial evaluations and grouping values.  

          val rewrittenAggregateExpressions = aggregateExpressions.map(_.transformUp {  

            case e: Expression if partialEvaluations.contains(e.id) =>  

              partialEvaluations(e.id).finalEvaluation  

            case e: Expression if namedGroupingExpressions.contains(e) =>  

              namedGroupingExpressions(e).toAttribute  

          }).asInstanceOf[Seq[NamedExpression]]  

          val partialComputation =  

            (namedGroupingExpressions.values ++  

             partialEvaluations.values.flatMap(_.partialEvaluations)).toSeq  

          // Construct two phased aggregation.  

          execution.Aggregate( //傳回execution.Aggregate這個Spark Plan  

            partial = false,  

            namedGroupingExpressions.values.map(_.toAttribute).toSeq,  

            rewrittenAggregateExpressions,  

            execution.Aggregate(  

              partial = true,  

              groupingExpressions,  

              partialComputation,  

              planLater(child))(sqlContext))(sqlContext) :: Nil  

        } else {  

          Nil  

        }  

      case _ => Nil  

    }  

  BroadcastNestedLoopJoin是用于Left Outer Join, RightOuter, FullOuter這三種類型的join

 而上述的Hash Join僅僅用于InnerJoin,這點要區分開來。

object BroadcastNestedLoopJoin extends Strategy {  

  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  

    case logical.Join(left, right, joinType, condition) =>  

      execution.BroadcastNestedLoopJoin(  

        planLater(left), planLater(right), joinType, condition)(sqlContext) :: Nil  

    case _ => Nil  

部分代碼;

    if (!matched && (joinType == LeftOuter || joinType == FullOuter)) {  //LeftOuter or FullOuter  

      matchedRows += buildRow(streamedRow ++ Array.fill(right.output.size)(null))  

  Iterator((matchedRows, includedBroadcastTuples))  

val includedBroadcastTuples = streamedPlusMatches.map(_._2)  

val allIncludedBroadcastTuples =  

  if (includedBroadcastTuples.count == 0) {  

    new scala.collection.mutable.BitSet(broadcastedRelation.value.size)  

  } else {  

    streamedPlusMatches.map(_._2).reduce(_ ++ _)  

val rightOuterMatches: Seq[Row] =  

  if (joinType == RightOuter || joinType == FullOuter) { //RightOuter or FullOuter  

    broadcastedRelation.value.zipWithIndex.filter {  

      case (row, i) => !allIncludedBroadcastTuples.contains(i)  

    }.map {  

      // TODO: Use projection.  

      case (row, _) => buildRow(Vector.fill(left.output.size)(null) ++ row)  

    Vector()  

笛卡爾積的Join,有待過濾條件的Join。  

主要是利用RDD的cartesian實作的。  

object CartesianProduct extends Strategy {  

    case logical.Join(left, right, _, None) =>  

      execution.CartesianProduct(planLater(left), planLater(right)) :: Nil  

    case logical.Join(left, right, Inner, Some(condition)) =>  

      execution.Filter(condition,  

        execution.CartesianProduct(planLater(left), planLater(right))) :: Nil  

  TakeOrdered是用于Limit操作的,如果有Limit和Sort操作。

  則傳回一個TakeOrdered的Spark Plan。

  主要也是利用RDD的takeOrdered方法來實作的排序後取TopN。

object TakeOrdered extends Strategy {  

    case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>  

      execution.TakeOrdered(limit, order, planLater(child))(sqlContext) :: Nil  

支援ParquetOperations的讀寫,插入Table等。

object ParquetOperations extends Strategy {  

    // TODO: need to support writing to other types of files.  Unify the below code paths.  

    case logical.WriteToFile(path, child) =>  

      val relation =  

        ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)  

      // Note: overwrite=false because otherwise the metadata we just created will be deleted  

      InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sqlContext) :: Nil  

    case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>  

      InsertIntoParquetTable(table, planLater(child), overwrite)(sqlContext) :: Nil  

    case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>  

      val prunePushedDownFilters =  

        if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {  

          (filters: Seq[Expression]) => {  

            filters.filter { filter =>  

              // Note: filters cannot be pushed down to Parquet if they contain more complex  

              // expressions than simple "Attribute cmp Literal" comparisons. Here we remove  

              // all filters that have been pushed down. Note that a predicate such as  

              // "(A AND B) OR C" can result in "A OR C" being pushed down.  

              val recordFilter = ParquetFilters.createFilter(filter)  

              if (!recordFilter.isDefined) {  

                // First case: the pushdown did not result in any record filter.  

                true  

              } else {  

                // Second case: a record filter was created; here we are conservative in  

                // the sense that even if "A" was pushed and we check for "A AND B" we  

                // still want to keep "A AND B" in the higher-level filter, not just "B".  

                !ParquetFilters.findExpression(recordFilter.get, filter).isDefined  

              }  

            }  

          }  

          identity[Seq[Expression]] _  

      pruneFilterProject(  

        projectList,  

        filters,  

        prunePushedDownFilters,  

        ParquetTableScan(_, relation, filters)(sqlContext)) :: Nil  

  InMemoryScans主要是對InMemoryRelation這個Logical Plan操作。

  調用的其實是Spark Planner裡的pruneFilterProject這個方法。

object InMemoryScans extends Strategy {  

     case PhysicalOperation(projectList, filters, mem: InMemoryRelation) =>  

       pruneFilterProject(  

         projectList,  

         filters,  

         identity[Seq[Expression]], // No filters are pushed down.  

         InMemoryColumnarTableScan(_, mem)) :: Nil  

  所有定義在org.apache.spark.sql.execution裡的基本的Spark Plan,它們都在org.apache.spark.sql.execution包下basicOperators.scala内的

  有Project、Filter、Sample、Union、Limit、TakeOrdered、Sort、ExistingRdd。

  這些是基本元素,實作都相對簡單,基本上都是RDD裡的方法來實作的。

object BasicOperators extends Strategy {  

   def numPartitions = self.numPartitions  

     case logical.Distinct(child) =>  

       execution.Aggregate(  

         partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil  

     case logical.Sort(sortExprs, child) =>  

       // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.  

       execution.Sort(sortExprs, global = true, planLater(child)):: Nil  

     case logical.SortPartitions(sortExprs, child) =>  

       // This sort only sorts tuples within a partition. Its requiredDistribution will be  

       // an UnspecifiedDistribution.  

       execution.Sort(sortExprs, global = false, planLater(child)) :: Nil  

     case logical.Project(projectList, child) =>  

       execution.Project(projectList, planLater(child)) :: Nil  

     case logical.Filter(condition, child) =>  

       execution.Filter(condition, planLater(child)) :: Nil  

     case logical.Aggregate(group, agg, child) =>  

       execution.Aggregate(partial = false, group, agg, planLater(child))(sqlContext) :: Nil  

     case logical.Sample(fraction, withReplacement, seed, child) =>  

       execution.Sample(fraction, withReplacement, seed, planLater(child)) :: Nil  

     case logical.LocalRelation(output, data) =>  

       val dataAsRdd =  

         sparkContext.parallelize(data.map(r =>  

           new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row))  

       execution.ExistingRdd(output, dataAsRdd) :: Nil  

     case logical.Limit(IntegerLiteral(limit), child) =>  

       execution.Limit(limit, planLater(child))(sqlContext) :: Nil  

     case Unions(unionChildren) =>  

       execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil  

     case logical.Generate(generator, join, outer, _, child) =>  

       execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil  

     case logical.NoRelation =>  

       execution.ExistingRdd(Nil, singleRowRdd) :: Nil  

     case logical.Repartition(expressions, child) =>  

       execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil  

     case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil  

  CommandStrategy是專門針對Command類型的Logical Plan

  即set key = value 、 explain sql、 cache table xxx 這類操作

  SetCommand主要實作方式是SparkContext的參數

  ExplainCommand主要實作方式是利用executed Plan列印出tree string

  CacheCommand主要實作方式SparkContext的cache table和uncache table

case class CommandStrategy(context: SQLContext) extends Strategy {  

      case logical.SetCommand(key, value) =>  

        Seq(execution.SetCommand(key, value, plan.output)(context))  

      case logical.ExplainCommand(logicalPlan) =>  

        Seq(execution.ExplainCommand(logicalPlan, plan.output)(context))  

      case logical.CacheCommand(tableName, cache) =>  

        Seq(execution.CacheCommand(tableName, cache)(context))  

Spark Plan的Execution方式均為調用其execute()方法生成RDD,除了簡單的基本操作例如上面的basic operator實作比較簡單,其它的實作都比較複雜,大緻的實作我都在上面介紹了,本文就不詳細讨論了。

  本文從介紹了Spark SQL的Catalyst架構的Physical plan以及其如何從Optimized Logical Plan轉化為Spark Plan的過程,這個過程用到了很多的實體計劃政策Strategies,每個Strategies最後還是在RuleExecutor裡面被執行,最後生成一系列實體計劃Executed Spark Plans。

  Spark Plan是執行前最後一種計劃,當生成executed spark plan後,就可以調用collect()方法來啟動Spark Job來進行Spark SQL的真正執行了。

——EOF——

原創文章,轉載請注明:

Spark-SparkSQL深入學習系列六(轉自OopsOutOfMemory)