天天看點

Spark Catalyst的實作分析

Spark SQL是Spark内部最核心以及社群最為活躍的元件,也是未來Spark對End-User最好的接口,支援SQL語句和類RDD的Dataset/DataFrame接口。相比在傳統的RDD上進行開發,Spark SQL的業務邏輯在執行前和執行過程中都有相應的優化工具對其進行自動優化(即Spark Catalyst以及Tungsten兩個元件),是以未來Spark SQL肯定是主流。

在Spark SQL發展早期,其實我個人是很排斥Spark SQL,在我看來,裸用RDD,可以最大程度上滿足我的

控制欲

,但是随着Spark的發展,也逐漸顯示個人思維的短見(Spark是滿足廣大業務團隊對資料分析的需求,最簡單,最自動化優化的接口肯定是最易被人接受和吹捧的);我排斥Spark SQL另外一個原因就是我Scala還不夠精通,在以前版本中,Spark對SQL語句的解析并生成AST樹,是使用Scala内部的

scala.util.parsing

來實作,那代碼的苦澀難懂,是無法用語言來描述的,而且AST樹是對SQL優化以及執行的過程了解的入口,是以隻能望而卻步了!在Spark 2.0以後,引入ANTLR來進行AST樹的解析,讓一切變都清晰!

Spark SQL和Catalyst分别對應了SQL執行期以及解析期的優化工作,是以Catalyst的了解是Spark SQL的第一步。在一些Catalyst的介紹以及講座中,下面一張圖是必出現,它描述了從SQL語句到最後執行Plan的生成過程中,除了Spark SQL,其他SQL引擎的工作原理也基本一緻,比如Hive之類的。

Spark Catalyst的實作分析

本文核心也是介紹Catalyst内部的實作,但是不是按照這張圖的步驟來介紹Catalyst的實作原理,而是按照SQL給人最直接幾個概念,比如Row,Expression,Plan來逐漸介紹它們的内部實作。

看過Spark SQL或者Catalyst人都知道,相比Spark Core的代碼,這一塊真的挺複雜了,各種算法邏輯,複雜的Scala文法元素,以及各種性能優化,代碼自動生成,可能得需要幾倍的時間,反複的琢磨,才能梳理清楚。

##1. Row

Spark SQL中處理的資料與傳統RDD最大差別在處理的每行資料的類型表示,傳統RDD不對每行資料進行類型要求,可以任何複雜資料結構,比如Map之類的,而Spark SQL中為

Row

Row

的概念是針對之前版本中的DataFrame而言的,在1.6版本中提出的Dataset其實也是有

Row

的概念,隻是會被隐式轉換掉而已,在Catalyst中,處理的對象為

InternalRow

,注意

InternalRow

Row

是有本質差別的,

Row

是API層面的概念,專門針對DataFrame,而

InternalRow

為内部概念。

Row

類為DataFrame中核心概念,它表示一行結構化資料的輸入,

Row

本身可以有Scheme,即支援設定每個字段的類型,支援GetInt之類的操作;它也可以無Scheme,即

Scheme=NULL

,支援泛化的Get操作,傳回Any類型資料。

Row

本身就有點像

Array,Tuple

,它的子類

GenericRow

GenericRowWithSchema

都是基于

Array[Any]

來實作,它的每個字段的類型為

Any

,是以

Row

可以了解為是無類型限制的,雖然它内部有

Scheme

,但是

Array[Any]

Row

的轉換,以及泛化的Get操作都是不受Scheme的限制,是以它是類型不安全的。

在Spark 1.6版本以後,提出了Dataset概念,它也是Spark從RDD,SchemeRDD,DataFrame演化過來最為标準化一個概念,相信在未來會是Spark中最為核心概念,

In the long run, we expect Datasets to become a powerful way to write more efficient Spark applications.

Dataset是Spark SQL中概念,在API層面上來說,它沒有像DataFrame一樣,強制要求輸入資料為

Row

,或者依賴

Row.fromSeq

等功能函數将輸入其他類型的資料強制轉換為Row,但是Dataset實際也是要求它處理的資料是

