The TreeNode library is the core class library of Catalyst; the syntax tree is built entirely out of TreeNodes. TreeNode itself is parameterized as BaseType <: TreeNode[BaseType] and mixes in the Product trait, so a node can hold heterogeneous constructor arguments.
TreeNode comes in three shapes: BinaryNode, UnaryNode, and LeafNode.
In Catalyst these nodes all inherit from LogicalPlan, so you can say that every node in the tree is a LogicalPlan (and similarly an Expression), and LogicalPlan itself derives directly from TreeNode.
The main inheritance relationships are shown in the class diagram below:

BinaryNode: a binary node, i.e., one with a left child and a right child.
[[TreeNode]] that has two children, [[left]] and [[right]].
trait BinaryNode[BaseType <: TreeNode[BaseType]] {
  def left: BaseType
  def right: BaseType
  def children = Seq(left, right)
}

abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
  self: Product =>
}
The node definition is straightforward: both the left and right children have type BaseType, and children is Seq(left, right).
Below are the main classes that extend BinaryNode; they can be used as a quick reference :)
The binary nodes you will meet most often are Join and Union.
UnaryNode: a node with a single child.
A [[TreeNode]] with a single [[child]].
trait UnaryNode[BaseType <: TreeNode[BaseType]] {
  def child: BaseType
  def children = child :: Nil
}

abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
  self: Product =>
}
Below are the main classes that extend UnaryNode, again useful as a quick reference :)
Commonly used unary nodes include Project, Subquery, Filter, Limit, and so on.
LeafNode: a node with no children.
A [[TreeNode]] with no children.
trait LeafNode[BaseType <: TreeNode[BaseType]] {
  def children = Nil
}

abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  self: Product =>
  // Leaf nodes by definition cannot reference any input attributes.
  override def references = Set.empty
}
Below are the main classes that extend LeafNode, useful as a quick reference :)
Commonly used leaf nodes include the Command family, some Function nodes, and UnresolvedRelation, etc.
Next, a brief look at the attributes and methods of the TreeNode class itself.
currentId
private val currentId = new java.util.concurrent.atomic.AtomicLong
protected def nextId() = currentId.getAndIncrement()
sameInstance
To decide whether two instances are the same, it is enough to compare the TreeNode ids.
def sameInstance(other: TreeNode[_]): Boolean = {
  this.id == other.id
}
fastEquals is the more commonly used shortcut. Equality is not implemented by overriding Object.equals, because doing so would stop the Scala compiler from generating the case class equals methods.
def fastEquals(other: TreeNode[_]): Boolean = {
sameInstance(other) || this == other
}
map, flatMap, and collect all recursively apply a function (or PartialFunction) to the node and its children; there are many other helpers as well, but space does not allow covering them all here. A toy sketch of the recursive collect idea follows.
transform takes a PartialFunction, which is exactly the Rule inside a Batch mentioned in the previous article on the Analyzer.
It applies the rule to this node and all of its children, and finally returns a copy of this node (a node distinct from the current one; as described later, reflection is used to produce the modified node).
If the rule does not apply to a node, that node is returned unchanged.
Let's look at an example:
object GlobalAggregates extends Rule[LogicalPlan] {
  // apply calls transform on the LogicalPlan (a TreeNode) with a PartialFunction.
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Project(projectList, child) if containsAggregates(projectList) =>
      Aggregate(Nil, projectList, child)
  }

  def containsAggregates(exprs: Seq[Expression]): Boolean = {
    exprs.foreach(_.foreach {
      case agg: AggregateExpression => return true
      case _ =>
    })
    false
  }
}
The real work is done by transformChildrenDown, which applies the rule to the children recursively in pre-order.
If the rule was applied successfully to the current node, the resulting node afterRule is the one whose children the rule is then applied to.
The transformDown method:
/**
 * Returns a copy of this node where `rule` has been recursively applied to it and all of its
 * children (pre-order). When `rule` does not apply to a given node it is left unchanged.
 * @param rule the function used to transform this nodes children
 */
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  val afterRule = rule.applyOrElse(this, identity[BaseType])
  // Check if unchanged and then possibly return old copy to avoid gc churn.
  if (this fastEquals afterRule) {
    transformChildrenDown(rule)            // node unchanged: recurse into this node's children
  } else {
    afterRule.transformChildrenDown(rule)  // node changed: recurse into the new node's children
  }
}
The most important method is transformChildrenDown: it recursively applies the PartialFunction to the children, and uses the resulting newArgs to build a new node, calling makeCopy() to construct it.
The transformChildrenDown method:
/**
* Returns a copy of this node where `rule` has been recursively applied to all the children of
* this node. When `rule` does not apply to a given node it is left unchanged.
* @param rule the function used to transform this nodes children
*/
def transformChildrenDown(rule: PartialFunction[BaseType, BaseType]): this.type = {
  var changed = false
  val newArgs = productIterator.map {
    case arg: TreeNode[_] if children contains arg =>
      val newChild = arg.asInstanceOf[BaseType].transformDown(rule)  // recursively apply the rule to this child
      if (!(newChild fastEquals arg)) {
        changed = true
        newChild
      } else {
        arg
      }
    case Some(arg: TreeNode[_]) if children contains arg =>
      val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
      if (!(newChild fastEquals arg)) {
        changed = true
        Some(newChild)
      } else {
        Some(arg)
      }
    case m: Map[_,_] => m
    case args: Traversable[_] => args.map {
      case arg: TreeNode[_] if children contains arg =>
        val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
        if (!(newChild fastEquals arg)) {
          changed = true
          newChild
        } else {
          arg
        }
      case other => other
    }
    case nonChild: AnyRef => nonChild
    case null => null
  }.toArray
  if (changed) makeCopy(newArgs) else this  // if anything changed, reflectively build a new copy of the node from newArgs
}
The makeCopy method uses reflection to create a copy of the node:
/**
 * Creates a copy of this type of tree node after a transformation.
 * Must be overridden by child classes that have constructor arguments
 * that are not present in the productIterator.
 * @param newArgs the new product arguments.
 */
