The Analyzer lives in Catalyst's analysis package. Its main responsibility is to resolve the logical plans that the SQL parser could not resolve.

The Analyzer uses the Catalog and the FunctionRegistry to convert UnresolvedAttribute and UnresolvedRelation nodes into fully typed Catalyst objects.
The Analyzer holds a fixedPoint object and a Seq[Batch]:
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*),
    Batch("AnalysisOperators", fixedPoint,
      EliminateAnalysisOperators)
  )
}
Some of the objects used in the Analyzer, explained:
FixedPoint: effectively an upper bound on the number of iterations.
/** A strategy that runs until fix point or maxIterations times, whichever comes first. */
case class FixedPoint(maxIterations: Int) extends Strategy
Batch: a batch, made up of a sequence of Rules and executed under a Strategy (a strategy is essentially just a name for how many iterations to run, e.g. Once).
/** A batch of rules. */
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
Rule: a rule; rules are applied to the logical plan to turn Unresolved nodes into Resolved ones.
abstract class Rule[TreeType <: TreeNode[_]] extends Logging {

  /** Name for this rule, automatically inferred based on class name. */
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }

  def apply(plan: TreeType): TreeType
}
Strategy: the maximum number of executions. If the plan reaches a fix point before maxIterations, the strategy stops and the rules are not applied again.
/**
* An execution strategy for rules that indicates the maximum number of executions. If the
* execution reaches fix point (i.e. converge) before maxIterations, it will stop.
*/
abstract class Strategy { def maxIterations: Int }
The Analyzer resolves the unresolved logical plan according to the strategies and rules defined in these batches.
The Analyzer class itself does not define an execution method; that lives in its parent class, RuleExecutor[LogicalPlan]. Analyzer also mixes in HiveTypeCoercion, a trait modeled on Hive's automatic type-compatibility conversions, as shown in the figure.
RuleExecutor: the execution environment for rules. It runs the batches, each containing a sequence of rules, and the whole process is serial.
The concrete execution logic is defined in apply:
It is a while loop: each batch's rules are applied to the current plan over and over, iterating until the fix point or the maximum number of iterations is reached.
def apply(plan: TreeType): TreeType = {
  var curPlan = plan

  batches.foreach { batch =>
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy).
    while (continue) {
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          // Each Rule's apply method is invoked here, resolving UnresolvedRelations,
          // Attributes and Functions.
          val result = rule(plan)
          if (!result.fastEquals(plan)) {
            logger.trace(
              s"""
                |=== Applying Rule ${rule.ruleName} ===
                |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
              """.stripMargin)
          }
          result // return the plan after the rule has been applied
      }
      iteration += 1
      if (iteration > batch.strategy.maxIterations) {
        // Stop once the strategy's maximum number of iterations has been exceeded.
        logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")
        continue = false
      }

      if (curPlan.fastEquals(lastPlan)) {
        // Stop once the plan no longer changes between iterations: the fix point.
        logger.trace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    if (!batchStartPlan.fastEquals(curPlan)) {
      logger.debug(
        s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
    } else {
      logger.trace(s"Batch ${batch.name} has no effect.")
    }
  }

  curPlan // return the resolved logical plan
}
In Spark SQL 1.0.0 all of these rules are defined as inner objects in Analyzer.scala.
batches defines four Batches: MultiInstanceRelations, CaseInsensitiveAttributeReferences, Resolution, and AnalysisOperators.
The four batches group the rules by purpose, and each group is resolved under a different strategy.
MultiInstanceRelations: if an instance appears more than once in the logical plan, the NewRelationInstances rule is applied.
Batch("MultiInstanceRelations", Once,
NewRelationInstances)
trait MultiInstanceRelation {
def newInstance: this.type
object NewRelationInstances extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
val localRelations = plan collect { case l: MultiInstanceRelation => l} //将logical plan應用partial function得到所有MultiInstanceRelation的plan的集合
val multiAppearance = localRelations
.groupBy(identity[MultiInstanceRelation]) //group by操作
.filter { case (_, ls) => ls.size > 1 } //如果隻取size大于1的進行後續操作
.map(_._1)
.toSet
//更新plan,使得每個執行個體的expId是唯一的。
plan transform {
case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance
}
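Why duplicated instances are a problem: in a self-join the same relation appears twice, and its attributes would carry identical expression ids, making them ambiguous. A minimal standalone sketch of the idea, with toy classes rather than the real MultiInstanceRelation:

object NewInstanceDemo {
  private var nextId = 0
  private def freshId(): Int = { nextId += 1; nextId }

  // A relation whose attributes carry a numeric "expression id".
  case class Relation(table: String, ids: Seq[Int]) {
    def newInstance: Relation = Relation(table, ids.map(_ => freshId()))
  }

  def main(args: Array[String]): Unit = {
    val t = Relation("t", Seq(freshId(), freshId())) // ids 1, 2
    val selfJoin = Seq(t, t)                         // both sides share ids -> ambiguous
    val appearsTwice = selfJoin.groupBy(identity).filter(_._2.size > 1).keySet
    val fixed = selfJoin.map(r => if (appearsTwice(r)) r.newInstance else r)
    println(fixed) // List(Relation(t,List(3, 4)), Relation(t,List(5, 6))): occurrences now distinct
  }
}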
LowercaseAttributeReferences is again a partial function applied to the current plan: it lowercases the aliases of every matching node, e.g. the alias of an UnresolvedRelation and the alias of a Subquery.
In short, this rule makes attribute references case insensitive by lowercasing them all.
object LowercaseAttributeReferences extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case UnresolvedRelation(databaseName, name, alias) =>
      UnresolvedRelation(databaseName, name, alias.map(_.toLowerCase))
    case Subquery(alias, child) => Subquery(alias.toLowerCase, child)
    case q: LogicalPlan => q transformExpressions {
      case s: Star => s.copy(table = s.table.map(_.toLowerCase))
      case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)
      case Alias(c, name) => Alias(c, name.toLowerCase)()
      case GetField(c, name) => GetField(c, name.toLowerCase)
    }
  }
}
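Sketched on toy expression nodes (hypothetical simplified stand-ins, not the Catalyst classes), the effect is simply that every name ends up in one canonical case, so later lookups need not worry about the user's capitalization:

object LowercaseDemo {
  sealed trait Expr
  case class UnresolvedAttribute(name: String) extends Expr
  case class Alias(child: Expr, name: String) extends Expr

  // Mirror of the rule: push every attribute and alias name to lower case.
  def lowercase(e: Expr): Expr = e match {
    case UnresolvedAttribute(n) => UnresolvedAttribute(n.toLowerCase)
    case Alias(c, n)            => Alias(lowercase(c), n.toLowerCase)
  }

  def main(args: Array[String]): Unit = {
    // SELECT MOBILE AS MB ... parses with the original casing preserved:
    println(lowercase(Alias(UnresolvedAttribute("MOBILE"), "MB")))
    // => Alias(UnresolvedAttribute(mobile),mb)
  }
}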
ResolveReferences converts every UnresolvedAttribute produced by the SQL parser into the corresponding concrete catalyst.expressions.AttributeReference.
It does this by calling the logical plan's resolve method, which turns the attribute into a NamedExpression.
object ResolveReferences extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case q: LogicalPlan if q.childrenResolved =>
      logger.trace(s"Attempting to resolve ${q.simpleString}")
      q transformExpressions {
        case u @ UnresolvedAttribute(name) =>
          // Leave unchanged if resolution fails. Hopefully will be resolved next round.
          val result = q.resolve(name).getOrElse(u) // resolve the name to a NamedExpression
          logger.debug(s"Resolving $u to $result")
          result
      }
  }
}
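Conceptually, resolve matches the unresolved name against the attributes output by the node's children. A toy sketch of that lookup, with simplified types; the real resolve also handles qualified names like table.column:

object ResolveDemo {
  case class AttributeReference(name: String, dataType: String, exprId: Int)

  // Simplified stand-in for LogicalPlan.resolve(name): scan the child's output schema.
  def resolve(name: String, childOutput: Seq[AttributeReference]): Option[AttributeReference] =
    childOutput.find(_.name == name)

  def main(args: Array[String]): Unit = {
    val output = Seq(
      AttributeReference("sid", "Int", 1),
      AttributeReference("mobile", "String", 2))
    println(resolve("mobile", output))  // Some(AttributeReference(mobile,String,2))
    println(resolve("missing", output)) // None -> the attribute stays unresolved this round
  }
}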
ResolveRelations is easier to understand. Remember the SQL parser from earlier: in select * from src, the src table parses into an UnresolvedRelation node.
This step calls the catalog object. Catalog maintains a HashMap from table name to logical plan.
The current table's structure is looked up through this catalog, and the table's fields are resolved from it; an UnresolvedRelation, for example, becomes a tableWithQualifiers (the table together with its qualified fields).
This also explains why I drew a Catalog at the top of the flow diagram: it is the metadata the Analyzer needs while it works.
object ResolveRelations extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case UnresolvedRelation(databaseName, name, alias) =>
      catalog.lookupRelation(databaseName, name, alias)
  }
}
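A minimal sketch of such a catalog, under the assumption that lookupRelation wraps the found plan in a Subquery so the table name (or alias) qualifies the attributes, matching the tableWithQualifiers mentioned above. The Plan type is a toy stand-in, not the real Catalog trait:

import scala.collection.mutable

object CatalogDemo {
  case class Plan(description: String)            // stand-in for LogicalPlan
  case class Subquery(alias: String, child: Plan) // stand-in for the Subquery wrapper

  class SimpleCatalog {
    private val tables = new mutable.HashMap[String, Plan]()

    def registerTable(name: String, plan: Plan): Unit = tables(name) = plan

    // Find the plan for the table and wrap it in a Subquery so the table
    // name (or alias) qualifies the attributes below it.
    def lookupRelation(name: String, alias: Option[String]): Subquery = {
      val table = tables.getOrElse(name, sys.error(s"Table not found: $name"))
      Subquery(alias.getOrElse(name), table)
    }
  }

  def main(args: Array[String]): Unit = {
    val catalog = new SimpleCatalog
    catalog.registerTable("src", Plan("ExistingRdd [key, value]"))
    println(catalog.lookupRelation("src", Some("a")))
    // => Subquery(a,Plan(ExistingRdd [key, value]))
  }
}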
ImplicitGenerate: if the select list contains exactly one expression and that expression is a Generator (a Generator maps one input record to N output records), then when the analyzer encounters such a Project node it rewrites it into a Generate node (Generate applies a function to the input stream to produce a new stream).
object ImplicitGenerate extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Project(Seq(Alias(g: Generator, _)), child) =>
      Generate(g, join = false, outer = false, None, child)
  }
}
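A Generator behaves like flatMap on a collection: each input row yields zero or more output rows. A toy sketch of why the Project-to-Generate rewrite makes sense, with hypothetical simplified nodes; conceptually this is what a query like select explode(arr) from t needs:

object GenerateDemo {
  sealed trait Plan
  case class Child(rows: Seq[Seq[Int]]) extends Plan // each row holds an array column
  case class Generate(child: Child) extends Plan {
    // "Explode" the array column: one output row per array element, 1 -> N.
    def execute(): Seq[Int] = child.rows.flatMap(identity)
  }

  def main(args: Array[String]): Unit = {
    val g = Generate(Child(Seq(Seq(1, 2), Seq(3))))
    println(g.execute()) // List(1, 2, 3): 2 input rows became 3 output rows
  }
}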
StarExpansion: inside a Project operator, a * symbol, i.e. a select * statement, is expanded so that all the references behind the * become the actual list of fields.
object StarExpansion extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Wait until children are resolved
    case p: LogicalPlan if !p.childrenResolved => p
    // If the projection list contains Stars, expand it.
    case p @ Project(projectList, child) if containsStar(projectList) =>
      Project(
        projectList.flatMap {
          case s: Star => s.expand(child.output) // expand(input: Seq[Attribute]) turns the star into a Seq[NamedExpression]
          case o => o :: Nil
        },
        child)
    case t: ScriptTransformation if containsStar(t.input) =>
      t.copy(
        input = t.input.flatMap {
          case s: Star => s.expand(t.child.output)
          case o => o :: Nil
        }
      )
    // If the aggregate function argument contains Stars, expand it.
    case a: Aggregate if containsStar(a.aggregateExpressions) =>
      a.copy(
        aggregateExpressions = a.aggregateExpressions.flatMap {
          case s: Star => s.expand(a.child.output)
          case o => o :: Nil
        }
      )
  }

  /**
   * Returns true if `exprs` contains a [[Star]].
   */
  protected def containsStar(exprs: Seq[Expression]): Boolean =
    exprs.collect { case _: Star => true }.nonEmpty
}
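The expansion itself is nothing more than a flatMap that splices the child's output in where the star sits. A toy sketch with simplified stand-in types:

object StarDemo {
  sealed trait NamedExpr { def name: String }
  case class Attribute(name: String) extends NamedExpr
  case object Star extends NamedExpr { val name = "*" }

  // Replace a Star in the projection list with the child's output columns.
  def expand(projectList: Seq[NamedExpr], childOutput: Seq[Attribute]): Seq[NamedExpr] =
    projectList.flatMap {
      case Star => childOutput // the star becomes every column of the child
      case o    => o :: Nil    // anything else passes through untouched
    }

  def main(args: Array[String]): Unit = {
    val childOutput = Seq(Attribute("sid"), Attribute("mobile"), Attribute("pfrom_id"))
    // SELECT *, sid FROM ...  ->  SELECT sid, mobile, pfrom_id, sid FROM ...
    println(expand(Seq(Star, Attribute("sid")), childOutput))
  }
}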
ResolveFunctions is much like ResolveReferences, except that here UDFs are resolved by looking them up in the FunctionRegistry.
object ResolveFunctions extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case q: LogicalPlan => q transformExpressions {
      case u @ UnresolvedFunction(name, children) if u.childrenResolved =>
        registry.lookupFunction(name, children) // check whether this UDF has been registered
    }
  }
}
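What the registry boils down to is a map from function name to a builder that constructs the concrete expression from the already-resolved children. A hedged toy sketch with simplified types, not the real FunctionRegistry API:

object RegistryDemo {
  sealed trait Expression
  case class Literal(value: Any) extends Expression
  case class Count(child: Expression) extends Expression

  type FunctionBuilder = Seq[Expression] => Expression

  class SimpleRegistry {
    private var functions = Map.empty[String, FunctionBuilder]
    def registerFunction(name: String, builder: FunctionBuilder): Unit =
      functions += (name.toLowerCase -> builder)
    // Look the function up by name and build the expression over its children.
    def lookupFunction(name: String, children: Seq[Expression]): Expression =
      functions.getOrElse(name.toLowerCase, sys.error(s"Undefined function: $name"))(children)
  }

  def main(args: Array[String]): Unit = {
    val registry = new SimpleRegistry
    registry.registerFunction("count", children => Count(children.head))
    println(registry.lookupFunction("COUNT", Seq(Literal(1)))) // Count(Literal(1))
  }
}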
GlobalAggregates: global aggregation; a Project whose projection list contains aggregate expressions is rewritten into an Aggregate.
object GlobalAggregates extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Project(projectList, child) if containsAggregates(projectList) =>
      Aggregate(Nil, projectList, child)
  }

  def containsAggregates(exprs: Seq[Expression]): Boolean = {
    exprs.foreach(_.foreach {
      case agg: AggregateExpression => return true
      case _ =>
    })
    false
  }
}
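Concretely, select count(1) from src first parses into a Project whose projection list contains an aggregate expression; the rule rewrites it into an Aggregate with an empty grouping list, i.e. a single group spanning the whole input. A toy sketch with simplified nodes; the real containsAggregates also searches nested expressions:

object GlobalAggDemo {
  sealed trait Expr
  case class Column(name: String) extends Expr
  case class CountAgg(child: Expr) extends Expr // stand-in for AggregateExpression

  sealed trait Plan
  case class Project(projectList: Seq[Expr], child: String) extends Plan
  case class Aggregate(groupingExprs: Seq[Expr], aggregateExprs: Seq[Expr], child: String) extends Plan

  def rewrite(plan: Plan): Plan = plan match {
    case Project(list, child) if list.exists(_.isInstanceOf[CountAgg]) =>
      Aggregate(Nil, list, child) // Nil grouping expressions = global aggregate
    case other => other
  }

  def main(args: Array[String]): Unit = {
    println(rewrite(Project(Seq(CountAgg(Column("1"))), "src")))
    // => Aggregate(List(),List(CountAgg(Column(1))),src)
  }
}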
typeCoercionRules are the Hive-compatible SQL conversion rules, e.g. implicit conversion between String and Int, so there is no need to explicitly write cast xxx as yyy; StringToIntegralCasts is one example.
val typeCoercionRules =
  PropagateTypes ::
  ConvertNaNs ::
  WidenTypes ::
  PromoteStrings ::
  BooleanComparisons ::
  BooleanCasts ::
  StringToIntegralCasts ::
  FunctionArgumentConversion ::
  CastNulls ::
  Nil
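For example, in the debug log further below, the filter pfrom_id = 0.0 compares a String column with a Double literal, and the analyzed plan shows a CAST inserted on the string side. A toy sketch of the kind of rewrite rules such as PromoteStrings perform, with hypothetical simplified types:

object CoercionDemo {
  sealed trait DataType
  case object StringType extends DataType
  case object DoubleType extends DataType

  sealed trait Expr { def dataType: DataType }
  case class Attr(name: String, dataType: DataType) extends Expr
  case class Lit(value: Any, dataType: DataType) extends Expr
  case class Cast(child: Expr, dataType: DataType) extends Expr
  case class Equals(left: Expr, right: Expr)

  // When the two sides of an equality disagree on type, insert a Cast so they match.
  def coerce(e: Equals): Equals = e match {
    case Equals(l, r) if l.dataType == StringType && r.dataType == DoubleType =>
      Equals(Cast(l, DoubleType), r)
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // pfrom_id = 0.0 with a String column: cast the string side to Double.
    println(coerce(Equals(Attr("pfrom_id", StringType), Lit(0.0, DoubleType))))
    // => Equals(Cast(Attr(pfrom_id,StringType),DoubleType),Lit(0.0,DoubleType))
  }
}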
EliminateAnalysisOperators removes the operators that only matter during analysis. Just two kinds are handled here: Subquery and LowerCaseSchema. Both node types are removed from the logical plan.
object EliminateAnalysisOperators extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Subquery(_, child) => child // return the child rather than the Subquery itself, removing the node
    case LowerCaseSchema(child) => child
  }
}
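Eliding a node this way is the standard tree-rewrite trick: matching the wrapper and returning its child splices the child into the wrapper's place. A toy sketch:

object EliminateDemo {
  sealed trait Plan
  case class Subquery(alias: String, child: Plan) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan
  case class Relation(table: String) extends Plan

  // Recursively rebuild the tree, splicing out every Subquery wrapper.
  def eliminate(plan: Plan): Plan = plan match {
    case Subquery(_, child)   => eliminate(child) // drop the wrapper, keep its child
    case Project(cols, child) => Project(cols, eliminate(child))
    case r: Relation          => r
  }

  def main(args: Array[String]): Unit = {
    val plan = Subquery("a", Project(Seq("*"), Relation("temp_shengli_mobile")))
    println(eliminate(plan)) // Project(List(*),Relation(temp_shengli_mobile))
  }
}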
To wrap up, here is an example from yesterday's debugging session that shows how the rules above are applied to an unresolved logical plan.
When the SQL statement is passed in, ResolveReferences is indeed invoked to resolve mobile into a NamedExpression.
You can follow the execution flow against the output below: the left-hand side is the Unresolved Logical Plan, the right-hand side the Resolved Logical Plan.
Batch Resolution executes first; e.g. the ResolveRelations rule turns the UnresolvedRelation into a SparkLogicalPlan and finds its field attributes through the Catalog.
Then Batch AnalysisOperators executes; e.g. EliminateAnalysisOperators removes the Subquery.
The formatting may not display well; drag the horizontal scrollbar to the right to see the full result. :)
val exec = sqlContext.sql("select mobile as mb, sid as id, mobile*2 multi2mobile, count(1) times from (select * from temp_shengli_mobile)a where pfrom_id=0.0 group by mobile, sid, mobile*2")
14/07/21 18:23:32 DEBUG SparkILoop$SparkILoopInterpreter: Invoking: public static java.lang.String $line47.$eval.$print()
14/07/21 18:23:33 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/21 18:23:33 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'pfrom_id to pfrom_id#5
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'mobile to mobile#2
14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'sid to sid#1
14/07/21 18:23:33 DEBUG Analyzer:
=== Result of Batch Resolution ===
!Aggregate ['mobile,'sid,('mobile * 2) AS c2#27], ['mobile AS mb#23,'sid AS id#24,('mobile * 2) AS multi2mobile#25,COUNT(1) AS times#26L] Aggregate [mobile#2,sid#1,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS c2#27], [mobile#2 AS mb#23,sid#1 AS id#24,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS multi2mobile#25,COUNT(1) AS times#26L]
! Filter ('pfrom_id = 0.0) Filter (CAST(pfrom_id#5, DoubleType) = 0.0)
Subquery a Subquery a
! Project [*] Project [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12]
! UnresolvedRelation None, temp_shengli_mobile, None Subquery temp_shengli_mobile
! SparkLogicalPlan (ExistingRdd [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174)
=== Result of Batch AnalysisOperators ===
! Subquery a Project [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12]
! Project [*] SparkLogicalPlan (ExistingRdd [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174)
! UnresolvedRelation None, temp_shengli_mobile, None
This article has analyzed, from the source code's perspective, the flow the Analyzer executes when it analyzes the Unresolved Logical Plan produced by the SQL parser.
The flow: instantiate a SimpleAnalyzer and define a set of Batches; then, within the RuleExecutor environment, traverse those batches and execute the Rules inside each. Every Rule resolves part of the Unresolved Logical Plan; some plans cannot be fully resolved in one pass and need several iterations, until either the maximum iteration count or the fix point is reached. The most commonly used rules here are ResolveReferences, ResolveRelations, StarExpansion, GlobalAggregates, typeCoercionRules and EliminateAnalysisOperators.