

Table of Contents

  • Spark-SQL Binding
    • Metadata (Catalog)
      • 1. GlobalTempViewManager (global temporary view manager)
      • 2. FunctionResourceLoader (function resource loader)
      • 3. FunctionRegistry (function registry)
      • 4. ExternalCatalog (external system catalog)
    • Rules (Rule)
    • Analysis (Analyzer)
      • 1. ResolveRelations (resolving tables)
      • 2. ResolveReferences (resolving columns and expressions)
        • 1. resolveExpression (resolving expressions)
        • 2. resolveLiteralFunction (resolving literal functions)
      • 3. ResolveFunctions (resolving functions)
      • 4. ResolveAggregateFunctions (resolving aggregate functions)
      • 5. ResolveAggAliasInGroupBy (resolving GROUP BY fields)

Spark-SQL Binding

The previous article, Spark-SQL Parsing, described how Spark-SQL uses Antlr4 to generate an unresolved LogicalPlan. At that point the LogicalPlan is still unresolved: the Catalog must be used to bind the UnresolvedRelation and UnresolvedAttribute nodes, producing a resolved LogicalPlan.

Metadata (Catalog)

In Spark-SQL, the Catalog provides unified management of function resources and metadata: databases, tables, views, partitions, functions, and so on.

The Catalog system in Spark-SQL is implemented around SessionCatalog, which users access through SparkSession; SparkSession and SessionCatalog have a one-to-one relationship. Besides the Spark-SQL and Hadoop configurations, the constructor parameters of SessionCatalog cover the following four components:

class SessionCatalog(
    val externalCatalog: ExternalCatalog,             // external system catalog
    globalTempViewManager: GlobalTempViewManager,     // global temporary view manager
    functionRegistry: FunctionRegistry,               // function registry
    conf: SQLConf,                                    // SQL configuration
    hadoopConf: Configuration,                        // Hadoop configuration
    parser: ParserInterface,
    functionResourceLoader: FunctionResourceLoader)   // function resource loader
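For orientation, the SessionCatalog owned by a SparkSession can be reached through its session state. A minimal sketch, assuming an existing SparkSession named spark (note that sessionState is an internal, unstable API):

val sessionCatalog = spark.sessionState.catalog   // org.apache.spark.sql.catalyst.catalog.SessionCatalog
sessionCatalog.listDatabases().foreach(println)   // e.g. prints "default"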
           

1. GlobalTempViewManager (global temporary view manager)

This corresponds to the createGlobalTempView method on DataFrame. GlobalTempViewManager stores views in viewDefinitions and supports create, update, drop, and rename operations. The key of viewDefinitions is the view name and the value is the view's LogicalPlan (generated when the view is created).

private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
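A minimal usage sketch of the DataFrame API that feeds this manager, assuming a SparkSession named spark and hypothetical sample data:

import spark.implicits._

val df = Seq((1, "Tom"), (2, "Jack")).toDF("ID", "NAME")
df.createGlobalTempView("people")                      // stored in viewDefinitions as "people" -> LogicalPlan
spark.sql("SELECT * FROM global_temp.people").show()   // global temp views are queried via the global_temp database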
           

2. FunctionResourceLoader (function resource loader)

Besides its built-in functions, Spark-SQL supports user-defined functions and Hive functions. These functions can be provided through external jar packages, and FunctionResourceLoader is responsible for loading those function resources.
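For illustration, a hedged sketch of how such an external function might be declared; the class name and jar path below are hypothetical. When the function is first used, its jar is loaded through FunctionResourceLoader:

spark.sql(
  """CREATE FUNCTION my_upper AS 'com.example.udf.MyUpper'
    |USING JAR 'hdfs:///libs/my-udfs.jar'""".stripMargin)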

3. FunctionRegistry (function registry)

Implements registering, looking up, and dropping functions. The implementation class in Spark-SQL is SimpleFunctionRegistry, which keeps registered functions in functionBuilders: the key is the function identifier and the value is the function's description and builder. Besides user-defined functions, functionBuilders also holds a large number of predefined built-in functions.

private val functionBuilders =
    new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]   // registered functions

object FunctionRegistry {
  // abridged: a few of the built-in functions registered by default
  val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
    expression[Sum]("sum"),                // sum
    expression[TimeWindow]("window"),      // window(start, '7 days', '1 day')
    // json
    expression[StructsToJson]("to_json"),  // struct to JSON string
    expression[JsonToStructs]("from_json") // JSON string to struct
  )

  val builtin: SimpleFunctionRegistry = {
    val fr = new SimpleFunctionRegistry
    expressions.foreach {
      case (name, (info, builder)) => fr.registerFunction(FunctionIdentifier(name), info, builder) // register
    }
    fr
  }
}
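As a sketch of the registration path, registering a user-defined function through the public API ultimately places it into the session's FunctionRegistry, where the Analyzer can later look it up by name (the function name and body are hypothetical):

spark.udf.register("add_one", (x: Int) => x + 1)   // registered into the session's FunctionRegistry
spark.sql("SELECT add_one(41)").show()             // resolved by name during analysis, returns 42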
           

4. ExternalCatalog (external system catalog)

Mainly used for managing, and persistently storing, the databases, tables, partitions, and functions of external systems. There are two implementations, InMemoryCatalog and HiveExternalCatalog: the former keeps the information in memory and is mainly used for testing; the latter uses the Hive metastore for persistent management and is widely used in production. The inheritance hierarchy of ExternalCatalog is shown below:

[Figure: ExternalCatalog inheritance hierarchy]

Overall, SessionCatalog manages all table-related metadata, including databases, tables, views, partitions, and functions. Internally it also uses tempViews to manage temporary view information, and currentDb to record the name of the database currently being operated on.

protected val tempViews = new mutable.HashMap[String, LogicalPlan]        // temporary view information
protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)    // current database name
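A small sketch of how this state is visible through the public Catalog API, assuming a SparkSession named spark:

println(spark.catalog.currentDatabase)    // "default" -- corresponds to currentDb
df.createOrReplaceTempView("people_tmp")  // df is a hypothetical DataFrame; the view is recorded in tempViews
spark.catalog.listTables().show()         // people_tmp is listed with isTemporary = true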
           

Rules (Rule)

Rule plays a very important role in operations on the unresolved LogicalPlan operator tree, such as binding and optimization. These operations frequently need to rewrite a particular node in the logical operator tree, or transform the tree's structure. Rule provides an apply method for subclasses to override, so that each subclass can implement its own processing logic.

abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }
  def apply(plan: TreeType): TreeType   // subclasses implement the rule's logic here
}
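To make the pattern concrete, here is a minimal, hypothetical Rule that removes Filter nodes whose condition is the literal true; it only illustrates the apply-plus-transform shape shared by the built-in rules:

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object RemoveTrueFilter extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case Filter(Literal(true, _), child) => child   // drop a trivially-true filter
  }
}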
           

RuleExecutor drives the Rules: binding and optimizing the unresolved LogicalPlan, as well as the subsequent generation of physical operators, are all implemented by subclasses of RuleExecutor. Its important internal members are listed below:

abstract class Strategy { def maxIterations: Int }                                    // strategy
case object Once extends Strategy { val maxIterations = 1 }                           // apply the rules once
case class FixedPoint(maxIterations: Int) extends Strategy                            // apply up to maxIterations times
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)  // a batch of rules
protected def batches: Seq[Batch]                                                     // ordered batches; defined by subclasses
           

Strategy: an execution strategy that controls how many times a batch of rules is applied; an abstract class defining an Int property named maxIterations;

Once: extends Strategy with maxIterations = 1, meaning the rules are applied only once;

FixedPoint: extends Strategy, meaning the rules are applied repeatedly, up to maxIterations times (or until the plan stops changing);

Batch: defines one batch of rules, consisting of a batch name, a strategy, and the rules it contains;

batches: a Seq[Batch], the collection of batches that defines the order in which rules are executed; its concrete content is defined by subclasses.

The execute method: iterates over the nodes of the given plan, following the order of batches and, within each Batch, the order of its rules; the concrete processing logic is implemented by the Rule subclasses.

def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) {   // apply each rule of the batch in order
          case (plan, rule) =>
            val result = rule(plan)
            result
        }
        iteration += 1
        if (iteration > batch.strategy.maxIterations) {   // stop after maxIterations
          continue = false
        }
        if (curPlan.fastEquals(lastPlan)) {   // stop once a fixed point is reached
          continue = false
        }
        lastPlan = curPlan
      }
    }
    curPlan
  }
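A minimal sketch of a RuleExecutor subclass, wiring the hypothetical RemoveTrueFilter rule from above into a single batch that runs until a fixed point (at most 10 iterations):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

object SimpleExecutor extends RuleExecutor[LogicalPlan] {
  override protected def batches: Seq[Batch] = Seq(
    Batch("Remove trivial filters", FixedPoint(10), RemoveTrueFilter)
  )
}

// SimpleExecutor.execute(somePlan) runs the batches in order and returns the rewritten plan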
           

Analysis (Analyzer)

The Analyzer uses the Catalog and FunctionRegistry to convert UnresolvedAttribute and UnresolvedRelation nodes into fully typed Catalyst objects, producing a resolved LogicalPlan. Let's now look at the concrete rules defined in the Analyzer:

[Figure: rules defined in the Analyzer]

Analyzer is a subclass of RuleExecutor; its batches define the concrete rules and their execution order. There are many rules, so only a few typical ones are analyzed here: ResolveRelations, ResolveReferences, and ResolveFunctions. The rest follow the same pattern and differ only in their internal processing logic.

lazy val batches: Seq[Batch] = Seq(
    Batch("Hints", fixedPoint,
      new ResolveHints.ResolveBroadcastHints(conf),
      ResolveHints.RemoveAllHints),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Resolution", fixedPoint,
      ResolveRelations ::
      ResolveReferences ::
      ResolveAggAliasInGroupBy ::
      ResolveFunctions ::
      ResolveAggregateFunctions ::
      ......
           

The discussion below is based on the example from the previous article:

SELECT SUM(AGE)
 FROM
 (SELECT A.ID,
 A.NAME,
 CAST(B.AGE AS LONG) AS AGE 
 FROM 
 NAME A INNER JOIN AGE B 
 ON A.ID == B.ID) 
 WHERE AGE >20
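A hedged end-to-end sketch for this example, assuming a SparkSession named spark; the sample data and its registration as temporary views are hypothetical, and sqlText stands for the query text shown above:

import spark.implicits._

Seq((1, "Tom"), (2, "Jack")).toDF("ID", "NAME").createOrReplaceTempView("NAME")
Seq((1, "21"), (2, "25")).toDF("ID", "AGE").createOrReplaceTempView("AGE")

val df = spark.sql(sqlText)           // sqlText holds the SQL text of the example above
println(df.queryExecution.logical)    // parsed plan: still contains UnresolvedRelation / unresolved attributes
println(df.queryExecution.analyzed)   // resolved LogicalPlan produced by the Analyzer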
           

1. ResolveRelations (resolving tables)

After ANTLR parsing, a table is just an UnresolvedRelation: it is only a string, with no actual relation attached. The Analyzer uses the ResolveRelations rule, which calls resolveRelation, to turn the UnresolvedRelation into a resolved relation.

def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
          case v: View =>
            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
          case other => i.copy(table = other)
        }
      case u: UnresolvedRelation => resolveRelation(u)
    }
           
def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
      case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
        val defaultDatabase = AnalysisContext.get.defaultDatabase
        val foundRelation = lookupTableFromCatalog(u, defaultDatabase)
        resolveRelation(foundRelation)
    }
           
private def lookupTableFromCatalog(
        u: UnresolvedRelation,
        defaultDatabase: Option[String] = None): LogicalPlan = {
      val tableIdentWithDb = u.tableIdentifier.copy(
        database = u.tableIdentifier.database.orElse(defaultDatabase))
      try {
        catalog.lookupRelation(tableIdentWithDb)
      } catch {
      // exception handling (omitted)
      }
    }
           

As the source code shows, when an UnresolvedRelation node is matched while traversing the logical operator tree, lookupTableFromCatalog is called to obtain the relation from the SessionCatalog. The execution flow is shown in the figure below:

[Figure: ResolveRelations execution flow]
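As a small sketch of the lookup itself, the call that lookupTableFromCatalog ends up making can be reproduced directly (this assumes the temporary view NAME from the example above has been registered in the current session):

import org.apache.spark.sql.catalyst.TableIdentifier

val rel = spark.sessionState.catalog.lookupRelation(TableIdentifier("NAME"))
println(rel)   // e.g. a SubqueryAlias wrapping the logical plan of the NAME view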

2. ResolveReferences (resolving columns and expressions)

After ANTLR parsing, attributes are UnresolvedAttribute nodes. ResolveReferences is similar to ResolveRelations: its job is to resolve the items referenced by the query, such as literals, columns, functions, and expressions.

def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
      case p: LogicalPlan if !p.childrenResolved => p
      case p: Project if containsStar(p.projectList) =>
        p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
        // code omitted
      case q: LogicalPlan =>
        logTrace(s"Attempting to resolve ${q.simpleString}")
        q.mapExpressions(resolve(_, q))
    }
           
private def resolve(e: Expression, q: LogicalPlan): Expression = e match {
      case u @ UnresolvedAttribute(nameParts) =>
        val result =
          withPosition(u) {
            q.resolveChildren(nameParts, resolver)
              .orElse(resolveLiteralFunction(nameParts, u, q))
              .getOrElse(u)
          }
        logDebug(s"Resolving $u to $result")
        result
      case g @ Generate(generator, join, outer, qualifier, output, child) =>
        val newG = resolveExpression(generator, child, throws = true)
        if (newG.fastEquals(generator)) {
          g
        } else {
          Generate(newG.asInstanceOf[Generator], join, outer, qualifier, output, child)
        }
      case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
        ExtractValue(child, fieldExpr, resolver)
      case _ => e.mapChildren(resolve(_, q))
    }
           

Two important functions are described below: resolveExpression and resolveLiteralFunction.

1. resolveExpression (resolving expressions)

When an expression is encountered, resolution enters resolveExpression.

protected[sql] def resolveExpression(
      expr: Expression,
      plan: LogicalPlan,
      throws: Boolean = false): Expression = {
    if (expr.resolved) return expr
    try {
      expr transformUp {
        case GetColumnByOrdinal(ordinal, _) => plan.output(ordinal)
        case u @ UnresolvedAttribute(nameParts) =>
          withPosition(u) {
            plan.resolve(nameParts, resolver)
              .orElse(resolveLiteralFunction(nameParts, u, plan))
              .getOrElse(u)
          }
        case UnresolvedExtractValue(child, fieldName) if child.resolved =>
          ExtractValue(child, fieldName, resolver)
      }
    } catch {
      case a: AnalysisException if !throws => expr
    }
  }
           

2. resolveLiteralFunction (resolving literal functions)

When the query contains CURRENT_DATE or CURRENT_TIMESTAMP, resolution enters resolveLiteralFunction. For example, CURRENT_DATE is resolved to current_date(None) AS current_date()#8.

private def resolveLiteralFunction(
      nameParts: Seq[String],
      attribute: UnresolvedAttribute,
      plan: LogicalPlan): Option[Expression] = {
    if (nameParts.length != 1) return None
    val isNamedExpression = plan match {
      case Aggregate(_, aggregateExpressions, _) => aggregateExpressions.contains(attribute)
      case Project(projectList, _) => projectList.contains(attribute)
      case Window(windowExpressions, _, _, _) => windowExpressions.contains(attribute)
      case _ => false
    }
    val wrapper: Expression => Expression =
      if (isNamedExpression) f => Alias(f, toPrettySQL(f))() else identity
    // support CURRENT_DATE and CURRENT_TIMESTAMP
    val literalFunctions = Seq(CurrentDate(), CurrentTimestamp())
    val name = nameParts.head
    val func = literalFunctions.find(e => resolver(e.prettyName, name))
    func.map(wrapper)
  }
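A quick way to see this in action (expression ids such as #8 will differ from run to run):

println(spark.sql("SELECT CURRENT_DATE").queryExecution.analyzed)
// e.g. Project [current_date(None) AS current_date()#8]
//      +- OneRowRelation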
           

The resolution process is shown in the figure below:

[Figure: ResolveReferences resolution flow]

3. ResolveFunctions (resolving functions)

After ANTLR parsing, a function is an UnresolvedGenerator or UnresolvedFunction node, which must be bound to a concrete function through the catalog:

object ResolveFunctions extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
      case q: LogicalPlan =>
        q transformExpressions {
          case u @ UnresolvedGenerator(name, children) =>
            withPosition(u) {
              catalog.lookupFunction(name, children) match {   // look up the function
                case generator: Generator => generator
              }
            }
          case u @ UnresolvedFunction(funcId, children, isDistinct) =>
            withPosition(u) {
              catalog.lookupFunction(funcId, children) match {   // look up the function
                case wf: AggregateWindowFunction => wf
                case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct)
                case other =>
                  if (isDistinct) {
                    failAnalysis(s"${other.prettyName} does not support the modifier DISTINCT")
                  }
                  other
              }
            }
        }
    }
  }
           
def lookupFunction(
      name: FunctionIdentifier,
      children: Seq[Expression]): Expression = synchronized {
    if (name.database.isEmpty && functionRegistry.functionExists(name)) {
      return functionRegistry.lookupFunction(name, children)
    }
           

4. ResolveAggregateFunctions (resolving aggregate functions)

ResolveAggregateFunctions resolves expressions that appear after HAVING or ORDER BY. For example, in

SELECT id,count(1) FROM table GROUP BY id having avg(age) > 20
           

the condition avg(age) > 20 is resolved by this rule.

val aggregatedCondition =
            Aggregate(
              grouping,
              Alias(cond, "havingCondition")() :: Nil,
              child)
          val resolvedOperator = executeSameContext(aggregatedCondition)
           

The rules are then run again on aggregatedCondition; when ResolveReferences executes, the columns in the condition are resolved.

5. ResolveAggAliasInGroupBy (resolving GROUP BY fields)

Its job is to replace fields in the GROUP BY clause with the already-resolved fields from the SELECT list; it is applied after ResolveReferences.

SELECT id,count(1) FROM table GROUP BY id having avg(age) > 20
           

ResolveReferences first resolves the id in the SELECT list; ResolveAggAliasInGroupBy then replaces the id after GROUP BY with the resolved id from the SELECT list (a sketch of the alias case this rule is named after follows the code below).

override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
      case agg @ Aggregate(groups, aggs, child)
          if conf.groupByAliases && child.resolved && aggs.forall(_.resolved) &&
            groups.exists(!_.resolved) =>
        agg.copy(groupingExpressions = mayResolveAttrByAggregateExprs(groups, aggs, child))
    }
           
private def mayResolveAttrByAggregateExprs(
        exprs: Seq[Expression], aggs: Seq[NamedExpression], child: LogicalPlan): Seq[Expression] = {
      exprs.map { _.transform {
        case u: UnresolvedAttribute if notResolvableByChild(u.name, child) =>
          aggs.find(ne => resolver(ne.name, u.name)).getOrElse(u)
      }}
    }
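A hedged sketch of the alias case this rule is named after; the table t and its columns first_name and last_name are hypothetical, and the behavior requires spark.sql.groupByAliases (true by default):

// "name" is not a column of t, so ResolveReferences cannot resolve it from the child plan;
// ResolveAggAliasInGroupBy substitutes the resolved alias expression from the SELECT list.
spark.sql("SELECT concat(first_name, last_name) AS name, count(1) FROM t GROUP BY name")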
           

The rules above are the most commonly used and important ones; many others are not listed here one by one. After this series of rules has been applied, the Analyzed LogicalPlan is generated. What follows next is the Optimizer and physical plan generation; that process will be covered in later articles.