Row相似的類型

,即為Catalyst内部的

InternalRow

UnsafeRow

的概念。當我們執行

seq(1,2,3).toDs或者Rdd.toDs

時候,實際上對每一行記錄在内部做了很多轉換。DataFrame也被轉換為Dataset進行包裝,即将DataFame表示為

Dataset[Row]

Dataset核心概念是為

Encoder

,它就是我們上面說隐式轉換的幕後工具,如下所示Encoder内部的toRow函數,支援将輸入T類型轉換為

InternalRow

def toRow(t: T): InternalRow = try {
    inputRow(0) = t
    extractProjection(inputRow)
}
           

它支援将一個外部的資料類型,轉換為Dataset内部的

InternalRow

,比如上面的

seq(1,2,3)

就是需要将每個輸入的Int數字轉換為一個

InternalRow

DataFrame

同理需要将

Row

轉換為

InternalRow

。在轉換的過程中,是有類型的檢查,這也是與DataFrame最大的差別。

另外Dataset對外最為自豪一點就是序列化以及反序列化的性能,用官方話說:

Datasets also leverage Tungsten’s fast in-memory encoding,an order of magnitude faster than the default Java serializer

,那麼它是這麼實作呢?其實他就是上面談到的

Encoder

的功能,上面雖然我們說它傳回的是一個

InternalRow

,實際上,它傳回的是

InternalRow

一個子類,即

UnsafeRow

UnsafeRow

内部是基于Java的unsafe包來實作(Tungsten的功能),對Row中每個字段的操作都轉換為位元組的操作,換句話說它底層實際存儲結構是byte[],而且支援Kryo序列化,相比使用Java序列化工具來序列化數組/Row之類的複雜資料結構,它的性能肯定要好很多!

另外

InternalRow

還有一個子類,即

MutableRow

,而且

UnsafeRow

也是

MutableRow

的子類,它即為可修改的

InternalRow

,在很多地方都會出現這個,原理很簡單,支援set等操作而已。

##2. Expression

在SQL語句中,除了SELECT FROM等關鍵字以外,其他大部分元素都可以了解為Expression,比如

SELECT sum(a), a

,其中

sum(a)

a

都為Expression;

7-10補充:從SQL語句功能來說,Expression的功能可以劃分為Projection,Predicate,Ordering,Aggregate;其中Projection功能就是input資料進行加工,輸出為InternalRow;Predicate而是對輸入的InternalRow進行加工輸出為Bool,Ordering和Aggregate則是針對Sortby/Groupby專用的兩類表達式;你可能會說,那麼substr之類的函數表達式不在上面歸類?substr在sql語句中肯定不會單獨存在,他們都是作為子表達式出現在上面的幾類當中,比如用于判斷或者輸出資料的加工。
  • Expression是一個Tree結構,即可以通過多級的Child Expression來組合成複雜的Expression,比如前面

    sum(a)

    就是由

    sum

    a

     兩個簡單的Expression組合而成,比如更複雜的Expression有

    max(sum(a),sum(b))

    ;
  • Expression基本的功能是求值,比如

    abs(a)

    IfNull(A,B)

    之類的操作,他們都是對輸入Row進行加工,并輸出處理結果,即Expression需要實作

    def eval(input: InternalRow = null): Any

    函數來實作它的功能。
  • 既然Expression的功能是求值,那麼它就有輸入和輸出類型的限制。每個Expression都有

    def dataType: DataType

    類型變量來表示它的輸出類型,以及

    def checkInputDataTypes(): TypeCheckResult

    函數來校驗目前Expression的輸入(為Tree結構,那麼它的輸入即為Child Expression輸出)是否符合類型要求。
  • Expression功能是針對Row進行加工,但是可以把加工方法分為以下幾種
    • 原生的

      def eval(input: InternalRow = null): Any

      函數;
    • 對于Non Child Expression,Expression的計算是基于Child Expression計算結果進行二次加工的,是以對于Non Child Expression,對Eval進行預設實作,子類隻需要實作函數

      def nullSafeEval(input: Any): Any

      即可以,如下所示為隻有一個Child的Expression的實作:
      override def eval(input: InternalRow): Any = {
        	val value = child.eval(input)
        	if (value == null) {
        		null
        	} else {
        		nullSafeEval(value)
        	}
        }
        //比如ExpressionAbs,就是利用子Expression的結果結果之上,進行一個math.abs加工。
                 
    • Expression也可能是不支援eval的,即

      Unevaluable

      類型的Expression,一般有三種情況:1)是真的無法求值,比如處于Unresolved狀态的Expression;2)是不支援通過eval進行求值,而需要通過gencode的方式來實作Expression功能;3)Expression為

      RuntimeReplaceable

      類型,它僅僅是在parser階段一種臨時Expression,在優化階段,會被替換為别的Expression,是以它本身不需要有執行邏輯,但是得有替換相關的邏輯。
    • Projection

      類型,它本身不是Expression,但是它可以根據N個Expression,對輸入row的N個字段分别進行加工,輸出一個新的Row,即Expression的容器。
      abstract class Projection extends (InternalRow => InternalRow)
       def apply(input: InternalRow): InternalRow = {
           val outputArray = new Array(exprArray.length)
           var i = 0
           while (i < exprArray.length) {
             outputArray(i) = exprArray(i).eval(input)
             i += 1
           }
           new GenericInternalRow(outputArray)
         }
       //比如row序列化操作,可以把一個row序列化為unsaferow,相當與一個Projection
                 

對Package下的Expression進行一個大體的歸類,如下所示:

Name 歸類 功能描述
資料輸入: Expression為Tree結構,中間節點都為加工類型表單,而葉子節點即為資料産生節點
Attribute Catalyst裡面最為重要的概念,可以了解為表的屬性,在sql處理各個階段會有不同的形态,比如UnresolvedAttribute->AttributeReference->BoundReference,後面會具體分析
Literal 常量,支援各種類型的常量輸入
datetimeExpressions 傳回目前時間類型的常量,

CurrentDate

,

CurrentTimestamp

randomExpressions 支援生成一些随機數
其他一些輸入 比如擷取sql計算過程中的任務對應的InputFileName,SparkPartitionID
基本計算功能:
arithmetic nullSafeEval 數學Expression,支援

-

,

+

,

abs

+

,

-

,

*

,

/

,

%

,

max

,

min

,

pmod

數學運算符
bitwiseExpressions nullSafeEval 位運算數,支援IntegralType類型的

and

,

or

,

not

,

xor

位運算
mathExpressions nullSafeEval 數學函數,支援

cos

,

Sqrt

之類30多種,相當于Math包
stringExpressions nullSafeEval 字元串函數,支援

Substring

,

Length

之類30多種,相當于String包
decimalExpressions nullSafeEval Decimal類型的支援,支援

Unscaled

,

MakeDecimal

操作
datetimeExpressions nullSafeEval 時間類型的運算
collectionOperations nullSafeEval 容器的操作,支援容器

Contains

,

Sort

,

Size

三種操作
cast nullSafeEval 支援資料類型的轉換
misc nullSafeEval 功能函數包,支援MD5,crc32之類的函數功能
基本邏輯計算功能:
predicates eval/nullSafeEval類型 支援子Expression之間的邏輯運算,比如

AND

,

In

,

Or

,輸出Bool類型
regexpExpressions nullSafeEval 支援LIKE相關操作
conditionalExpressions eval 支援case,if,great,least四種邏輯判斷運算
nullExpressions eval/RuntimeReplaceable 與NULL/NA相關的判斷或者IF判斷功能,大部分都為RuntimeReplaceable,會被進行優化處理
其他類型:
complexTypeCreator eval SparkSql是支援複雜資料結構,比如Array,Map,Struct,這類Expression支援在sql語句上生成它們,比如select array
Generator eval 支援flatmap類似的操作,即将Row轉變為多個Row,支援Explode和自定義UserDefinedGenerator兩種,其中Explode支援将數組和map拆開為多個Row。

###2.1 Attribute詳解 Attribute直譯為屬性,在SQL中,可以簡單了解為輸入的Table中的字段,Attribute通過Name字段來進行命名。SQL語句通過Parse生成AST以後,SQL語句中的每個字段都會解析為UnresolvedAttribute,它是屬于Attribute的一個子類,比如

SELECT a

中的

a

就表示為

UnresolvedAttribute("a")

,還有一個特殊的UnresolvedAttribute,既為SQL語句中的

,它表示為

Star

,屬于UnresolvedAttribute類型的子類。

Analy需要對AST進行進一步的加工,其中有一個很重要的操作就是把整個AST中所有Unresolved的Attribute都轉變為resolved狀态,比如根據輸入Table将Star進行expand操作,對應的

Rule

名稱為

ResolveReferences

,具體實作細節這裡就不展開。

對于resolve操作除了将Star進行展開以外,它的主要功能就是關聯SQL語句所有位置用到的Attribute,即在Attribute的name基礎上,指定一個ID進行唯一标示,如果一個Attribute在兩處被多處被引用,ID即為同一個(怎麼實作的?Attribute Resolve操作時從底到頂來周遊整改AST,每一步都是根據底部已經resloved的Attribute來給頂部的Attribute指派,進而保證如果兩個Attribute是指向同一個,它們的ID肯定是一樣的;對于處于葉子節點Attribute是優先進行resolve的,比如

Rule:ResolveRelations

對處于底部的Relation進行ResolveRelatition)。可以這麼了解,做這些事情都是為了優化,實體存儲的Table可能有很多Attribute,而通過resolve操作,就指定整個計算過程中需要使用到Attribute,即可以隻從實體存儲中讀取相應字段,上層各種Expression對這些字段都轉變為引用,是以resolve以後的Attribute不是叫做

resolvedAttribute

,而是叫做

AttributeReference

對于一個中間節點的Expression,如果它對一個Attribute有引用,比如求一個字段值的長度

length(a)

,這裡

a

經過了UnresolvedAttribute到AttributeReference的轉化,但是針對一個輸入的Row,進行

length

Expression計算時,還是無法從

AttributeReference

中讀取相應在Row中的值,為什麼?雖然

AttributeReference

也是Expression,但是它是Unevaluable,為了擷取屬性在輸入Row中對應的值,需要對

AttributeReference

再進行一次

BindReferences

的轉化,生成

BoundReference

,這個操作本質就是将Expression和一個輸入Scheme進行關聯,Scheme由一組

AttributeReference

,它們之間是有順序的,通過擷取AttributeReference在

AttributeReference

組中的Index,并生成BoundReference,在對BoundReference進行eval時候,即可以使用該index擷取它在相應Row中的值。

下面是SQL語句中Attribute經過的多次轉化的過程:

SQL---parser---->UnresolvedAttribute----Analyser--->resolvedAttribute ----Bind---->BoundReference
           

###2.2 Expression Codegen Spark Sql是支援Expression層面代碼生成,首先第一個疑問是我們知道Expression有eval方法來暴露Expression的功能,為什麼還要搞一個Codegen呢?原因大體上有兩個,一是提高性能,二是支援一些不能通過eval來表達的功能。這裡主要解釋了第一個,即提高性能,ps:我還沒有去詳細的測試這一塊對性能影響大小,隻是從感官上做一個結論:

基于eval,Expression執行過程中應該是這樣的e1.eval(e2.eval(e3.eval(e4.eval(...)))),随着Expression的複雜度上升,這個執行深度應該會很大,而且每一個操作之間是基于參數和傳回進行傳遞,在作業系統系統層面是存在開銷的;其次如果一個Expression計算結果會被多次利用,它的結果是沒有辦法被二次利用。

那麼代碼生成是這麼解決這個問題的呢?

對于一個完整的SQL的執行,其中所有Expression隻是對應一個最終執行代碼的一個片段,注意是代碼片段,而不是函數,更加不是一個類,每個代碼片段由ExprCode來表示

case class ExprCode(var code: String, var isNull: String, var value: String)
           

code為代碼片段的計算代碼,和eval函數功能一樣,這個片段核心功能是對一個row進行處理,并把處理結果的值寫到一個變量中,這個變量名稱即為value表示,isNull為false或者true字元串來表示這個代碼片段生成的value對應的值是否為Null。

如果由3個Expression,分别為
exp1:ExprCode(code1,isnull1,value1)
exp2:ExprCode(code2,isNull2,value2)
exp2:ExprCode(code3,isNull3,value3)
在SQL執行過程中,針對一個Row會生成下面的組合代碼
funtion(row) {
	//定義三個變量
	exp1.dataType value1 = defauleValue(exp1.dataType)
	exp2.dataType value2 = defauleValue(exp1.dataType)
	exp3.dataType value3 = defauleValue(exp1.dataType)
	//
	exp1.code1
	exp2.code2//可以使用value1變量的值
	exp3.code3//可以使用value1和value2的值
	....
}
           

Expression層面的Codegen的實作其實很簡單,這裡就是不詳細去描述,後面會重新針對codegen,包括Whole Stage Codegen一起做一次大大專題進行分析。

##3. LogicalPlan 如上所言,在SQL語句中,除了SELECT FROM等關鍵字以外,其他大部分元素都可以了解為Expression,那麼用什麼來表示剩下的SELECT FROM這些關鍵字呢?畢竟Expression隻是一些Eval功能函數或者代碼片段,需要一個東西來串聯這些片段,這個東西就是

Plan

,注意,我這裡說的是

Plan

,是一個統稱,而不是僅指向

LogicalPlan

。如開言圖所示,在SQL解析以及優化,到最後過程中,都是以

Plan

而存在,隻是每一步

Plan

功能不同而已。

Plan

表現形式也是Tree,節點之間的關系可以了解為一種操作次序,比如Plan葉子節點表示從磁盤讀取DB檔案,而Root節點表示最終資料的輸出;下面是

Plan

最常見的執行個體截圖。

Spark Catalyst的實作分析

用SQL語句來表示這個Plan即為:

SELECT project FROM table, table WHERE filter

Expression功能是對輸入Row進行加工,輸出可能是Any資料類型。而Plan輸出類型為

def output: Seq[Attribute]

表示的一組Attribute,比如上面的

Project

Table

肯定是輸出一個由

Seq[Attribute]

類型表示的Row,

Filter

感覺是輸出Ture/False,但是這裡說的Plan,而不是Filter類型的Expreesion,Filter類型的Plan會在内部根據Expression計算結果來判斷是否傳回Row,但是Row傳回的類型肯定也是由

Seq[Attribute]

表示的。

另外Expression與Plan關系是被包含,比如

Filter

肯定是基于一個

Expression

計算結果進行判斷,

Project

内部元素要麼是直接為

Star

,或者為

Attribute

,或者為複雜的

Expression

,比如

SUM

下面我開始分析

LogicalPlan

,它是SQL語句經過Parse以後第一步展現形式。基于ANTLR實作的SQL AST分析過程即為AST樹周遊過程,Catalyst也是對AST樹周遊過程中,完成LogicalPlan和所有依賴的Expression的建構,相關邏輯在

org.apache.spark.sql.catalyst.parser.AstBuilder

以及相關子類中,如果對ANTLR有一點基礎,這一塊代碼還是比較容易看懂,就不細化分析,我們着重放在LogicalPlan上面。

在上面示意圖中,我們看到

LogicalPlan

是由一些節點組成,在Spark SQL中,節點大體分為兩種類型:Operator和Command。其中我們上面看到的

Filter

都可以了解為Operator,而我們在SQL Cli中執行

set a=b

以及

addjar a

,它們都是Command類型的Plan,當然相比由很多Operator組成的多級複雜Plan,Command組成的Plan可能是單節點而存在,要簡單一些,下面我們對Operator做一些歸類。

Name 功能描述

Project

(projectList: Seq[NamedExpression], child: LogicalPlan)
SELECT語句輸出操作,其中projectList為輸出對象,每一個都為一個Expression,它們可能是Star,或者很複雜的Expression

Filter

(condition: Expression, child: LogicalPlan)
根據condition來對Child輸入的Rows進行過濾

Join

(left: LogicalPlan,right: LogicalPlan,joinType: JoinType,condition: Option[Expression])
left和right的輸出結果進行join操作

Intersect

(left: LogicalPlan, right: LogicalPlan)
left和right兩個Plan輸出的rows進行取交集運算。

Except

(left: LogicalPlan, right: LogicalPlan)
在left計算結果中剔除掉right中的計算結果

Union

(children: Seq[LogicalPlan])
将一組Childs的計算結果進行Union聯合

Sort

(order: Seq[SortOrder],global: Boolean, child: LogicalPlan)
對child的輸出進行sort排序

Repartition

(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
對child輸出的資料進行重新分區操作

InsertIntoTable

(table: LogicalPlan,child: LogicalPlan,...)
将child輸出的rows輸出到table中

Distinct

(child: LogicalPlan)
對child輸出的rows取重操作

GlobalLimit

(limitExpr: Expression, child: LogicalPlan)
對Child輸出的資料進行Limit限制

Sample

(child: LogicalPlan,....)
根據一些參數,從child輸出的Rows進行一定比例的取樣

Aggregate

(groupingExpressions: Seq[Expression],aggregateExpressions: Seq[NamedExpression],child: LogicalPlan)
對child輸出row進行aggregate操作,比如groupby之類的操作

這些Operator共同組成SELECT SQL語句中各種核心語言要素,而且Catatyst後面的所有優化邏輯都是針對SELECT語句進行優化。對于譬如CREATE TABLE以及SET之類的SQL語言元素,它們都是Command存在,相比SELECT,Command組成的Plan要簡單很多,不過它的種類倒是真的不少!

Name 功能描述

DataBase

操作類
支援ShowDatabase以及UseDatabase以及Create等操作

Table

操作類
多達13種,比如Create,Show,Alter等

View

操作類
CreateViewCommand支援View的建立

Partition

操作類
支援Partition新增删除等操作

Resources

操作類
比如AddJar之類的資源操作

Functions

操作類
支援新增函數,删除函數等操作

Cache

操作類
支援對Table進行cache和uncache操作

Set

操作
通過SetCommand執行對參數進行臨時修改

由Operator組成的Plan,僅僅隻是一組描述形式的而存在,畢竟隻是

LogicalPlan

,它們需要被轉換為最終的

PhysicalPlan

才能真正具有可執行的能力,而這些Command類型的Plan都是以

def run(sparkSession: SparkSession): Seq[Row]

函數暴露給Spark SQL,比如通過調用Table的run函數完成Table的建立等操作。是以我們可以肯定的是:Plan優化都是針對以Operator組成的Plan。

##4. Expression和Plan的Tree支援的操作

在Catalyst中,Expression和Plan都為Tree類型的資料結構,無論是從SQL語句中解析出Plan或者Expression,或針對Plan或Expression進行Analy以及Optimize操作,都需要針對Tree資料結構進行周遊,其中經典Tree周遊算法有先序和後序周遊。

另外由于TreeNode節點本身類型為

Product

(何為

Product

?在Scala中

Product

是最基本資料類型之一,如果一個

Case Class

 繼承

Product

,那麼即可以通過

productElement

函數或者

productIterator

疊代器對

Case Class

的參數資訊進行索引和周遊),并且所有Expression和Plan都是屬于

Product

類型,是以可以通過TreeNode内部定義的

mapProductIterator

函數對節點參數進行周遊。在周遊過程中,可以針對參數類型進行Case過濾,進而有選擇的處理本次希望處理的資料,比如對一個Expression中所有類型為Expression的子表達式進行操作,而可以忽略其他類型的參數。

對Plan或Expression進行周遊的目的:首先是為了收集一些資訊,比如針對Tree進行map/foreach操作;其次是為了對Tree節點内部的資訊進行修改,比如對PlanTree中每個Plan節點内部引用的Attribute進行Revole操作;最後就是為對Tree的資料結構進行修改,比如删除Tree的子節點,以及與子節點進行合并,比如Catasylt Optitimze就有大量Tree結構的修改。

Catalyst在實作Tree的操作上,代碼很是優雅的主要原因:它是基于Scala來實作。Scala的偏函數PartialFunction(偏函數是對函數定義域的一個子集進行定義的函數。 scala中用scala.PartialFunction[-T, +S]類來表示)可以清晰的描述操作目的,比如

PartialFunction[LogicalPlan, LogicalPlan]

是針對Plan類型的節點進行操作,而

PartialFunction[Expression, Expression]

是針對Expression進行操作;其次Scala強大的Case正則比對,讓在對Tree進行周遊過程,可以清晰确定這一次需要操作的對象,如果用别的語言來實作下面TypeFilter合并,其代碼将會是何等的苦澀。

case t1 @ TypedFilter(_, _, t2 @ TypedFilter(_, _, child))
	if t1.deserializer.dataType == t2.deserializer.dataType =>
	TypedFilter(combineFilterFunction(t2.func, t1.func), t1.deserializer, child)
//優雅Case結構比對語句,以及簡潔的CaseIF判斷,讓快速定位相應的邏輯節點,并對節點資料進行修改變的如此簡單。
           

同時無論是對Expression進行操作還是對Plan進行操作,Catalyst将他們都抽象為

Rule

,它的apply函數通過傳入一個TreeType類型的元素,并輸出一個TreeType類型的元素。

abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
  val ruleName: String
  def apply(plan: TreeType): TreeType
}
           

