
Spark-SQL Binding

Table of Contents

  • Spark-SQL Binding
    • Metadata (Catalog)
      • 1. GlobalTempViewManager (Global Temporary View Manager)
      • 2. FunctionResourceLoader (Function Resource Loader)
      • 3. FunctionRegistry (Function Registry)
      • 4. ExternalCatalog (External Catalog)
    • Rules (Rule)
    • Analysis (Analyzer)
      • 1. ResolveRelations (Resolving Tables)
      • 2. ResolveReferences (Resolving Columns and Expressions)
        • 1. resolveExpression (Resolving Expressions)
        • 2. resolveLiteralFunction (Resolving Literal Functions)
      • 3. ResolveFunctions (Resolving Functions)
      • 4. ResolveAggregateFunctions (Resolving Aggregate Functions)
      • 5. ResolveAggAliasInGroupBy (Resolving GROUP BY Aliases)

Spark-SQL Binding

The previous article, Spark-SQL Parsing, showed how Spark-SQL uses ANTLR4 to generate an unresolved LogicalPlan. At this stage the LogicalPlan is still unresolved: its UnresolvedRelation and UnresolvedAttribute nodes have to be bound through the Catalog in order to produce the resolved LogicalPlan.

Metadata (Catalog)

In Spark-SQL, the Catalog provides unified management of function resources and of metadata such as databases, tables, views, partitions, and functions.

The Catalog implementation in Spark-SQL is centered on SessionCatalog, which users reach through SparkSession; SparkSession and SessionCatalog have a one-to-one relationship. Besides the Spark-SQL and Hadoop configuration, the constructor of SessionCatalog involves the following four components:

class SessionCatalog(
    val externalCatalog: ExternalCatalog, // external catalog
    globalTempViewManager: GlobalTempViewManager, // global temporary view manager
    functionRegistry: FunctionRegistry, // function registry
    conf: SQLConf, // SQL configuration
    hadoopConf: Configuration, // Hadoop configuration
    parser: ParserInterface,
    functionResourceLoader: FunctionResourceLoader) // function resource loader
           

1. GlobalTempViewManager (Global Temporary View Manager)

GlobalTempViewManager corresponds to the createGlobalTempView method on DataFrame. Internally it stores views in viewDefinitions and supports create, update, drop, and rename operations. The key of viewDefinitions is the view name and the value is the view's LogicalPlan (generated when the view is created).

private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
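
A minimal usage sketch (assuming a running SparkSession named spark, for example in spark-shell; the view name people is made up): views created this way are registered in GlobalTempViewManager under the reserved global_temp database and stay visible to every session of the application.

// The global temp view lives in the global_temp database and is shared across sessions.
val df = spark.range(3).toDF("id")
df.createGlobalTempView("people")
spark.sql("SELECT * FROM global_temp.people").show()
spark.newSession().sql("SELECT * FROM global_temp.people").show() // visible from a new session too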
           

2. FunctionResourceLoader (Function Resource Loader)

Besides the built-in functions, Spark-SQL also supports user-defined functions and Hive functions, which can be supplied through external jar packages; FunctionResourceLoader is responsible for loading these function resources.
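
A hedged sketch of the path that triggers it (the jar path and class name below are placeholders, not real artifacts): when a permanent function backed by an external jar is first used, SessionCatalog asks FunctionResourceLoader to load that jar.

// Hypothetical jar and UDF class, shown only to illustrate where FunctionResourceLoader kicks in.
spark.sql("CREATE FUNCTION my_upper AS 'com.example.udf.MyUpper' USING JAR '/tmp/my-udfs.jar'")
spark.sql("SELECT my_upper('hello')").show() // first use: the jar is loaded, then the function is bound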

3. FunctionRegistry (Function Registry)

FunctionRegistry handles the registration, lookup, and removal of functions. The implementation in Spark-SQL is SimpleFunctionRegistry, which stores registered functions in functionBuilders; the key is the function identifier and the value is the function's description info and builder. Besides user-defined functions, functionBuilders also holds the many predefined built-in functions.

private val functionBuilders =
    new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)] // registered functions

object FunctionRegistry {
  val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
    expression[Sum]("sum"),                // sum
    expression[TimeWindow]("window"),      // window(start, 7day, 1day)
    // json
    expression[StructsToJson]("to_json"),  // struct to JSON
    expression[JsonToStructs]("from_json") // JSON to struct
  )

  // fr is the SimpleFunctionRegistry being populated (abridged from FunctionRegistry.builtin)
  expressions.foreach {
    case (name, (info, builder)) => fr.registerFunction(FunctionIdentifier(name), info, builder) // register
  }
}
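
Functions registered at runtime end up in the same per-session registry; a small sketch (the UDF name strlen is made up, spark is the SparkSession):

// spark.udf.register adds an entry to the session's SimpleFunctionRegistry,
// alongside the built-ins registered from FunctionRegistry.expressions above.
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("SELECT strlen('spark')").show()                     // 5
spark.catalog.listFunctions().filter("name = 'strlen'").show() // the new entry is visible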
           

4. ExternalCatalog (External Catalog)

ExternalCatalog manages databases, tables, partitions, and functions in external systems and provides non-temporary (persistent) storage. There are two implementations: InMemoryCatalog, which keeps everything in memory and is mainly used for testing, and HiveExternalCatalog, which relies on the Hive metastore for persistent management and is widely used in production. The ExternalCatalog class hierarchy is shown below:

[Figure: ExternalCatalog class hierarchy]

Overall, SessionCatalog manages all table-related metadata, including databases, tables, views, partitions, and functions. Internally it also keeps temporary view information in tempViews and tracks the name of the current database in currentDb.

protected val tempViews = new mutable.HashMap[String, LogicalPlan] // temporary views
protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE) // current database name
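
A short sketch of the session-scoped counterparts (the view and database names are only examples; spark is the SparkSession):

// createOrReplaceTempView stores the plan in SessionCatalog.tempViews (session scoped),
// while currentDb can be read and changed through the public catalog API.
spark.range(5).createOrReplaceTempView("t_tmp")
println(spark.catalog.currentDatabase)      // "default" unless it has been changed
spark.catalog.setCurrentDatabase("default") // updates SessionCatalog.currentDb
println(spark.catalog.tableExists("t_tmp")) // true: temp views are resolved here as well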
           

Rules (Rule)

Rule plays a central role in the operations performed on the unresolved LogicalPlan tree (binding, optimization, and so on): these operations frequently need to rewrite a node of the tree or transform the tree's structure. Rule exposes an apply method for subclasses to override, so that each subclass can implement its own processing logic.

abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }
  def apply(plan: TreeType): TreeType // rule logic implemented by subclasses
}
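
To make this concrete, here is a toy rule (purely illustrative, not a rule that ships with Spark; the two-argument Add pattern matches Spark 2.x and may differ in newer versions):

import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Rewrite every `expr + 0` into `expr` wherever it appears among the plan's expressions.
object RemoveAddZero extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Add(left, Literal(0, _)) => left
  }
}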
           

RuleExecutor is the driver for Rules: the binding and optimization of the unresolved LogicalPlan, as well as the later generation of physical operators, are all implemented by subclasses of RuleExecutor. Its important internal members are:

abstract class Strategy { def maxIterations: Int } // strategy: how many iterations to run
case object Once extends Strategy { val maxIterations = 1 } // iterate exactly once
case class FixedPoint(maxIterations: Int) extends Strategy // iterate up to maxIterations times
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*) // one batch of rules
protected def batches: Seq[Batch] // the ordered collection of batches; defines rule execution order
           

Strategy: the execution strategy that determines how many times a batch of rules is iterated; an abstract class declaring an Int member named maxIterations.

Once: extends Strategy with maxIterations = 1, meaning the rules are applied only once.

FixedPoint: extends Strategy, meaning the rules are applied up to maxIterations times (or until the plan stops changing).

Batch: a set of rules, consisting of the batch name, the strategy, and the rules it contains.

batches: the collection of batches, of type Seq[Batch]; it defines the order in which rules are executed, and its concrete contents are supplied by subclasses.

The execute method: following the order of batches and, within each Batch, the order of its rules, it iteratively processes the nodes of the given plan; the concrete handling logic is implemented by the Rule subclasses.

def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) { // apply each rule of the batch in turn
          case (plan, rule) =>
            val result = rule(plan)
            result
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {
          // the strategy's iteration limit has been reached
          continue = false
        }
        if (curPlan.fastEquals(lastPlan)) {
          // the plan no longer changes: a fixed point has been reached, stop early
          continue = false
        }
        lastPlan = curPlan
      }
    }
    curPlan
  }
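
To see how batches, strategies, and rules fit together, here is a minimal sketch of a RuleExecutor that drives the toy RemoveAddZero rule from the previous section (illustrative only):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

object SimpleExecutor extends RuleExecutor[LogicalPlan] {
  // A single batch, applied at most 100 times or until the plan reaches a fixed point.
  override protected def batches: Seq[Batch] = Seq(
    Batch("Simplify", FixedPoint(100), RemoveAddZero)
  )
}
// Usage: SimpleExecutor.execute(someLogicalPlan) returns the rewritten plan.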
           

Analysis (Analyzer)

The Analyzer uses the Catalog and the FunctionRegistry to turn UnresolvedAttribute and UnresolvedRelation nodes into fully typed Catalyst objects, producing the resolved LogicalPlan. The following looks at the concrete rules defined in the Analyzer:

[Figure: the Analyzer and its rule batches]

Analyzer is a subclass of RuleExecutor, and its batches define the concrete rules and their execution order. There are many rules, so only a few typical ones are analyzed here: ResolveRelations, ResolveReferences, and ResolveFunctions; the others follow essentially the same pattern and differ only in their internal handling logic.

lazy val batches: Seq[Batch] = Seq(
    Batch("Hints", fixedPoint,
      new ResolveHints.ResolveBroadcastHints(conf),
      ResolveHints.RemoveAllHints),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Resolution", fixedPoint,
      ResolveRelations ::
      ResolveReferences ::
      ResolveAggAliasInGroupBy ::
      ResolveFunctions ::
      ResolveAggregateFunctions ::
      ......
           

The discussion below is based on the example from the previous article:

SELECT SUM(AGE)
FROM
  (SELECT A.ID,
          A.NAME,
          CAST(B.AGE AS LONG) AS AGE
   FROM NAME A INNER JOIN AGE B
   ON A.ID == B.ID)
WHERE AGE > 20
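
Before looking at the individual rules, note that you can watch the whole binding step yourself by comparing the parsed and analyzed plans of this query (a sketch, assuming a SparkSession named spark in which the NAME and AGE tables or views actually exist):

val q = spark.sql(
  """SELECT SUM(AGE)
    |FROM (SELECT A.ID, A.NAME, CAST(B.AGE AS LONG) AS AGE
    |      FROM NAME A INNER JOIN AGE B ON A.ID == B.ID)
    |WHERE AGE > 20""".stripMargin)
println(q.queryExecution.logical)  // still contains UnresolvedRelation nodes and unresolved attributes
println(q.queryExecution.analyzed) // every relation, column, and function is now bound and typed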
           

1. ResolveRelations (Resolving Tables)

After ANTLR parsing, a table is only an UnresolvedRelation: nothing more than a name string, with no actual relation behind it. The Analyzer applies the ResolveRelations rule to resolve each UnresolvedRelation into a resolved relation.

def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
          case v: View =>
            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
          case other => i.copy(table = other)
        }
      case u: UnresolvedRelation => resolveRelation(u)
    }
           
def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
      case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
        val defaultDatabase = AnalysisContext.get.defaultDatabase
        val foundRelation = lookupTableFromCatalog(u, defaultDatabase)
        resolveRelation(foundRelation)
      // view and subquery-alias cases omitted
      case _ => plan
    }
           
private def lookupTableFromCatalog(
        u: UnresolvedRelation,
        defaultDatabase: Option[String] = None): LogicalPlan = {
      val tableIdentWithDb = u.tableIdentifier.copy(
        database = u.tableIdentifier.database.orElse(defaultDatabase))
      try {
        catalog.lookupRelation(tableIdentWithDb)
      } catch {
      // exception handling (omitted)
      }
    }
           

As the source shows, when the traversal of the logical plan tree matches an UnresolvedRelation node, lookupTableFromCatalog is called to fetch the relation from the SessionCatalog. The execution flow is shown below:

[Figure: ResolveRelations execution flow]

2. ResolveReferences (Resolving Columns and Expressions)

After ANTLR parsing, attributes are UnresolvedAttribute nodes. ResolveReferences works much like ResolveRelations: it resolves the items referenced by the query, such as literals, columns, functions, and expressions.

def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
      case p: LogicalPlan if !p.childrenResolved => p
      case p: Project if containsStar(p.projectList) =>
        p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
        // remaining cases omitted
      case q: LogicalPlan =>
        logTrace(s"Attempting to resolve ${q.simpleString}")
        q.mapExpressions(resolve(_, q))
    }
           
private def resolve(e: Expression, q: LogicalPlan): Expression = e match {
      case u @ UnresolvedAttribute(nameParts) =>
        val result =
          withPosition(u) {
            q.resolveChildren(nameParts, resolver)
              .orElse(resolveLiteralFunction(nameParts, u, q))
              .getOrElse(u)
          }
        logDebug(s"Resolving $u to $result")
        result
      case g @ Generate(generator, join, outer, qualifier, output, child) =>
        val newG = resolveExpression(generator, child, throws = true)
        if (newG.fastEquals(generator)) {
          g
        } else {
          Generate(newG.asInstanceOf[Generator], join, outer, qualifier, output, child)
        }
      case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
        ExtractValue(child, fieldExpr, resolver)
      case _ => e.mapChildren(resolve(_, q))
    }
           

Two important helper functions are introduced below: resolveExpression and resolveLiteralFunction.

1. resolveExpression (Resolving Expressions)

When an expression is encountered, resolution goes through resolveExpression.

protected[sql] def resolveExpression(
      expr: Expression,
      plan: LogicalPlan,
      throws: Boolean = false): Expression = {
    if (expr.resolved) return expr
    try {
      expr transformUp {
        case GetColumnByOrdinal(ordinal, _) => plan.output(ordinal)
        case u @ UnresolvedAttribute(nameParts) =>
          withPosition(u) {
            plan.resolve(nameParts, resolver)
              .orElse(resolveLiteralFunction(nameParts, u, plan))
              .getOrElse(u)
          }
        case UnresolvedExtractValue(child, fieldName) if child.resolved =>
          ExtractValue(child, fieldName, resolver)
      }
    } catch {
      case a: AnalysisException if !throws => expr
    }
  }
           

2. resolveLiteralFunction (Resolving Literal Functions)

When the query contains CURRENT_DATE or CURRENT_TIMESTAMP, resolution goes through resolveLiteralFunction; for example, CURRENT_DATE is resolved to current_date(None) AS current_date()#8.

private def resolveLiteralFunction(
      nameParts: Seq[String],
      attribute: UnresolvedAttribute,
      plan: LogicalPlan): Option[Expression] = {
    if (nameParts.length != 1) return None
    val isNamedExpression = plan match {
      case Aggregate(_, aggregateExpressions, _) => aggregateExpressions.contains(attribute)
      case Project(projectList, _) => projectList.contains(attribute)
      case Window(windowExpressions, _, _, _) => windowExpressions.contains(attribute)
      case _ => false
    }
    val wrapper: Expression => Expression =
      if (isNamedExpression) f => Alias(f, toPrettySQL(f))() else identity
    // support CURRENT_DATE and CURRENT_TIMESTAMP
    val literalFunctions = Seq(CurrentDate(), CurrentTimestamp())
    val name = nameParts.head
    val func = literalFunctions.find(e => resolver(e.prettyName, name))
    func.map(wrapper)
  }
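
A quick way to observe this (a sketch; the attribute id #8 shown above will differ between runs; spark is the SparkSession):

val d = spark.sql("SELECT CURRENT_DATE")
println(d.queryExecution.analyzed)
// Roughly: Project [current_date(None) AS current_date()#8]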
           

The resolution process is illustrated below:

[Figure: ResolveReferences resolution flow]

3. ResolveFunctions (Resolving Functions)

After ANTLR parsing, a function call is an UnresolvedGenerator or UnresolvedFunction node, which must be bound to a concrete function through the catalog:

object ResolveFunctions extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
      case q: LogicalPlan =>
        q transformExpressions {
          case u @ UnresolvedGenerator(name, children) =>
            withPosition(u) {
              catalog.lookupFunction(name, children) match { // look up the function
                case generator: Generator => generator
                // non-generator results raise an analysis error (omitted)
              }
            }
          case u @ UnresolvedFunction(funcId, children, isDistinct) =>
            withPosition(u) {
              catalog.lookupFunction(funcId, children) match { // look up the function
                case wf: AggregateWindowFunction => wf
                case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct)
                case other =>
                  if (isDistinct) {
                    failAnalysis(s"${other.prettyName} does not support the modifier DISTINCT")
                  } else {
                    other
                  }
              }
            }
        }
    }
  }
           
def lookupFunction(
      name: FunctionIdentifier,
      children: Seq[Expression]): Expression = synchronized {
    if (name.database.isEmpty && functionRegistry.functionExists(name)) {
      return functionRegistry.lookupFunction(name, children)
    }
    // ... otherwise the function is loaded via the ExternalCatalog and the
    // FunctionResourceLoader, then registered and looked up (omitted)
  }
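
A sketch of the effect on the running example's aggregate (range(10) is used here only to provide a table with an id column; spark is the SparkSession):

val f = spark.sql("SELECT SUM(id) FROM range(10)")
println(f.queryExecution.logical)  // 'SUM('id) is still an UnresolvedFunction here
println(f.queryExecution.analyzed) // sum(id#0L): bound to the Sum aggregate and wrapped in an
                                   // AggregateExpression with mode Complete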
           

4. ResolveAggregateFunctions (Resolving Aggregate Functions)

ResolveAggregateFunctions resolves the expressions that appear after HAVING or ORDER BY. For example, in

SELECT id,count(1) FROM table GROUP BY id having avg(age) > 20
           

the condition avg(age) > 20 is resolved.

val aggregatedCondition =
  Aggregate(
    grouping,
    Alias(cond, "havingCondition")() :: Nil,
    child)
val resolvedOperator = executeSameContext(aggregatedCondition)
           

The rules are then re-run on aggregatedCondition; when ResolveReferences executes, the columns inside the condition are resolved.
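
A sketch of the same pattern end to end (people is a hypothetical view with id and age columns; spark is the SparkSession):

// The HAVING condition is first analyzed inside a temporary Aggregate whose output is aliased
// "havingCondition"; the resolved predicate then becomes a Filter above the real Aggregate.
spark.range(100).selectExpr("id % 10 AS id", "id AS age").createOrReplaceTempView("people")
val h = spark.sql("SELECT id, count(1) FROM people GROUP BY id HAVING avg(age) > 20")
println(h.queryExecution.analyzed) // roughly: Project -> Filter(avg(age) > 20) -> Aggregate [id]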

5. ResolveAggAliasInGroupBy (Resolving GROUP BY Aliases)

Its job is to replace the columns in the GROUP BY clause with the already resolved expressions from the SELECT list; it is applied after ResolveReferences.

SELECT id,count(1) FROM table GROUP BY id having avg(age) > 20
           

ResolveReferences first resolves the id after SELECT; ResolveAggAliasInGroupBy then replaces the id after GROUP BY with that resolved id from the SELECT list.

override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
      case agg @ Aggregate(groups, aggs, child)
          if conf.groupByAliases && child.resolved && aggs.forall(_.resolved) &&
            groups.exists(!_.resolved) =>
        agg.copy(groupingExpressions = mayResolveAttrByAggregateExprs(groups, aggs, child))
    }
           
private def mayResolveAttrByAggregateExprs(
        exprs: Seq[Expression], aggs: Seq[NamedExpression], child: LogicalPlan): Seq[Expression] = {
      exprs.map { _.transform {
        case u: UnresolvedAttribute if notResolvableByChild(u.name, child) =>
          aggs.find(ne => resolver(ne.name, u.name)).getOrElse(u)
      }}
    }
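
A sketch of the alias case this rule targets (t is a hypothetical view; spark.sql.groupByAliases defaults to true):

// "bucket" exists only as an alias in the SELECT list; ResolveAggAliasInGroupBy substitutes the
// resolved expression id % 2 for the unresolved GROUP BY attribute.
spark.range(10).createOrReplaceTempView("t")
spark.sql("SELECT id % 2 AS bucket, count(1) AS cnt FROM t GROUP BY bucket").show()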
           

The rules above are the more commonly used and important ones; many others are not listed here one by one. After this series of rules has been applied, the Analyzed LogicalPlan is produced. What follows next is the Optimizer (optimization) and the generation of the physical plan; that process will be covered in a later article.