def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") {
  try {
    val defaultCtor = getClass.getConstructors.head  // reflectively grab the first constructor
    if (otherCopyArgs.isEmpty) {
      defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type]  // reflectively build a node of the same concrete type
    } else {
      defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type]  // append otherCopyArgs if the node needs extra constructor arguments
    }
  } catch {
    case e: java.lang.IllegalArgumentException =>
      throw new TreeNodeException(
        this, s"Failed to copy node. Is otherCopyArgs specified correctly for $nodeName? "
          + s"Exception message: ${e.getMessage}.")
  }
}
Now let's start from a SQL statement and trace the transformations of the whole Spark SQL tree.
SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key
First, run it and look at the generated plans in the console:
sbt/sbt hive/console
Using /usr/java/default as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /app/hadoop/shengli/spark/project/project
[info] Loading project definition from /app/hadoop/shengli/spark/project
[info] Set current project to root (in build file:/app/hadoop/shengli/spark/)
[info] Starting scala interpreter...
[info]
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.parquet.ParquetTestData
scala> val query = sql("SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key")
The first step produces the Unresolved Logical Plan:
scala> query.queryExecution.logical
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [*]
 Join Inner, Some(('a.key = 'b.key))
  Subquery a
   Project [*]
    UnresolvedRelation None, src, None
  Subquery b
   Project [*]
    UnresolvedRelation None, src, None
Drawn as a tree (my own understanding), it looks like this:
In the figures, the three node kinds introduced at the beginning are shown as green UnaryNodes, red BinaryNodes, and blue LeafNodes.
The Analyzer applies its Batches of Rules to the Unresolved Logical Plan tree: here EliminateAnalysisOperators removes the Subquery nodes, and the Batch("Resolution") rules resolve the Attributes and Relations. The Analyzed Logical Plan tree looks like this:
I half-jokingly call Catalyst's Optimizer the optimization master of Spark SQL, because all of Spark SQL's optimization happens there; a later article will cover the Optimizer.
The optimization is not very visible here, because the SQL itself is simple.
scala> query.queryExecution.optimizedPlan
res3: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#0,value#1,key#2,value#3]
 Join Inner, Some((key#0 = key#2))
  MetastoreRelation default, src, None
  MetastoreRelation default, src, None
The resulting tree is shown below:
The last step is the final physical execution plan, which involves Hive's TableScan, a HashJoin, and Exchange operators; Exchange in turn brings in Shuffle and Partition operations.
scala> query.queryExecution.executedPlan
res4: org.apache.spark.sql.execution.SparkPlan =
Project [key#0:0,value#1:1,key#2:2,value#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None
  Exchange (HashPartitioning [key#2:0], 150)
   HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None
The resulting physical execution tree is shown below:
This article introduced the TreeNode library at the heart of Spark SQL's Catalyst framework, sketched the TreeNode inheritance class diagram, and looked at the role TreeNode plays in Catalyst. Every Logical Plan in the syntax tree derives from TreeNode, and the nodes come in TreeNode's three forms: BinaryNode, UnaryNode, and LeafNode. It is exactly these kinds of nodes that make up the syntax tree of Spark SQL's Catalyst.
TreeNode's transform method is the central one: it takes a rule, recursively applies it to the current node's children, and finally returns a copy of the TreeNode. This transformation runs through several core phases of Spark SQL execution, such as the Analyze and Optimize phases.
Finally, a concrete example showed how Spark SQL's execution trees are generated.
This is my current understanding; if any of the analysis falls short, corrections are very welcome.
——EOF——
Original article; please credit the source when reposting: