天天看點

Spark-SparkSQL深入學習系列三(轉自OopsOutOfMemory)

    Analyzer位于Catalyst的analysis package下,主要職責是将Sql Parser 未能Resolved的Logical Plan 給Resolved掉。

Spark-SparkSQL深入學習系列三(轉自OopsOutOfMemory)

    Analyzer會使用Catalog和FunctionRegistry将UnresolvedAttribute和UnresolvedRelation轉換為catalyst裡全類型的對象。

    Analyzer裡面有fixedPoint對象,一個Seq[Batch].

class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)  

  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {  

  // TODO: pass this in as a parameter.  

  val fixedPoint = FixedPoint(100)  

  val batches: Seq[Batch] = Seq(  

    Batch("MultiInstanceRelations", Once,  

      NewRelationInstances),  

    Batch("CaseInsensitiveAttributeReferences", Once,  

      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),  

    Batch("Resolution", fixedPoint,  

      ResolveReferences ::  

      ResolveRelations ::  

      NewRelationInstances ::  

      ImplicitGenerate ::  

      StarExpansion ::  

      ResolveFunctions ::  

      GlobalAggregates ::  

      typeCoercionRules :_*),  

    Batch("AnalysisOperators", fixedPoint,  

      EliminateAnalysisOperators)  

  )  

    Analyzer裡的一些對象解釋:

    FixedPoint:相當于疊代次數的上限。

/** A strategy that runs until fix point or maxIterations times, whichever comes first. */  

case class FixedPoint(maxIterations: Int) extends Strategy  

    Batch: 批次,這個對象是由一系列Rule組成的,采用一個政策(政策其實是疊代幾次的别名吧,eg:Once)

/** A batch of rules. */,  

protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)  

   Rule:了解為一種規則,這種規則會應用到Logical Plan 進而将UnResolved 轉變為Resolved

abstract class Rule[TreeType <: TreeNode[_]] extends Logging {  

  /** Name for this rule, automatically inferred based on class name. */  

  val ruleName: String = {  

    val className = getClass.getName  

    if (className endsWith "$") className.dropRight(1) else className  

  }  

  def apply(plan: TreeType): TreeType  

}  

   Strategy:最大的執行次數,如果執行次數在最大疊代次數之前就達到了fix point,政策就會停止,不再應用了。

/** 

 * An execution strategy for rules that indicates the maximum number of executions. If the 

 * execution reaches fix point (i.e. converge) before maxIterations, it will stop. 

 */  

abstract class Strategy { def maxIterations: Int }  

   Analyzer解析主要是根據這些Batch裡面定義的政策和Rule來對Unresolved的邏輯計劃進行解析的。

   這裡Analyzer類本身并沒有定義執行的方法,而是要從它的父類RuleExecutor[LogicalPlan]尋找,Analyzer也實作了HiveTypeCosercion,這個類是參考Hive的類型自動相容轉換的原理。如圖:

Spark-SparkSQL深入學習系列三(轉自OopsOutOfMemory)

    RuleExecutor:執行Rule的執行環境,它會将包含了一系列的Rule的Batch進行執行,這個過程都是串行的。

    具體的執行方法定義在apply裡:

    可以看到這裡是一個while循環,每個batch下的rules都對目前的plan進行作用,這個過程是疊代的,直到達到Fix Point或者最大疊代次數。

def apply(plan: TreeType): TreeType = {  

   var curPlan = plan  

   batches.foreach { batch =>  

     val batchStartPlan = curPlan  

     var iteration = 1  

     var lastPlan = curPlan  

     var continue = true  

     // Run until fix point (or the max number of iterations as specified in the strategy.  

     while (continue) {  

       curPlan = batch.rules.foldLeft(curPlan) {  

         case (plan, rule) =>  

           val result = rule(plan) //這裡将調用各個不同Rule的apply方法,将UnResolved Relations,Attrubute和Function進行Resolve  

           if (!result.fastEquals(plan)) {  

             logger.trace(  

               s"""  

                 |=== Applying Rule ${rule.ruleName} ===  

                 |${sideBySide(plan.treeString, result.treeString).mkString("\n")}  

               """.stripMargin)  

           }  

           result //傳回作用後的result plan  

       }  

       iteration += 1  

       if (iteration > batch.strategy.maxIterations) { //如果疊代次數已經大于該政策的最大疊代次數,就停止循環  

         logger.info(s"Max iterations ($iteration) reached for batch ${batch.name}")  

         continue = false  

       if (curPlan.fastEquals(lastPlan)) { //如果在多次疊代中不再變化,因為plan有個unique id,就停止循環。  

         logger.trace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")  

       lastPlan = curPlan  

     }  

     if (!batchStartPlan.fastEquals(curPlan)) {  

       logger.debug(  

         s"""  

         |=== Result of Batch ${batch.name} ===  

         |${sideBySide(plan.treeString, curPlan.treeString).mkString("\n")}  

       """.stripMargin)  

     } else {  

       logger.trace(s"Batch ${batch.name} has no effect.")  

   }  

   curPlan //傳回Resolved的Logical Plan  

 }  

    目前Spark SQL 1.0.0的Rule都定義在了Analyzer.scala的内部類。

    在batches裡面定義了4個Batch。

    MultiInstanceRelations、CaseInsensitiveAttributeReferences、Resolution、AnalysisOperators 四個。

    這4個Batch是将不同的Rule進行歸類,每種類别采用不同的政策來進行Resolve。

Spark-SparkSQL深入學習系列三(轉自OopsOutOfMemory)

如果一個執行個體在Logical Plan裡出現了多次,則會應用NewRelationInstances這兒Rule

Batch("MultiInstanceRelations", Once,  

     NewRelationInstances)  

trait MultiInstanceRelation {  

  def newInstance: this.type  

object NewRelationInstances extends Rule[LogicalPlan] {   

  def apply(plan: LogicalPlan): LogicalPlan = {  

    val localRelations = plan collect { case l: MultiInstanceRelation => l} //将logical plan應用partial function得到所有MultiInstanceRelation的plan的集合   

    val multiAppearance = localRelations  

      .groupBy(identity[MultiInstanceRelation]) //group by操作  

      .filter { case (_, ls) => ls.size > 1 } //如果隻取size大于1的進行後續操作  

      .map(_._1)  

      .toSet  

    //更新plan,使得每個執行個體的expId是唯一的。  

    plan transform {  

      case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance  

    }  

同樣是partital function,對目前plan應用,将所有比對的如UnresolvedRelation的别名alise轉換為小寫,将Subquery的别名也轉換為小寫。

總結:這是一個使屬性名大小寫不敏感的Rule,因為它将所有屬性都to lower case了。

object LowercaseAttributeReferences extends Rule[LogicalPlan] {  

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {  

    case UnresolvedRelation(databaseName, name, alias) =>  

      UnresolvedRelation(databaseName, name, alias.map(_.toLowerCase))  

    case Subquery(alias, child) => Subquery(alias.toLowerCase, child)  

    case q: LogicalPlan => q transformExpressions {  

      case s: Star => s.copy(table = s.table.map(_.toLowerCase))  

      case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)  

      case Alias(c, name) => Alias(c, name.toLowerCase)()  

      case GetField(c, name) => GetField(c, name.toLowerCase)  

将Sql parser解析出來的UnresolvedAttribute全部都轉為對應的實際的catalyst.expressions.AttributeReference AttributeReferences

這裡調用了logical plan 的resolve方法,将屬性轉為NamedExepression。

object ResolveReferences extends Rule[LogicalPlan] {  

  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {  

    case q: LogicalPlan if q.childrenResolved =>  

      logger.trace(s"Attempting to resolve ${q.simpleString}")  

      q transformExpressions {  

        case u @ UnresolvedAttribute(name) =>  

          // Leave unchanged if resolution fails.  Hopefully will be resolved next round.  

          val result = q.resolve(name).getOrElse(u)//轉化為NamedExpression  

          logger.debug(s"Resolving $u to $result")  

          result  

      }  

這個比較好了解,還記得前面Sql parser嗎,比如select * from src,這個src表parse後就是一個UnresolvedRelation節點。

這一步ResolveRelations調用了catalog這個對象。Catalog對象裡面維護了一個tableName,Logical Plan的HashMap結果。

通過這個Catalog目錄來尋找目前表的結構,進而從中解析出這個表的字段,如UnResolvedRelations 會得到一個tableWithQualifiers。(即表和字段) 

這也解釋了為什麼流程圖那,我會畫一個catalog在上面,因為它是Analyzer工作時需要的meta data。

object ResolveRelations extends Rule[LogicalPlan] {  

    def apply(plan: LogicalPlan): LogicalPlan = plan transform {  

      case UnresolvedRelation(databaseName, name, alias) =>  

        catalog.lookupRelation(databaseName, name, alias)  

如果在select語句裡隻有一個表達式,而且這個表達式是一個Generator(Generator是一個1條記錄生成到N條記錄的映射)

當在解析邏輯計劃時,遇到Project節點的時候,就可以将它轉換為Generate類(Generate類是将輸入流應用一個函數,進而生成一個新的流)。

object ImplicitGenerate extends Rule[LogicalPlan] {  

    case Project(Seq(Alias(g: Generator, _)), child) =>  

      Generate(g, join = false, outer = false, None, child)  

在Project操作符裡,如果是*符号,即select * 語句,可以将所有的references都展開,即将select * 中的*展開成實際的字段。

  object StarExpansion extends Rule[LogicalPlan] {  

      // Wait until children are resolved  

      case p: LogicalPlan if !p.childrenResolved => p  

      // If the projection list contains Stars, expand it.  

      case p @ Project(projectList, child) if containsStar(projectList) =>   

        Project(  

          projectList.flatMap {  

            case s: Star => s.expand(child.output) //展開,将輸入的Attributeexpand(input: Seq[Attribute]) 轉化為Seq[NamedExpression]  

            case o => o :: Nil  

          },  

          child)  

      case t: ScriptTransformation if containsStar(t.input) =>  

        t.copy(  

          input = t.input.flatMap {  

            case s: Star => s.expand(t.child.output)  

          }  

        )  

      // If the aggregate function argument contains Stars, expand it.  

      case a: Aggregate if containsStar(a.aggregateExpressions) =>  

        a.copy(  

          aggregateExpressions = a.aggregateExpressions.flatMap {  

            case s: Star => s.expand(a.child.output)  

    /** 

     * Returns true if `exprs` contains a [[Star]]. 

     */  

    protected def containsStar(exprs: Seq[Expression]): Boolean =  

      exprs.collect { case _: Star => true }.nonEmpty  

這個和ResolveReferences差不多,這裡主要是對udf進行resolve。

将這些UDF都在FunctionRegistry裡進行查找。

object ResolveFunctions extends Rule[LogicalPlan] {  

    case q: LogicalPlan =>  

        case u @ UnresolvedFunction(name, children) if u.childrenResolved =>  

          registry.lookupFunction(name, children) //看是否注冊了目前udf  

全局的聚合,如果遇到了Project就傳回一個Aggregate.

object GlobalAggregates extends Rule[LogicalPlan] {  

    case Project(projectList, child) if containsAggregates(projectList) =>  

      Aggregate(Nil, projectList, child)  

  def containsAggregates(exprs: Seq[Expression]): Boolean = {  

    exprs.foreach(_.foreach {  

      case agg: AggregateExpression => return true  

      case _ =>  

    })  

    false  

這個是Hive裡的相容SQL文法,比如将String和Int互相轉換,不需要顯示的調用cast xxx  as yyy了。如StringToIntegerCasts。

val typeCoercionRules =  

  PropagateTypes ::  

  ConvertNaNs ::  

  WidenTypes ::  

  PromoteStrings ::  

  BooleanComparisons ::  

  BooleanCasts ::  

  StringToIntegralCasts ::  

  FunctionArgumentConversion ::  

  CastNulls ::  

  Nil  

将分析的操作符移除,這裡僅支援2種,一種是Subquery需要移除,一種是LowerCaseSchema。這些節點都會從Logical Plan裡移除。

object EliminateAnalysisOperators extends Rule[LogicalPlan] {  

    case Subquery(_, child) => child //遇到Subquery,不反悔本身,傳回它的Child,即删除了該元素  

    case LowerCaseSchema(child) => child  

  補充昨天DEBUG的一個例子,這個例子證明了如何将上面的規則應用到Unresolved Logical Plan:

  當傳遞sql語句的時候,的确調用了ResolveReferences将mobile解析成NamedExpression。

  可以對照這看執行流程,左邊是Unresolved Logical Plan,右邊是Resoveld Logical Plan。

  先是執行了Batch Resolution,eg: 調用ResovelRalation這個RUle來使 Unresovled Relation 轉化為 SparkLogicalPlan并通過Catalog找到了其對于的字段屬性。

  然後執行了Batch Analysis Operator。eg:調用EliminateAnalysisOperators來将SubQuery給remove掉了。

  可能格式顯示的不太好,可以向右邊拖動下滾動軸看下結果。 :) 

val exec = sqlContext.sql("select mobile as mb, sid as id, mobile*2 multi2mobile, count(1) times from (select * from temp_shengli_mobile)a where pfrom_id=0.0 group by mobile, sid,  mobile*2")  

14/07/21 18:23:32 DEBUG SparkILoop$SparkILoopInterpreter: Invoking: public static java.lang.String $line47.$eval.$print()  

14/07/21 18:23:33 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations  

14/07/21 18:23:33 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences  

14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'pfrom_id to pfrom_id#5  

14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'mobile to mobile#2  

14/07/21 18:23:33 DEBUG Analyzer$ResolveReferences$: Resolving 'sid to sid#1  

14/07/21 18:23:33 DEBUG Analyzer:   

=== Result of Batch Resolution ===  

!Aggregate ['mobile,'sid,('mobile * 2) AS c2#27], ['mobile AS mb#23,'sid AS id#24,('mobile * 2) AS multi2mobile#25,COUNT(1) AS times#26L]   Aggregate [mobile#2,sid#1,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS c2#27], [mobile#2 AS mb#23,sid#1 AS id#24,(CAST(mobile#2, DoubleType) * CAST(2, DoubleType)) AS multi2mobile#25,COUNT(1) AS times#26L]  

! Filter ('pfrom_id = 0.0)                                                                                                                   Filter (CAST(pfrom_id#5, DoubleType) = 0.0)  

   Subquery a                                                                                                                                 Subquery a  

!   Project [*]                                                                                                                                Project [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12]  

!    UnresolvedRelation None, temp_shengli_mobile, None                                                                                         Subquery temp_shengli_mobile  

!                                                                                                                                                SparkLogicalPlan (ExistingRdd [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174)  

=== Result of Batch AnalysisOperators ===  

!  Subquery a                                                                                                                                 Project [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12]  

!   Project [*]                                                                                                                                SparkLogicalPlan (ExistingRdd [data_date#0,sid#1,mobile#2,pverify_type#3,create_time#4,pfrom_id#5,p_status#6,pvalidate_time#7,feffect_time#8,plastupdate_ip#9,update_time#10,status#11,preserve_int#12], MapPartitionsRDD[4] at mapPartitions at basicOperators.scala:174)  

!    UnresolvedRelation None, temp_shengli_mobile, None                                                                                       

    本文從源代碼角度分析了Analyzer在對Sql Parser解析出的UnResolve Logical Plan 進行analyze的過程中,所執行的流程。

    流程是執行個體化一個SimpleAnalyzer,定義一些Batch,然後周遊這些Batch在RuleExecutor的環境下,執行Batch裡面的Rules,每個Rule會對Unresolved Logical Plan進行Resolve,有些可能不能一次解析出,需要多次疊代,直到達到max疊代次數或者達到fix point。這裡Rule裡比較常用的就是ResolveReferences、ResolveRelations、StarExpansion、GlobalAggregates、typeCoercionRules和EliminateAnalysisOperators。