在上一篇博文中,我們深入的了解了SparkSql中的sql語句經過DDLParser、SparkSQLParser和SqlParser處理後得到了一個樹結構的Unresolved Logical Plan,這也是我們每一次使用sparkSql時必然會執行的,但是對于一些不是立刻需要傳回結果的造作,執行到這邊也就結束了,隻有遇到哪些諸如show,collect等需要立刻的傳回結果的操作,我們才會繼續後面的執行,這篇博文主要就是将這些。
第一步:使用Analyzer将Unresolved Logical Plan與資料字典(Catalog)進行綁定,生成Resolved LogicalPlan
在真正地需要計算的結果的時候,我們在SQLContext中執行下面的方法:
//實際上在後面真正對DataFrame操作的時候需要真正的去執行sql語句的時候
// 就會觸發sqlContext的executesql方法的執行
//該方法實際上會傳回一個QueryExecution,這個QueryExecution就會觸發後續的整個流程
protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
既然這段代碼的主要操作就是建立了一個QueryExecution對象,那麼此時我們就去看看這段代碼中幹了些什麼
protected[sql] class QueryExecution(val logical: LogicalPlan) {
def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
//使用一個UnResolved LogicPlan去構造一個QueryExecution的執行個體對象
//那麼sql語句的設計執行就會立即一步一步的觸發
//調用analyzer來生成一個resolved LogicPlan
lazy val analyzed: LogicalPlan = analyzer.execute(logical)
//如果目前的這個執行計劃緩存中有,那麼就從緩存中讀取
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
cacheManager.useCachedData(analyzed)
}
//針對resolvedPlan調用optimizer的execute進行優化,得到優化後的optimized LogicalPlan
//獲得優化後的邏輯執行計劃
lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
// TODO: Don't just pick the first one...
//使用sparkPlanner根據剛剛建立的一個optimized LogicalPlan建立一個sparkplan
lazy val sparkPlan: SparkPlan = {
SparkPlan.currentContext.set(self)
planner.plan(optimizedPlan).next()
}
/*在sparksql中,邏輯執行計劃就是LogicalPlan,實體執行計劃就是SparkPlan*/
// executedPlan should not be used to initialize any SparkPlan. It should be
// only used for execution.
//生成一個可以執行的sparkplan,此時就是physicplan,此時就是實體執行計劃
//此時的話就已經綁定好了資料源,知各個表如何join
//如果進行join,預設spark内部是會對小表進行廣播的
lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */
//調用sparkPlan (封裝了Physical plan)的execute方法,execute方法實際上就會執行實體執行計劃
lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
def simpleString: String =
s"""== Physical Plan ==
|${stringOrError(executedPlan)}
""".stripMargin.trim
override def toString: String = {
def output =
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
s"""== Parsed Logical Plan ==
|${stringOrError(logical)}
|== Analyzed Logical Plan ==
|${stringOrError(output)}
|${stringOrError(analyzed)}
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
|Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
""".stripMargin.trim
}
}
在此我們可以看到就是調用了analyzer的execute方法,與前面的DDLParser、SparkSQLParser、SQLParser等一樣,我們也先來看看這個類的繼承關系:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICdzFWRoRXdvN1LclHdpZXYyd2LcBzNvwVZ2x2bzNXak9CX90TQNNkRrFlQKBTSvwFbslmZvwFMwQzLcVmepNHdu9mZvwFVywUNMZTY18CX052bm9CX9UEVPlXQU50dFpWTmZEWjZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39TMwkzM0QDN5ADMykDM3EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
那麼這邊調用的實際上是其父類RuleExecutor中的execute方法:
def execute(plan: TreeType): TreeType = {
var curPlan = plan
batches.foreach { batch =>
val batchStartPlan = curPlan
var iteration = 1
var lastPlan = curPlan
var continue = true
// Run until fix point (or the max number of iterations as specified in the strategy.
while (continue) {
curPlan = batch.rules.foldLeft(curPlan) {
case (plan, rule) =>
val startTime = System.nanoTime()
val result = rule(plan)
val runTime = System.nanoTime() - startTime
RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)
if (!result.fastEquals(plan)) {
logTrace(
s"""
|=== Applying Rule ${rule.ruleName} ===
|${sideBySide(plan.treeString, result.treeString).mkString("\n")}
""".stripMargin)
}
result
}
iteration += 1
if (iteration > batch.strategy.maxIterations) {
// Only log if this is a rule that is supposed to run more than once.
if (iteration != 2) {
logInfo(s"Max iterations (${iteration - 1}) reached for batch ${batch.name}")
}
continue = false
}
if (curPlan.fastEquals(lastPlan)) {
logTrace(
s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
continue = false
}
lastPlan = curPlan
}
if (!batchStartPlan.fastEquals(curPlan)) {
logDebug(
s"""
|=== Result of Batch ${batch.name} ===
|${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
""".stripMargin)
} else {
logTrace(s"Batch ${batch.name} has no effect.")
}
}
curPlan
}
了解這個方法之前我們先需要了解下面的概念:
在上一階段生成的Unresolved LogicalPlan後,Analyzer和optimizer肯定是通過一系列的操作才将這個未解析的邏輯計劃轉換為resolved LogicalPlan的,那麼這系列的操作就是一系列的Rule
那麼這邊的RuleExecutor的主要作用就是執行Rule
這個方法的主要作用就是通過周遊取出Analyzer中定義的batches裡存儲的每一個Batch,每一個Batch中會封裝同屬某一個類别的Rule及其相應的執行政策,我們看一段Analyzer中的Batch的代碼
lazy val batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
CTESubstitution ::
WindowsSubstitution ::
Nil : _*),
Batch("Resolution", fixedPoint,
ResolveRelations ::
ResolveReferences ::
ResolveGroupingAnalytics ::
ResolveSortReferences ::
ResolveGenerate ::
ResolveFunctions ::
ResolveAliases ::
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
PullOutNondeterministic),
Batch("Cleanup", fixedPoint,
CleanupAliases)
)
在這個裡面的諸如CTESubstitution等都是一個Rule,它們繼承自Rule類,我們還可以看到這邊有一個fixedPoint,這個字段的意思就是當使用一個Rule的時候如果循環的次數達到了FixedPoint次,或者前後兩次樹結構沒有變化那麼就停止操作的政策。
下面我們再回到那個execute方法,它的意思其實就是去周遊每一個Batch,再周遊裡面的每一個Rule,按照指定的次數應用到logicalPlan上,注意LogicalPlan的本質上就是一個TreeNode,其間接繼承自TreeNode
如果給TreeNode應用Rule的TreeNode與之前相同,則退出目前的Rule,進行下一個Rule的處理,直到所有的Rule全部周遊結束。此時就意味着Unresolved LogicalPlan解析完畢,生成了一個Resolved LogicalPlan
第二步:使用Optimizer對剛剛生成的Resolved Logical Plan進行一系列的優化
與Analyzer的用法一樣,隻不過這邊的Optimizer使用的是預設的實作DefaultOptimizer,在其中定義了很多的優化政策:
//這裡封裝了每一個Spark SQL中,可以對邏輯執行計劃執行的優化政策
val batches =
// SubQueries are only needed for analysis and can be removed before execution.
Batch("Remove SubQueries", FixedPoint(100),
EliminateSubQueries) ::
Batch("Aggregate", FixedPoint(100),
ReplaceDistinctWithAggregate,
RemoveLiteralFromGroupExpressions) ::
Batch("Operator Optimizations", FixedPoint(100),
// Operator push down
SetOperationPushDown,
SamplePushDown,
PushPredicateThroughJoin,
PushPredicateThroughProject,
PushPredicateThroughGenerate,
ColumnPruning, //列剪裁,就是對我們要查詢的列進行剪裁
// Operator combine
ProjectCollapsing,
CombineFilters,
CombineLimits, //合并limit子句,取一個并集就可以了,這樣的話在後面limit執行的時候就執行一次就可以了
// Constant folding
NullPropagation, //針對null的優化,盡量避免值出現null的情況,否則null很容易産生資料傾斜
OptimizeIn,
ConstantFolding, //針對常量的優化,在這裡會直接計算可以獲得的常量,是以我們對自己的sql中可能出現的常量盡量直接給出
LikeSimplification,//like的簡化優化,
BooleanSimplification,//Boolean的簡化優化
RemovePositive,
SimplifyFilters,
SimplifyCasts,
SimplifyCaseConversionExpressions) ::
Batch("Decimal Optimizations", FixedPoint(100),
DecimalAggregates) ::
Batch("LocalRelation", FixedPoint(100),
ConvertToLocalRelation) :: Nil
}
在上面的代碼中已經注釋了一些優化的政策,了解這些優化對我們程式性能的提升還是有用處的,如果我們一開始寫的sql就是按照這個優化政策來寫的,那麼這邊就不需要再費大量的時間進行優化了
第三步:生成實體執行計劃
經過SqlParser、Analyzer、Optimizer的處理,生成的邏輯計劃還無法被當做一般的Job來處理,此時為了能夠将邏輯執行計劃按照其他Job一樣對待,需要将logicalPlan轉換為實體執行計劃
主要的代碼也就是
prepareForExecution.execute(sparkPlan)
,那麼此時prepareForExecution其實就是一個RuleExecutor的對象,隻不過其會設定這個對象的batch值,具體的邏輯遇上面的Analyzer和Optimizer差不多,此處不再贅述
第四步:執行實體執行計劃
調用
executedPlan.execute()
方法
至此整個SparkSQL的執行的整個過程便解析完畢,我們可以總結成下面的流程: