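Table of Contents
- Spark-SQL Binding
  - Metadata (Catalog)
    - 1. GlobalTempViewManager (global temporary view management)
    - 2. FunctionResourceLoader (function resource loader)
    - 3. FunctionRegistry (function registration)
    - 4. ExternalCatalog (external system catalog)
  - Rules (Rule)
  - Analysis (Analyzer)
    - 1. ResolveRelations (resolving tables)
    - 2. ResolveReferences (resolving fields and expressions)
      - 1. resolveExpression (resolving expressions)
      - 2. resolveLiteralFunction (resolving literal functions)
    - 3. ResolveFunctions (resolving functions)
    - 4. ResolveAggregateFunctions (resolving aggregate functions)
    - 5. ResolveAggAliasInGroupBy (resolving grouping fields)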
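Spark-SQL Binding
The previous article, Spark-SQL Parsing, described how Spark SQL uses ANTLR4 to generate an unresolved LogicalPlan. At that stage the LogicalPlan is unresolved: the Catalog is needed to bind its UnresolvedRelation and UnresolvedAttribute nodes and produce a resolved LogicalPlan.
Metadata (Catalog)
In Spark SQL, the Catalog provides unified management of function resources and metadata (databases, tables, views, partitions, functions, and so on).
Spark SQL's Catalog system is built around SessionCatalog, which users access through SparkSession; SparkSession and SessionCatalog have a one-to-one relationship. Besides the Spark SQL and Hadoop configuration, SessionCatalog's constructor parameters cover the following four components: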
```scala
class SessionCatalog(
    val externalCatalog: ExternalCatalog,            // external system catalog
    globalTempViewManager: GlobalTempViewManager,    // global temporary view management
    functionRegistry: FunctionRegistry,              // function registration
    conf: SQLConf,                                   // SQL configuration
    hadoopConf: Configuration,                       // Hadoop configuration
    parser: ParserInterface,                         // SQL parser
    functionResourceLoader: FunctionResourceLoader)  // function resource loader
```
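1. GlobalTempViewManager (global temporary view management)
This corresponds to the createGlobalTempView method of DataFrame. GlobalTempViewManager stores the views internally in viewDefinitions and supports creating, updating, deleting, and renaming them. The key of viewDefinitions is the view name, and the value is the view's LogicalPlan (generated when the view is created).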
```scala
private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
```
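To make the create/update/delete/rename bookkeeping concrete, here is a minimal, self-contained sketch of a GlobalTempViewManager-like class. It is not Spark's class: `MiniGlobalTempViewManager` and `PlanStub` are hypothetical names, a plan is reduced to a description string, and errors are plain Scala exceptions instead of Spark's AnalysisException.

```scala
import scala.collection.mutable

// Stand-in for LogicalPlan (assumption): a plan is just a description string here.
final case class PlanStub(description: String)

// Hypothetical, simplified GlobalTempViewManager: viewDefinitions maps
// view name -> plan, and every operation is guarded by `synchronized`.
class MiniGlobalTempViewManager(val database: String) {
  private val viewDefinitions = new mutable.HashMap[String, PlanStub]

  def get(name: String): Option[PlanStub] = synchronized { viewDefinitions.get(name) }

  // Create a view; refuse to overwrite an existing one unless overrideIfExists is set.
  def create(name: String, plan: PlanStub, overrideIfExists: Boolean): Unit = synchronized {
    if (!overrideIfExists && viewDefinitions.contains(name))
      throw new IllegalStateException(s"Temporary view '$name' already exists")
    viewDefinitions(name) = plan
  }

  // Replace an existing view; returns false when there is nothing to update.
  def update(name: String, plan: PlanStub): Boolean = synchronized {
    if (viewDefinitions.contains(name)) { viewDefinitions(name) = plan; true } else false
  }

  // Drop a view; returns whether it existed.
  def remove(name: String): Boolean = synchronized { viewDefinitions.remove(name).isDefined }

  // Rename a view; returns false if the old name is unknown, fails if the new name is taken.
  def rename(oldName: String, newName: String): Boolean = synchronized {
    if (!viewDefinitions.contains(oldName)) false
    else {
      if (viewDefinitions.contains(newName))
        throw new IllegalStateException(s"Temporary view '$newName' already exists")
      viewDefinitions(newName) = viewDefinitions.remove(oldName).get
      true
    }
  }
}
```

Keeping the map private and exposing only these operations is what lets the manager stay consistent when several sessions share the global temporary database.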
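2. FunctionResourceLoader (function resource loader)
Besides built-in functions, Spark SQL supports user-defined functions and Hive functions, which can be provided in external JAR files. FunctionResourceLoader is responsible for loading these function resources.
3. FunctionRegistry (function registration)
It registers, looks up, and drops functions. Spark SQL's implementation is SimpleFunctionRegistry, which keeps the registered functions in functionBuilders: the key is the function name (a FunctionIdentifier), and the value is the function's description (ExpressionInfo) together with its builder (FunctionBuilder). Besides user-defined functions, functionBuilders also holds the many predefined built-in functions.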
```scala
// In SimpleFunctionRegistry: the registered functions
private val functionBuilders =
  new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]

object FunctionRegistry {
  val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
    expression[Sum]("sum"),                 // sum
    expression[TimeWindow]("window"),       // time window, e.g. window(start, 7day, 1day)
    // json
    expression[StructsToJson]("to_json"),   // struct to JSON
    expression[JsonToStructs]("from_json")  // JSON to struct
    // ...
  )

  // Registry preloaded with all built-in functions
  val builtin: SimpleFunctionRegistry = {
    val fr = new SimpleFunctionRegistry
    expressions.foreach {
      case (name, (info, builder)) => fr.registerFunction(FunctionIdentifier(name), info, builder) // register
    }
    fr
  }
}
```
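The register/lookup/drop cycle can be sketched without Spark as follows. The names `MiniFunctionRegistry`, `FuncInfo`, `Expr`, `Col`, and `Call` are hypothetical stand-ins for SimpleFunctionRegistry, ExpressionInfo, and Catalyst expressions, and lower-casing function names is an assumption of this sketch. What it illustrates is that the registry stores one builder per name, and a lookup applies that builder to the argument expressions.

```scala
import scala.collection.mutable

// Minimal expression model (assumption; stands in for Catalyst's Expression).
sealed trait Expr
final case class Col(name: String) extends Expr
final case class Call(function: String, children: Seq[Expr]) extends Expr

// Loosely modeled on ExpressionInfo: implementing class and function name.
final case class FuncInfo(className: String, name: String)

object MiniFunctionRegistry {
  // Like FunctionBuilder: turns argument expressions into a function expression.
  type FunctionBuilder = Seq[Expr] => Expr

  // Hypothetical helper mirroring `expression[T](name)`: one (name, (info, builder)) entry.
  private def expression(name: String, className: String): (String, (FuncInfo, FunctionBuilder)) =
    (name, (FuncInfo(className, name), (args: Seq[Expr]) => Call(name, args)))

  val expressions: Map[String, (FuncInfo, FunctionBuilder)] = Map(
    expression("sum", "Sum"),
    expression("to_json", "StructsToJson"),
    expression("from_json", "JsonToStructs"))

  // A registry preloaded with the built-ins, like FunctionRegistry.builtin.
  def builtin: MiniFunctionRegistry = {
    val fr = new MiniFunctionRegistry
    expressions.foreach { case (name, (info, builder)) => fr.registerFunction(name, info, builder) }
    fr
  }
}

class MiniFunctionRegistry {
  import MiniFunctionRegistry.FunctionBuilder

  // name -> (info, builder), like functionBuilders; names are lower-cased in this sketch.
  private val functionBuilders = new mutable.HashMap[String, (FuncInfo, FunctionBuilder)]

  def registerFunction(name: String, info: FuncInfo, builder: FunctionBuilder): Unit =
    synchronized { functionBuilders(name.toLowerCase) = (info, builder) }

  def functionExists(name: String): Boolean =
    synchronized { functionBuilders.contains(name.toLowerCase) }

  // Find the builder, then apply it to the arguments to get a concrete expression.
  def lookupFunction(name: String, children: Seq[Expr]): Expr = {
    val (_, builder) = synchronized {
      functionBuilders.getOrElse(name.toLowerCase,
        throw new NoSuchElementException(s"Undefined function: '$name'"))
    }
    builder(children)
  }

  def dropFunction(name: String): Boolean =
    synchronized { functionBuilders.remove(name.toLowerCase).isDefined }
}
```

Storing builders rather than finished expressions is what allows one registration to serve every call site with different arguments.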
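4. ExternalCatalog (external system catalog)
It manages the databases, tables, partitions, and functions of external systems and stores them persistently (not temporarily). There are two implementations, InMemoryCatalog and HiveExternalCatalog. The former keeps the information in memory and is mainly used for testing; the latter uses the Hive metastore for persistent management and is widely used in production. The inheritance hierarchy of ExternalCatalog is shown below: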
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczX0xiRGZkRGZ0Xy9GbvNGL2EzXlpXazxSPndVWsxGShhmVXVWdWJzYwhnMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLwQDO3UTM1UTM2ETMxgTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
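Overall, SessionCatalog manages all table-related metadata, including databases, tables, views, partitions, and functions. Internally it also keeps temporary views in tempViews and records the name of the database currently in use in currentDb.
```scala
protected val tempViews = new mutable.HashMap[String, LogicalPlan]        // temporary views
protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)   // name of the current database
```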
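How these pieces fit together during a table lookup can be sketched as below. The order modeled here (global temporary database first, then session temporary views for unqualified names, then the external catalog) is my reading of SessionCatalog.lookupRelation in Spark 2.x, not something stated above. `MiniSessionCatalog`, `MiniExternalCatalog`, `TableIdent`, and `Relation` are hypothetical, plans are strings, and lower-casing names mimics the default case-insensitive setting.

```scala
import scala.collection.mutable

// Simplified stand-ins (assumptions): a table identifier and a looked-up relation.
final case class TableIdent(table: String, database: Option[String] = None)
final case class Relation(qualifiedName: String, source: String)

// Hypothetical stand-in for ExternalCatalog: database -> table names it knows about.
class MiniExternalCatalog(tables: Map[String, Set[String]]) {
  def tableExists(db: String, table: String): Boolean =
    tables.getOrElse(db, Set.empty[String]).contains(table)
}

class MiniSessionCatalog(external: MiniExternalCatalog, globalTempDb: String = "global_temp") {
  val tempViews = new mutable.HashMap[String, String]        // view name -> plan description
  val globalTempViews = new mutable.HashMap[String, String]  // stands in for GlobalTempViewManager
  var currentDb: String = "default"

  def lookupRelation(name: TableIdent): Relation = synchronized {
    val db = name.database.getOrElse(currentDb).toLowerCase
    val table = name.table.toLowerCase
    if (db == globalTempDb) {
      // Names qualified with the global temp database go to the global temp views.
      globalTempViews.get(table).map(plan => Relation(s"$db.$table", plan))
        .getOrElse(throw new NoSuchElementException(s"Table or view not found: $db.$table"))
    } else if (name.database.isDefined || !tempViews.contains(table)) {
      // A database-qualified name, or an unqualified name that is not a temp view.
      if (!external.tableExists(db, table))
        throw new NoSuchElementException(s"Table or view not found: $db.$table")
      Relation(s"$db.$table", "external catalog")
    } else {
      // An unqualified name that matches a session temp view.
      Relation(table, tempViews(table))
    }
  }
}
```

In this model a session temporary view shadows a table of the same name only when the name is unqualified; writing the database explicitly always reaches the external catalog.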
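Rules (Rule)
Rules play a central role in operating on the unresolved LogicalPlan operator tree (binding, optimization, and so on). These operations frequently need to rewrite a particular node of the logical operator tree or transform the tree structure. Rule declares an abstract apply method that subclasses implement with their specific processing logic.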
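```scala
abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
  // Rule name derived from the class name (the trailing "$" of a Scala object is dropped)
  val ruleName: String = {
    val className = getClass.getName
    if (className endsWith "$") className.dropRight(1) else className
  }

  def apply(plan: TreeType): TreeType  // implemented by subclasses with the rule's logic
}
```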
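RuleExecutor is the driver that runs Rules. Both the binding of the unresolved LogicalPlan (Analyzer) and its subsequent optimization (Optimizer) are implemented as subclasses of RuleExecutor, and rules are also applied later when the physical plan is prepared. Its important internal members are:
```scala
abstract class Strategy { def maxIterations: Int }           // strategy
case object Once extends Strategy { val maxIterations = 1 }  // run once
case class FixedPoint(maxIterations: Int) extends Strategy   // run until a fixed point, at most maxIterations times
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)  // a batch of rules
protected def batches: Seq[Batch]  // the batches, in execution order
```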
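- Strategy: limits how many times a batch's rules are iterated. It is an abstract class that defines an Int property named maxIterations.
- Once: extends Strategy with maxIterations = 1, meaning the rules are applied only once.
- FixedPoint: extends Strategy; the rules are applied repeatedly until the plan stops changing (a fixed point) or maxIterations iterations have run, whichever comes first.
- Batch: a batch of rules, made up of the batch name, its strategy, and the rules it contains.
- batches: the collection of batches, of type Seq[Batch]. It defines the order in which rules run; its contents are defined by subclasses.

execute: following the order of batches, and within each Batch the order of its rules, iteratively applies the rules to the nodes of the given plan. The concrete processing logic is implemented by the Rule subclasses.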
```scala
def execute(plan: TreeType): TreeType = {
  var curPlan = plan
  batches.foreach { batch =>
    var iteration = 1
    var lastPlan = curPlan
    var continue = true
    while (continue) {
      // Apply the batch's rules in order
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) => rule(plan)
      }
      iteration += 1
      // Stop once the strategy's iteration limit is reached
      if (iteration > batch.strategy.maxIterations) {
        continue = false
      }
      // Stop early once a full pass no longer changes the plan (fixed point)
      if (curPlan.fastEquals(lastPlan)) {
        continue = false
      }
      lastPlan = curPlan
    }
  }
  curPlan
}
```
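A small, Spark-free version of this loop shows how Once and FixedPoint differ. `MiniRuleExecutor` is hypothetical: rules are plain functions `T => T`, and plan equality uses `==` in place of `fastEquals`. The two example executors run the same toy rule (halve an even number) under each strategy.

```scala
// Hypothetical, simplified RuleExecutor: batches of T => T rules, each run
// under a strategy that bounds the number of passes.
abstract class MiniRuleExecutor[T] {
  sealed abstract class Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  case class Batch(name: String, strategy: Strategy, rules: (T => T)*)

  protected def batches: Seq[Batch]

  def execute(plan: T): T = {
    var curPlan = plan
    batches.foreach { batch =>
      var iteration = 1
      var lastPlan = curPlan
      var continue = true
      while (continue) {
        // One pass: apply every rule of the batch in order.
        curPlan = batch.rules.foldLeft(curPlan)((p, rule) => rule(p))
        iteration += 1
        // Stop when the iteration budget is used up...
        if (iteration > batch.strategy.maxIterations) continue = false
        // ...or as soon as a pass leaves the plan unchanged.
        if (curPlan == lastPlan) continue = false
        lastPlan = curPlan
      }
    }
    curPlan
  }
}

// Usage: the same toy rule under both strategies.
object ToyRules {
  val halveIfEven: Int => Int = x => if (x % 2 == 0) x / 2 else x
}

object OnceExecutor extends MiniRuleExecutor[Int] {
  protected def batches: Seq[Batch] = Seq(Batch("halve", Once, ToyRules.halveIfEven))
}

object FixedPointExecutor extends MiniRuleExecutor[Int] {
  protected def batches: Seq[Batch] = Seq(Batch("halve", FixedPoint(100), ToyRules.halveIfEven))
}
```

Starting from 40, `OnceExecutor.execute(40)` applies the rule a single time and returns 20, while `FixedPointExecutor.execute(40)` keeps going (20, 10, 5) and stops at 5 because the next pass no longer changes the value. The limit of 100 here is arbitrary.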
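Analysis (Analyzer)
The Analyzer uses the Catalog and FunctionRegistry to convert UnresolvedAttribute and UnresolvedRelation nodes into fully typed Catalyst objects, producing a resolved LogicalPlan. The rest of this section looks at the concrete rules defined in the Analyzer.
Analyzer is a subclass of RuleExecutor, and its batches define the concrete rules and their execution order. There are many rules, so only a few typical ones are analyzed here, such as ResolveRelations, ResolveReferences, and ResolveFunctions. The others follow essentially the same pattern and differ only in their internal logic.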
```scala
lazy val batches: Seq[Batch] = Seq(
  Batch("Hints", fixedPoint,
    new ResolveHints.ResolveBroadcastHints(conf),
    ResolveHints.RemoveAllHints),
  Batch("Simple Sanity Check", Once,
    LookupFunctions),
  Batch("Resolution", fixedPoint,
    ResolveRelations ::
    ResolveReferences ::
    ResolveAggAliasInGroupBy ::
    ResolveFunctions ::
    ResolveAggregateFunctions ::
    ......
```
The following explanations are all based on the example from the previous article:
```sql
SELECT SUM(AGE)
FROM
  (SELECT A.ID,
          A.NAME,
          CAST(B.AGE AS LONG) AS AGE
   FROM
     NAME A INNER JOIN AGE B
     ON A.ID == B.ID)
WHERE AGE > 20
```
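1. ResolveRelations (resolving tables)
After ANTLR parsing, a table is represented as an UnresolvedRelation, which is only a name with no link to any actual table. The Analyzer uses the ResolveRelations rule to resolve each UnresolvedRelation into an actual relation (through resolveRelation).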
```scala
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
    EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
      case v: View =>
        u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
      case other => i.copy(table = other)
    }
  case u: UnresolvedRelation => resolveRelation(u)
}

def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
  case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
    val defaultDatabase = AnalysisContext.get.defaultDatabase
    val foundRelation = lookupTableFromCatalog(u, defaultDatabase)
    resolveRelation(foundRelation)
  // ... (other cases omitted)
  case _ => plan
}

private def lookupTableFromCatalog(
    u: UnresolvedRelation,
    defaultDatabase: Option[String] = None): LogicalPlan = {
  // Use the database written in the query, otherwise fall back to the default database
  val tableIdentWithDb = u.tableIdentifier.copy(
    database = u.tableIdentifier.database.orElse(defaultDatabase))
  try {
    catalog.lookupRelation(tableIdentWithDb)
  } catch {
    case _: NoSuchTableException =>
      u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}")
    // ... (other exception handling omitted)
  }
}
```
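As the source code shows, when the traversal of the logical operator tree reaches an UnresolvedRelation node, lookupTableFromCatalog is called to fetch the relation from the SessionCatalog. The execution process is shown in the figure below: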
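To make the lookup flow concrete without a running Spark session, the bottom-up replacement of UnresolvedRelation nodes can be imitated with a toy plan tree. Everything here is a hypothetical stand-in: the catalog is a map from (database, table) to column names, and `transformUp` is a hand-written post-order rewrite in the spirit of Catalyst's, not its implementation.

```scala
// Toy logical-plan nodes (assumptions, not Catalyst classes).
sealed trait Plan {
  def children: Seq[Plan]
  def withNewChildren(newChildren: Seq[Plan]): Plan

  // Post-order rewrite: rewrite the children first, then try the rule on this
  // node, keeping the node unchanged when the rule does not match.
  def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
    val afterChildren = withNewChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(afterChildren, identity[Plan])
  }
}

final case class UnresolvedRelation(table: String, database: Option[String] = None) extends Plan {
  def children: Seq[Plan] = Nil
  def withNewChildren(newChildren: Seq[Plan]): Plan = this
}

final case class ResolvedRelation(database: String, table: String, columns: Seq[String]) extends Plan {
  def children: Seq[Plan] = Nil
  def withNewChildren(newChildren: Seq[Plan]): Plan = this
}

final case class Join(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
  def withNewChildren(newChildren: Seq[Plan]): Plan = Join(newChildren(0), newChildren(1))
}

object MiniResolveRelations {
  // `catalog` is a hypothetical stand-in for SessionCatalog: (db, table) -> column names.
  def apply(plan: Plan, catalog: Map[(String, String), Seq[String]], currentDb: String): Plan =
    plan.transformUp {
      case UnresolvedRelation(table, database) =>
        // Like lookupTableFromCatalog: fall back to the current database when none is given.
        val db = database.getOrElse(currentDb).toLowerCase
        val name = table.toLowerCase
        val columns = catalog.getOrElse((db, name),
          throw new NoSuchElementException(s"Table or view not found: $db.$name"))
        ResolvedRelation(db, name, columns)
    }
}
```

For the example query, the join of `NAME A` and `AGE B` would become a join of two resolved relations whose columns come from the catalog.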
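2. ResolveReferences (resolving fields and expressions)
After ANTLR parsing, attributes are UnresolvedAttribute nodes. ResolveReferences works much like ResolveRelations: it resolves the fields referenced by the query, whether they appear on their own or inside constants, functions, and other expressions.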
```scala
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
  // Wait until all children are resolved
  case p: LogicalPlan if !p.childrenResolved => p
  // Expand `*` in the SELECT list
  case p: Project if containsStar(p.projectList) =>
    p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
  // ... (code omitted)
  case q: LogicalPlan =>
    logTrace(s"Attempting to resolve ${q.simpleString}")
    q.mapExpressions(resolve(_, q))
}

private def resolve(e: Expression, q: LogicalPlan): Expression = e match {
  case u @ UnresolvedAttribute(nameParts) =>
    // Look the name up in the children's output, then try literal functions;
    // otherwise keep the attribute unresolved
    val result =
      withPosition(u) {
        q.resolveChildren(nameParts, resolver)
          .orElse(resolveLiteralFunction(nameParts, u, q))
          .getOrElse(u)
      }
    logDebug(s"Resolving $u to $result")
    result
  case g @ Generate(generator, join, outer, qualifier, output, child) =>
    val newG = resolveExpression(generator, child, throws = true)
    if (newG.fastEquals(generator)) {
      g
    } else {
      Generate(newG.asInstanceOf[Generator], join, outer, qualifier, output, child)
    }
  case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
    ExtractValue(child, fieldExpr, resolver)
  case _ => e.mapChildren(resolve(_, q))
}
```
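The core of ResolveReferences, matching an UnresolvedAttribute's name against the output of the child plan and leaving it alone if nothing matches, can be sketched with toy classes. The names below (`AttributeRef`, `MiniResolveReferences`, and so on) are hypothetical, and the case-insensitive comparison stands in for the analyzer's default resolver. An unmatched attribute stays unresolved, so the node can be retried in a later iteration of the fixed-point batch.

```scala
// Toy expressions and plans (assumptions, not Catalyst classes).
sealed trait Expr { def resolved: Boolean }
final case class UnresolvedAttribute(name: String) extends Expr { def resolved: Boolean = false }
final case class AttributeRef(name: String, exprId: Int) extends Expr { def resolved: Boolean = true }
final case class Lit(value: Int) extends Expr { def resolved: Boolean = true }
final case class GreaterThan(left: Expr, right: Expr) extends Expr {
  def resolved: Boolean = left.resolved && right.resolved
}

sealed trait Plan { def resolved: Boolean; def output: Seq[AttributeRef] }
final case class Relation(output: Seq[AttributeRef]) extends Plan { def resolved: Boolean = true }
final case class Filter(condition: Expr, child: Plan) extends Plan {
  def resolved: Boolean = condition.resolved && child.resolved
  def output: Seq[AttributeRef] = child.output
}
final case class Project(projectList: Seq[Expr], child: Plan) extends Plan {
  def resolved: Boolean = projectList.forall(_.resolved) && child.resolved
  def output: Seq[AttributeRef] = projectList.collect { case a: AttributeRef => a }
}

object MiniResolveReferences {
  // Like `resolveChildren(nameParts, resolver).getOrElse(u)`: match the name
  // case-insensitively against the child's output, else keep it unresolved.
  def resolve(e: Expr, input: Seq[AttributeRef]): Expr = e match {
    case u @ UnresolvedAttribute(name) => input.find(_.name.equalsIgnoreCase(name)).getOrElse(u)
    case GreaterThan(l, r)             => GreaterThan(resolve(l, input), resolve(r, input))
    case other                         => other
  }

  // Bottom-up: resolve the child first and only touch this node once the child
  // is resolved (the `!p.childrenResolved => p` case of the real rule).
  def apply(plan: Plan): Plan = plan match {
    case Filter(cond, child) =>
      val c = apply(child)
      if (c.resolved) Filter(resolve(cond, c.output), c) else Filter(cond, c)
    case Project(list, child) =>
      val c = apply(child)
      if (c.resolved) Project(list.map(resolve(_, c.output)), c) else Project(list, c)
    case r: Relation => r
  }
}
```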
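Two important functions are described below: resolveExpression and resolveLiteralFunction.
1. resolveExpression (resolving expressions)
resolveExpression resolves an expression against a given plan; for example, the Generate case above uses it to resolve the generator against the Generate node's child.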
```scala
protected[sql] def resolveExpression(
    expr: Expression,
    plan: LogicalPlan,
    throws: Boolean = false): Expression = {
  if (expr.resolved) return expr
  try {
    expr transformUp {
      // Positional reference: take the plan's output column at that ordinal
      case GetColumnByOrdinal(ordinal, _) => plan.output(ordinal)
      // Column name: resolve against the plan itself, then try literal functions
      case u @ UnresolvedAttribute(nameParts) =>
        withPosition(u) {
          plan.resolve(nameParts, resolver)
            .orElse(resolveLiteralFunction(nameParts, u, plan))
            .getOrElse(u)
        }
      case UnresolvedExtractValue(child, fieldName) if child.resolved =>
        ExtractValue(child, fieldName, resolver)
    }
  } catch {
    // With throws = false, analysis errors are swallowed and the original expression is returned
    case a: AnalysisException if !throws => expr
  }
}
```
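2. resolveLiteralFunction (resolving literal functions)
CURRENT_DATE and CURRENT_TIMESTAMP can be written without parentheses, so the parser initially treats them as UnresolvedAttribute. When a query contains them, they are resolved by resolveLiteralFunction. For example, CURRENT_DATE is resolved to `current_date(None) AS current_date()#8`.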
```scala
private def resolveLiteralFunction(
    nameParts: Seq[String],
    attribute: UnresolvedAttribute,
    plan: LogicalPlan): Option[Expression] = {
  // Only a single-part (unqualified) name can be a literal function
  if (nameParts.length != 1) return None
  // Is the attribute used directly as a named output (SELECT, aggregate, or window list)?
  val isNamedExpression = plan match {
    case Aggregate(_, aggregateExpressions, _) => aggregateExpressions.contains(attribute)
    case Project(projectList, _) => projectList.contains(attribute)
    case Window(windowExpressions, _, _, _) => windowExpressions.contains(attribute)
    case _ => false
  }
  // If so, wrap the function in an Alias named after its SQL text, e.g. current_date()
  val wrapper: Expression => Expression =
    if (isNamedExpression) f => Alias(f, toPrettySQL(f))() else identity
  // support CURRENT_DATE and CURRENT_TIMESTAMP
  val literalFunctions = Seq(CurrentDate(), CurrentTimestamp())
  val name = nameParts.head
  val func = literalFunctions.find(e => resolver(e.prettyName, name))
  func.map(wrapper)
}
```
The resolution process is shown in the figure:
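The decision made by resolveLiteralFunction fits in a few lines of plain Scala. This sketch simplifies under stated assumptions: the toy classes are hypothetical, `isNamedExpression` is passed in as a flag instead of being computed from the plan, names are compared case-insensitively, and the alias text `current_date()` mirrors the alias in the example above.

```scala
// Toy expressions (assumptions, not Catalyst classes).
sealed trait Expr
final case class CurrentDate() extends Expr
final case class CurrentTimestamp() extends Expr
final case class Alias(child: Expr, name: String) extends Expr

object MiniLiteralFunctions {
  // The functions that may be called without parentheses.
  private val literalFunctions: Seq[(String, Expr)] =
    Seq("current_date" -> CurrentDate(), "current_timestamp" -> CurrentTimestamp())

  // isNamedExpression: whether the attribute sits directly in a SELECT/aggregate/window list.
  def resolveLiteralFunction(nameParts: Seq[String], isNamedExpression: Boolean): Option[Expr] =
    if (nameParts.length != 1) None  // qualified names like t.current_date are real columns
    else literalFunctions
      .find { case (fname, _) => fname.equalsIgnoreCase(nameParts.head) }
      .map { case (fname, f) => if (isNamedExpression) Alias(f, s"$fname()") else f }
}
```

Wrapping only named outputs in an alias gives the result column a readable name, while a use inside a larger expression (for example in a WHERE clause) needs no alias.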
3. ResolveFunctions (resolving functions)
After ANTLR parsing, functions are UnresolvedGenerator or UnresolvedFunction nodes, which must be bound to concrete functions through the catalog:
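```scala
object ResolveFunctions extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    case q: LogicalPlan =>
      q transformExpressions {
        case u @ UnresolvedGenerator(name, children) =>
          withPosition(u) {
            catalog.lookupFunction(name, children) match {  // look up the function
              case generator: Generator => generator
            }
          }
        case u @ UnresolvedFunction(funcId, children, isDistinct) =>
          withPosition(u) {
            catalog.lookupFunction(funcId, children) match {  // look up the function
              // Aggregate window functions are evaluated inside a Window; no wrapping needed
              case wf: AggregateWindowFunction =>
                wf
              // Aggregate functions are wrapped in an AggregateExpression
              case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct)
              // Regular functions are returned as-is; DISTINCT is not allowed on them
              case other =>
                if (isDistinct) {
                  failAnalysis(s"${other.prettyName} does not support the modifier DISTINCT")
                } else {
                  other
                }
            }
          }
      }
  }
}

// SessionCatalog.lookupFunction (excerpt)
def lookupFunction(
    name: FunctionIdentifier,
    children: Seq[Expression]): Expression = synchronized {
  // An unqualified name already in the FunctionRegistry is built from there directly
  if (name.database.isEmpty && functionRegistry.functionExists(name)) {
    return functionRegistry.lookupFunction(name, children)
  }
  // ...
}
```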
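The essential binding step, looking the name up in a registry and wrapping aggregate functions in an AggregateExpression, can be sketched as follows. The registry, `Entry`, and the toy `Sum`/`Upper` expressions are hypothetical; unlike the real rule, which goes through transformExpressions, this sketch simply recurses into the arguments first.

```scala
// Toy expressions (assumptions, not Catalyst classes).
sealed trait Expr
final case class Col(name: String) extends Expr
final case class UnresolvedFunction(name: String, children: Seq[Expr], isDistinct: Boolean) extends Expr
final case class Sum(child: Expr) extends Expr    // an aggregate function
final case class Upper(child: Expr) extends Expr  // a regular (scalar) function
final case class AggregateExpression(fn: Expr, isDistinct: Boolean) extends Expr
final case class GreaterThan(left: Expr, right: Expr) extends Expr

object MiniResolveFunctions {
  // Hypothetical registry entry: whether the function aggregates, plus its builder.
  private final case class Entry(isAggregate: Boolean, build: Seq[Expr] => Expr)

  private val registry: Map[String, Entry] = Map(
    "sum"   -> Entry(isAggregate = true, args => Sum(args.head)),
    "upper" -> Entry(isAggregate = false, args => Upper(args.head)))

  def resolve(e: Expr): Expr = e match {
    case UnresolvedFunction(name, children, isDistinct) =>
      val args = children.map(resolve)
      val entry = registry.getOrElse(name.toLowerCase,
        throw new NoSuchElementException(s"Undefined function: '$name'"))
      val fn = entry.build(args)
      // Aggregates get wrapped; DISTINCT on a regular function is an error.
      if (entry.isAggregate) AggregateExpression(fn, isDistinct)
      else if (isDistinct) throw new IllegalArgumentException(s"$name does not support the modifier DISTINCT")
      else fn
    case GreaterThan(l, r) => GreaterThan(resolve(l), resolve(r))
    case other => other
  }
}
```

The wrapper matters because later phases treat an AggregateExpression specially (for example when planning the aggregation), whereas a scalar function is evaluated row by row.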
4. ResolveAggregateFunctions (resolving aggregate functions)
ResolveAggregateFunctions resolves the expressions that follow HAVING or ORDER BY. For example, in
```sql
SELECT id, count(1) FROM table GROUP BY id HAVING avg(age) > 20
```
it resolves avg(age) > 20.
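```scala
// Wrap the HAVING condition in an Aggregate with the same grouping and child
val aggregatedCondition =
  Aggregate(
    grouping,
    Alias(cond, "havingCondition")() :: Nil,
    child)
// Run the analyzer rules on it again in the same analysis context
val resolvedOperator = executeSameContext(aggregatedCondition)
```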
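The rules are run once more on aggregatedCondition, and the fields are resolved when ResolveReferences runs. This works because the new Aggregate sits directly on top of the original child, so age can be resolved against the child's output even though it is not an output column of the original Aggregate (which only outputs id and count(1)).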
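The trick of analyzing the HAVING condition inside a temporary Aggregate can be modeled with toy classes. Here `executeSameContext` is a hypothetical stand-in that only resolves column names against the child's output, and the sketch stops after extracting the resolved condition; the real rule goes on to rewrite the Filter and the Aggregate, which is not modeled.

```scala
// Toy expressions (assumptions, not Catalyst classes).
sealed trait Expr { def resolved: Boolean }
final case class UnresolvedAttribute(name: String) extends Expr { def resolved: Boolean = false }
final case class Attr(name: String) extends Expr { def resolved: Boolean = true }
final case class Lit(value: Int) extends Expr { def resolved: Boolean = true }
final case class Avg(child: Expr) extends Expr { def resolved: Boolean = child.resolved }
final case class GreaterThan(left: Expr, right: Expr) extends Expr {
  def resolved: Boolean = left.resolved && right.resolved
}
final case class Alias(child: Expr, name: String) extends Expr { def resolved: Boolean = child.resolved }

// Toy Aggregate: grouping keys, aggregate list, and the output of its child.
final case class Aggregate(grouping: Seq[Expr], aggregates: Seq[Expr], childOutput: Seq[Attr])

object MiniHavingResolution {
  // Resolve names against the given input columns (case-insensitive).
  private def resolveRefs(e: Expr, input: Seq[Attr]): Expr = e match {
    case u @ UnresolvedAttribute(n) => input.find(_.name.equalsIgnoreCase(n)).getOrElse(u)
    case Avg(c)                     => Avg(resolveRefs(c, input))
    case GreaterThan(l, r)          => GreaterThan(resolveRefs(l, input), resolveRefs(r, input))
    case Alias(c, n)                => Alias(resolveRefs(c, input), n)
    case other                      => other
  }

  // Hypothetical stand-in for re-running the analyzer on the temporary Aggregate.
  private def executeSameContext(agg: Aggregate): Aggregate =
    agg.copy(aggregates = agg.aggregates.map(resolveRefs(_, agg.childOutput)))

  // Wrap the condition as an alias over the original grouping and child,
  // analyze that Aggregate, then unwrap the condition if it resolved.
  def resolveHaving(cond: Expr, original: Aggregate): Option[Expr] = {
    val aggregatedCondition =
      Aggregate(original.grouping, Seq(Alias(cond, "havingCondition")), original.childOutput)
    executeSameContext(aggregatedCondition).aggregates match {
      case Seq(Alias(resolvedCond, "havingCondition")) if resolvedCond.resolved => Some(resolvedCond)
      case _ => None
    }
  }
}
```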
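5. ResolveAggAliasInGroupBy (resolving grouping fields)
It replaces unresolved fields in GROUP BY with the already resolved expressions of the same name from the SELECT list, and it runs after ResolveReferences. Because of the notResolvableByChild check in the code below, it only acts on grouping names that the child plan cannot resolve, in practice aliases defined in the SELECT list. Take the earlier query again:
```sql
SELECT id, count(1) FROM table GROUP BY id HAVING avg(age) > 20
```
Here the id in GROUP BY is a column of table, so ResolveReferences resolves it directly and ResolveAggAliasInGroupBy has nothing to do. If the query instead grouped by a SELECT alias, e.g. `SELECT id AS uid, count(1) FROM table GROUP BY uid`, ResolveReferences would first resolve `id AS uid` in the SELECT list; since uid is not a column of table, ResolveAggAliasInGroupBy would then replace the uid in GROUP BY with that SELECT-list expression.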
```scala
override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
  // Only when conf.groupByAliases is enabled, the child and the SELECT list are
  // resolved, and some grouping expression is still unresolved
  case agg @ Aggregate(groups, aggs, child)
      if conf.groupByAliases && child.resolved && aggs.forall(_.resolved) &&
        groups.exists(!_.resolved) =>
    agg.copy(groupingExpressions = mayResolveAttrByAggregateExprs(groups, aggs, child))
}

private def mayResolveAttrByAggregateExprs(
    exprs: Seq[Expression], aggs: Seq[NamedExpression], child: LogicalPlan): Seq[Expression] = {
  exprs.map { _.transform {
    // A name the child cannot resolve is replaced by the SELECT-list expression of that name
    case u: UnresolvedAttribute if notResolvableByChild(u.name, child) =>
      aggs.find(ne => resolver(ne.name, u.name)).getOrElse(u)
  }}
}
```
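A toy version of mayResolveAttrByAggregateExprs shows the alias substitution. The classes are hypothetical, only top-level grouping expressions are rewritten (the real code uses `transform` to reach nested ones), and names are compared case-insensitively in place of the analyzer's resolver.

```scala
// Toy expressions (assumptions, not Catalyst classes).
sealed trait Expr { def resolved: Boolean }
final case class UnresolvedAttribute(name: String) extends Expr { def resolved: Boolean = false }
final case class Attr(name: String) extends Expr { def resolved: Boolean = true }
final case class Alias(child: Expr, name: String) extends Expr { def resolved: Boolean = child.resolved }

object MiniAggAliasInGroupBy {
  // The name a SELECT-list entry exposes: a column name or an alias.
  private def nameOf(e: Expr): Option[String] = e match {
    case Attr(n)     => Some(n)
    case Alias(_, n) => Some(n)
    case _           => None
  }

  // A grouping name the child cannot resolve is replaced by the SELECT-list
  // expression with the same name; anything else is left unchanged.
  def mayResolveAttrByAggregateExprs(
      groups: Seq[Expr], aggs: Seq[Expr], childOutput: Seq[Attr]): Seq[Expr] =
    groups.map {
      case u @ UnresolvedAttribute(n) if !childOutput.exists(_.name.equalsIgnoreCase(n)) =>
        aggs.find(a => nameOf(a).exists(_.equalsIgnoreCase(n))).getOrElse(u)
      case other => other
    }
}
```

A grouping name that the child can resolve (such as id) is deliberately left for ResolveReferences, which matches the notResolvableByChild guard in the real rule.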
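The rules above are among the most commonly used and important ones; many others are not listed here. Once this series of rules has been applied, the Analyzed LogicalPlan is produced. It then goes through the Optimizer and physical plan generation, which will be covered in later articles.
References
[1]: 《Spark SQL内部剖析》 (Spark SQL Internals), by 朱鋒, 張韶全, 黃明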