天天看點

Spark源碼分析之SparkSql的Analyzer,Optimizer

在上一篇博文中,我們深入的了解了SparkSql中的sql語句經過DDLParser、SparkSQLParser和SqlParser處理後得到了一個樹結構的Unresolved Logical Plan,這也是我們每一次使用sparkSql時必然會執行的,但是對于一些不是立刻需要傳回結果的造作,執行到這邊也就結束了,隻有遇到哪些諸如show,collect等需要立刻的傳回結果的操作,我們才會繼續後面的執行,這篇博文主要就是将這些。

第一步:使用Analyzer将Unresolved Logical Plan與資料字典(Catalog)進行綁定,生成Resolved LogicalPlan

在真正地需要計算的結果的時候,我們在SQLContext中執行下面的方法:

//實際上在後面真正對DataFrame操作的時候需要真正的去執行sql語句的時候
// 就會觸發sqlContext的executesql方法的執行
//該方法實際上會傳回一個QueryExecution,這個QueryExecution就會觸發後續的整個流程
protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)           

既然這段代碼的主要操作就是建立了一個QueryExecution對象,那麼此時我們就去看看這段代碼中幹了些什麼

protected[sql] class QueryExecution(val logical: LogicalPlan) {
    def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)

    //使用一個UnResolved LogicPlan去構造一個QueryExecution的執行個體對象
    //那麼sql語句的設計執行就會立即一步一步的觸發
    //調用analyzer來生成一個resolved LogicPlan
    lazy val analyzed: LogicalPlan = analyzer.execute(logical)

    //如果目前的這個執行計劃緩存中有,那麼就從緩存中讀取
    lazy val withCachedData: LogicalPlan = {
      assertAnalyzed()
      cacheManager.useCachedData(analyzed)
    }

    //針對resolvedPlan調用optimizer的execute進行優化,得到優化後的optimized LogicalPlan
    //獲得優化後的邏輯執行計劃
    lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)

    // TODO: Don't just pick the first one...
    //使用sparkPlanner根據剛剛建立的一個optimized LogicalPlan建立一個sparkplan
    lazy val sparkPlan: SparkPlan = {
      SparkPlan.currentContext.set(self)
      planner.plan(optimizedPlan).next()
    }

    /*在sparksql中,邏輯執行計劃就是LogicalPlan,實體執行計劃就是SparkPlan*/
    // executedPlan should not be used to initialize any SparkPlan. It should be
    // only used for execution.
    //生成一個可以執行的sparkplan,此時就是physicplan,此時就是實體執行計劃
    //此時的話就已經綁定好了資料源,知各個表如何join
    //如果進行join,預設spark内部是會對小表進行廣播的
    lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)

    /** Internal version of the RDD. Avoids copies and has no schema */
      //調用sparkPlan (封裝了Physical plan)的execute方法,execute方法實際上就會執行實體執行計劃
    lazy val toRdd: RDD[InternalRow] = executedPlan.execute()

    protected def stringOrError[A](f: => A): String =
      try f.toString catch { case e: Throwable => e.toString }

    def simpleString: String =
      s"""== Physical Plan ==
         |${stringOrError(executedPlan)}
      """.stripMargin.trim

    override def toString: String = {
      def output =
        analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")

      s"""== Parsed Logical Plan ==
         |${stringOrError(logical)}
         |== Analyzed Logical Plan ==
         |${stringOrError(output)}
         |${stringOrError(analyzed)}
         |== Optimized Logical Plan ==
         |${stringOrError(optimizedPlan)}
         |== Physical Plan ==
         |${stringOrError(executedPlan)}
         |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
      """.stripMargin.trim
    }
  }           

在此我們可以看到就是調用了analyzer的execute方法,與前面的DDLParser、SparkSQLParser、SQLParser等一樣,我們也先來看看這個類的繼承關系:

Spark源碼分析之SparkSql的Analyzer,Optimizer

那麼這邊調用的實際上是其父類RuleExecutor中的execute方法:

def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)
            val runTime = System.nanoTime() - startTime
            RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)

            if (!result.fastEquals(plan)) {
              logTrace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }

            result
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {
          // Only log if this is a rule that is supposed to run more than once.
          if (iteration != 2) {
            logInfo(s"Max iterations (${iteration - 1}) reached for batch ${batch.name}")
          }
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) {
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      if (!batchStartPlan.fastEquals(curPlan)) {
        logDebug(
          s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
      } else {
        logTrace(s"Batch ${batch.name} has no effect.")
      }
    }

    curPlan
  }           

了解這個方法之前我們先需要了解下面的概念:

在上一階段生成的Unresolved LogicalPlan後,Analyzer和optimizer肯定是通過一系列的操作才将這個未解析的邏輯計劃轉換為resolved LogicalPlan的,那麼這系列的操作就是一系列的Rule

那麼這邊的RuleExecutor的主要作用就是執行Rule

這個方法的主要作用就是通過周遊取出Analyzer中定義的batches裡存儲的每一個Batch,每一個Batch中會封裝同屬某一個類别的Rule及其相應的執行政策,我們看一段Analyzer中的Batch的代碼

lazy val batches: Seq[Batch] = Seq(
    Batch("Substitution", fixedPoint,
      CTESubstitution ::
      WindowsSubstitution ::
      Nil : _*),
    Batch("Resolution", fixedPoint,
      ResolveRelations ::
      ResolveReferences ::
      ResolveGroupingAnalytics ::
      ResolveSortReferences ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      HiveTypeCoercion.typeCoercionRules ++
      extendedResolutionRules : _*),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )           

在這個裡面的諸如CTESubstitution等都是一個Rule,它們繼承自Rule類,我們還可以看到這邊有一個fixedPoint,這個字段的意思就是當使用一個Rule的時候如果循環的次數達到了FixedPoint次,或者前後兩次樹結構沒有變化那麼就停止操作的政策。

下面我們再回到那個execute方法,它的意思其實就是去周遊每一個Batch,再周遊裡面的每一個Rule,按照指定的次數應用到logicalPlan上,注意LogicalPlan的本質上就是一個TreeNode,其間接繼承自TreeNode

Spark源碼分析之SparkSql的Analyzer,Optimizer

如果給TreeNode應用Rule的TreeNode與之前相同,則退出目前的Rule,進行下一個Rule的處理,直到所有的Rule全部周遊結束。此時就意味着Unresolved LogicalPlan解析完畢,生成了一個Resolved LogicalPlan

第二步:使用Optimizer對剛剛生成的Resolved Logical Plan進行一系列的優化

與Analyzer的用法一樣,隻不過這邊的Optimizer使用的是預設的實作DefaultOptimizer,在其中定義了很多的優化政策:

//這裡封裝了每一個Spark SQL中,可以對邏輯執行計劃執行的優化政策
  val batches =
    // SubQueries are only needed for analysis and can be removed before execution.
    Batch("Remove SubQueries", FixedPoint(100),
      EliminateSubQueries) ::
    Batch("Aggregate", FixedPoint(100),
      ReplaceDistinctWithAggregate,
      RemoveLiteralFromGroupExpressions) ::
    Batch("Operator Optimizations", FixedPoint(100),
      // Operator push down
      SetOperationPushDown,
      SamplePushDown,
      PushPredicateThroughJoin,
      PushPredicateThroughProject,
      PushPredicateThroughGenerate,
      ColumnPruning,  //列剪裁,就是對我們要查詢的列進行剪裁
      // Operator combine
      ProjectCollapsing,
      CombineFilters,
      CombineLimits,  //合并limit子句,取一個并集就可以了,這樣的話在後面limit執行的時候就執行一次就可以了
      // Constant folding
      NullPropagation,  //針對null的優化,盡量避免值出現null的情況,否則null很容易産生資料傾斜
      OptimizeIn,
      ConstantFolding, //針對常量的優化,在這裡會直接計算可以獲得的常量,是以我們對自己的sql中可能出現的常量盡量直接給出
      LikeSimplification,//like的簡化優化,
      BooleanSimplification,//Boolean的簡化優化
      RemovePositive,
      SimplifyFilters,
      SimplifyCasts,
      SimplifyCaseConversionExpressions) ::
    Batch("Decimal Optimizations", FixedPoint(100),
      DecimalAggregates) ::
    Batch("LocalRelation", FixedPoint(100),
      ConvertToLocalRelation) :: Nil
}           

在上面的代碼中已經注釋了一些優化的政策,了解這些優化對我們程式性能的提升還是有用處的,如果我們一開始寫的sql就是按照這個優化政策來寫的,那麼這邊就不需要再費大量的時間進行優化了

第三步:生成實體執行計劃

經過SqlParser、Analyzer、Optimizer的處理,生成的邏輯計劃還無法被當做一般的Job來處理,此時為了能夠将邏輯執行計劃按照其他Job一樣對待,需要将logicalPlan轉換為實體執行計劃

主要的代碼也就是

prepareForExecution.execute(sparkPlan)

,那麼此時prepareForExecution其實就是一個RuleExecutor的對象,隻不過其會設定這個對象的batch值,具體的邏輯遇上面的Analyzer和Optimizer差不多,此處不再贅述

第四步:執行實體執行計劃

調用

executedPlan.execute()

方法

至此整個SparkSQL的執行的整個過程便解析完畢,我們可以總結成下面的流程:

Spark源碼分析之SparkSql的Analyzer,Optimizer

繼續閱讀