在對一個元素進行

rule.apply

操作以後,可以針對前後的元素是否相等

curPlan.fastEquals(lastPlan)

,來确定該

rule

是否對該元素有效,其中無效可能是因為該rule沒有case比對到相應資訊,首先可能是真的沒有,其次

rule

是可以反複的應用到一個節點,直到節點狀态趨向穩定,即

rule

已經應用多次,已經找不到比對的資訊了。

另外可以将一組

Rule

組合為一個

Batch(name: String,rules: Rule[TreeType]*)

并把它封裝在

RuleExecutor

中,進而通過

RuleExecutor

将該組

Rule

的可執行接口提供給外部使用,比如大名頂頂的Optimize政策,就是一堆堆的Batch組成。

abstract class Optimizerextends RuleExecutor[LogicalPlan] {
	def batches: Seq[Batch] = {
		Batch("Finish Analysis", Once,
		EliminateSubqueryAliases,
		ReplaceExpressions,
		ComputeCurrentTime,
		GetCurrentDatabase(sessionCatalog),
		RewriteDistinctAggregates) ::
	....
           

如上所言,

Rule

是通過反複的應用在一個節點,直到節點狀态趨向穩定。但是如優化類型的

Rule

,很多時候,優化是沒有止境了,優化的越深,優化開銷就越大。是以我也需要一定的手段來控制

Batch

應用到何種程度,比如

Once extends Strategy

表示該

Batch

隻允許應用一次;而

FixedPoint extends Strategy

表示該

Batch

最多隻允許應用N次,當然如果

batch

在運作過程中,節點已經

穩定

,會立即退出嘗試的。

Spark SQL對Plan Tree或者内部Expression Tree的周遊分為幾個階段:對AST進行Parse操作,生成Unresolve Plan;對Unresolve Plan進行Analy(包括Resolve)操作,生成Logical Plan;對Logical Plan進行Optimize操作,生成Optimized Logical Plan;以及最後進行Planning操作,生成Physical Plan。這裡面的每一階段都可以簡述為應用一組BatchRule來對plan進行加工,但是裡面每一個Rule都是很值得去細節學習和分析的,實話,我也沒有一個一個去看!!!

本文主要是針對catalyst内部實作做了一些簡潔的分析,注重分析與catalyst相關的三個概念

Row,Expression,Plan

,因為對三個概念的了解與否,将決定是否可以看懂spark sql後面相關細節。 同時,Spark SQL真的很複雜,如果想真的完全了解Spark SQL内部的具體細節,這條路還是很長!fighting!

繼續閱讀