天天看點

[轉載] 是時候學習真正的 spark 技術了

本文轉自:https://mp.weixin.qq.com/s/awT4aawtTIkNKGI_2zn5NA

本站轉載已經過作者授權。任何形式的轉載都請聯系原作者(孫彪彪/[email protected])獲得授權并注明出處。

spark sql 可以說是 spark 中的精華部分了,我感覺整體複雜度是 spark streaming 的 5 倍以上,現在 spark 官方主推 structed streaming, spark streaming  維護的也不積極了, 我們基于 spark 來建構大資料計算任務,重心也要向 DataSet 轉移,原來基于 RDD 寫的代碼遷移過來,好處是非常大的,尤其是在性能方面,有質的提升,  spark sql 中的各種内嵌的性能優化是比人裸寫 RDD 遵守各種所謂的最佳實踐更靠譜的,尤其對新手來講, 比如有些最佳實踐講到先 filter 操作再 map 操作,這種 spark sql 中會自動進行謂詞下推,比如盡量避免使用 shuffle 操作,spark sql 中如果你開啟了相關的配置,會自動使用 broadcast join 來廣播小表,把 shuffle join 轉化為 map join 等等,真的能讓我們省很多心。

spark sql 的代碼複雜度是問題的本質複雜度帶來的,spark sql 中的 Catalyst 架構大部分邏輯是在一個 Tree 類型的資料結構上做各種折騰,基于 scala 來實作還是很優雅的,scala 的偏函數和強大的 Case 正則比對,讓整個代碼看起來還是清晰的, 這篇文章簡單的描述下 spark sql 中的一些機制和概念。

SparkSession 是我們編寫 spark 應用代碼的入口,啟動一個 spark-shell 會提供給你一個建立 SparkSession, 這個對象是整個 spark 應用的起始點,我們來看下 sparkSession 的一些重要的變量和方法:

[轉載] 是時候學習真正的 spark 技術了
上面提到的 sessionState 是一個很關鍵的東西,維護了目前 session 使用的所有的狀态資料,有以下各種需要維護的東西:
[轉載] 是時候學習真正的 spark 技術了
spark sql 内部使用 dataFrame 和 Dataset 來表示一個資料集合,然後你可以在這個資料集合上應用各種統計函數和算子,有人可能對  DataFrame 和 Dataset 分不太清,其實 DataFrame 就是一種類型為 Row 的 DataSet,

type DataFrame = Dataset[Row]
           

這裡說的 Row 類型在 Spark sql 對外暴露的 API 層面來說的, 然而 DataSet 并不要求輸入類型為 Row,也可以是一種強類型的資料,DataSet 底層處理的資料類型為 Catalyst 内部 InternalRow 或者 UnsafeRow 類型, 背後有一個 Encoder 進行隐式轉換,把你輸入的資料轉換為内部的 InternalRow,那麼這樣推論,DataFrame 就對應 RowEncoder。

在 Dataset 上進行 transformations 操作就會生成一個元素為 LogicalPlan 類型的樹形結構, 我們來舉個例子,假如我有一張學生表,一張分數表,需求是統計所有大于 11 歲的學生的總分。

[轉載] 是時候學習真正的 spark 技術了

這個 queryExecution 就是整個執行計劃的執行引擎, 裡面有執行過程中,各個中間過程變量,整個執行流程如下

[轉載] 是時候學習真正的 spark 技術了

那麼我們上面例子中的 sql 語句經過 Parser 解析後就會變成一個抽象文法樹,對應解析後的邏輯計劃 AST 為

[轉載] 是時候學習真正的 spark 技術了

形象一點用圖來表示

[轉載] 是時候學習真正的 spark 技術了

我們可以看到過濾條件變為了 Filter 節點,這個節點是 UnaryNode 類型, 也就是隻有一個孩子,兩個表中的資料變為了 UnresolvedRelation 節點,這個節點是 LeafNode 類型, 顧名思義,葉子節點, JOIN 操作就表位了 Join 節點, 這個是一個 BinaryNode 節點,有兩個孩子。

上面說的這些節點都是 LogicalPlan 類型的, 可以了解為進行各種操作的 Operator, spark sql 對應各種操作定義了各種 Operator。

[轉載] 是時候學習真正的 spark 技術了

這些 operator 組成的抽象文法樹就是整個 Catatyst 優化的基礎,Catatyst 優化器會在這個樹上面進行各種折騰,把樹上面的節點挪來挪去來進行優化。

現在經過 Parser 有了抽象文法樹,但是并不知道 score,sum 這些東西是啥,是以就需要 analyer 來定位, analyzer 會把 AST 上所有 Unresolved 的東西都轉變為 resolved 狀态,sparksql 有很多resolve 規則,都很好了解,例如 ResolverRelations 就是解析表(列)的基本類型等資訊,ResolveFuncions 就是解析出來函數的基本資訊,比如例子中的sum 函數,ResolveReferences 可能不太好了解,我們在 sql 語句中使用的字段比如 Select name 中的 name 對應一個變量, 這個變量在解析表的時候就作為一個變量(Attribute 類型)存在了,那麼 Select 對應的 Project 節點中對應的相同的變量就變成了一個引用,他們有相同的 ID,是以經過 ResolveReferences 處理後,就變成了 AttributeReference 類型   ,保證在最後真正加載資料的時候他們被賦予相同的值,就跟我們寫代碼的時候定義一個變量一樣,這些 Rule 就反複作用在節點上,指定樹節點趨于穩定,當然優化的次數多了會浪費性能,是以有的 rule  作用 Once, 有的 rule 作用 FixedPoint, 這都是要取舍的。好了, 不說廢話,我們做個小實驗。

[轉載] 是時候學習真正的 spark 技術了

我們使用 ResolverRelations 對我們的 AST 進行解析,解析後可以看到原來的 UnresolvedRelation 變成了 LocalRelation,這個表示一個本地記憶體中的表,這個表是我們使用 createOrReplaceTempView 的時候注冊在 catalog 中的,這個 relove 操作無非就是在 catalog 中查表,找出這個表的 schema, 而且解析出來相應的字段,把外層使用者定義的 各個 StructField 轉變為 AttibuteReference,使用 ID 進行了标記。

[轉載] 是時候學習真正的 spark 技術了

我們再使用 ResolveReferences 來搞一下,你會發現上層節點中的相同的字段都變成了擁有相同 ID 的引用,他們的類型都是 AttibuteReference。最終所有的 rule 都應用後,整個 AST 就變為了

[轉載] 是時候學習真正的 spark 技術了

下面重點來了,要進行邏輯優化了,我們看下邏輯優化有哪些:

[轉載] 是時候學習真正的 spark 技術了
[轉載] 是時候學習真正的 spark 技術了

sparksql 中的邏輯優化種類繁多,spark sql 中的 Catalyst 架構大部分邏輯是在一個 Tree 類型的資料結構上做各種折騰,基于 scala 來實作還是很優雅的,scala 的偏函數 和 強大的 Case 正則比對,讓整個代碼看起來還是清晰的,廢話少說,我們來搞個小實驗。

[轉載] 是時候學習真正的 spark 技術了

看到了沒,把我的 (100 + 10) 換成了 110。

[轉載] 是時候學習真正的 spark 技術了

使用 PushPredicateThroughJoin 把一個單單對 stu 表做過濾的 Filter 給下推到 Join 之前了,會少加載很多資料,性能得到了優化,我們來看下最終的樣子。

[轉載] 是時候學習真正的 spark 技術了

至少用了 ColumnPruning,PushPredicateThroughJoin,ConstantFolding,RemoveRedundantAliases 邏輯優化手段,現在我的小樹變成了:

[轉載] 是時候學習真正的 spark 技術了

做完邏輯優化,畢竟隻是抽象的邏輯層,還需要先轉換為實體執行計劃,将邏輯上可行的執行計劃變為 Spark 可以真正執行的計劃。

[轉載] 是時候學習真正的 spark 技術了

spark sql 把邏輯節點轉換為了相應的實體節點, 比如 Join 算子,Spark 根據不同場景為該算子制定了不同的算法政策,有BroadcastHashJoin、ShuffleHashJoin 以及 SortMergeJoin 等, 當然這裡面有很多優化的點,spark 在轉換的時候會根據一些統計資料來智能選擇,這就涉及到基于代價的優化,這也是很大的一塊,後面可以開一篇文章單講, 我們例子中的由于資料量小于 10M, 自動就轉為了 BroadcastHashJoin,眼尖的同學可以看到好像多了一些節點,我們來解釋下, BroadcastExchange 節點繼承 Exchage 類,用來在節點間交換資料,這裡的BroadcastExchange 就是會把 LocalTableScan出來的資料 broadcast 到每個 executor 節點,用來做 map-side join。最後的 Aggregate 操作被分為了兩步,第一步先進行并行聚合,然後對聚合後的結果,再進行 Final 聚合,這個就類似域名 map-reduce  裡面的 combine 和最後的 reduce, 中間加上了一個 Exchange hashpartitioning, 這個是為了保證相同的 key shuffle 到相同的分區,目前實體計劃的 Child 輸出資料的 Distribution 達不到要求的時候需要進行Shuffle,這個是在最後的 EnsureRequirement 階段插入的交換資料節點,在資料庫領域裡面,有那麼一句話,叫得 join 者得天下,我們重點講一些 spark sql 在 join 操作的時候做的一些取舍。

Join 操作基本上能上會把兩張 Join 的表分為大表和小表,大表作為流式周遊表,小表作為查找表,然後對大表中的每一條記錄,根據 Key 來取查找表中取相同 Key 的記錄。

spark 支援所有類型的 Join:

[轉載] 是時候學習真正的 spark 技術了

spark sql 中 join 操作根據各種條件選擇不同的 join 政策,分為 BroadcastHashJoin, SortMergeJoin, ShuffleHashJoin。

  • BroadcastHashJoin:spark 如果判斷一張表存儲空間小于 broadcast 門檻值時(Spark 中使用參數 spark.sql.autoBroadcastJoinThreshold 來控制選擇 BroadcastHashJoin 的門檻值,預設是 10MB),就是把小表廣播到 Executor, 然後把小表放在一個 hash 表中作為查找表,通過一個 map 操作就可以完成 join 操作了,避免了性能代碼比較大的 shuffle 操作,不過要注意, BroadcastHashJoin 不支援 full outer join, 對于 right outer join, broadcast 左表,對于 left outer join,left semi join,left anti join ,broadcast 右表, 對于 inner join,那個表小就 broadcast 哪個。
  • SortMergeJoin:如果兩個表的資料都很大,比較适合使用 SortMergeJoin,  SortMergeJoin 使用shuffle 操作把相同 key 的記錄 shuffle 到一個分區裡面,然後兩張表都是已經排過序的,進行 sort merge 操作,代價也可以接受。
  • ShuffleHashJoin:就是在 shuffle 過程中不排序了,把查找表放在hash表中來進行查找 join,那什麼時候會進行 ShuffleHashJoin 呢?查找表的大小不能超過 spark.sql.autoBroadcastJoinThreshold 值,不然就使用  BroadcastHashJoin 了,每個分區的平均大小不能超過   spark.sql.autoBroadcastJoinThreshold ,這樣保證查找表可以放在記憶體中不 OOM, 還有一個條件是 大表是小表的 3 倍以上,這樣才能發揮這種 Join 的好處。

上面提到 AST 上面的節點已經轉換為了實體節點,這些實體節點最終從頭節點遞歸調用 execute 方法,裡面會在 child 生成的 RDD 上調用 transform操作就會産生一個串起來的 RDD 鍊, 就跟在 spark stremaing 裡面在 DStream 上面遞歸調用那樣。最後執行出來的圖如下:

[轉載] 是時候學習真正的 spark 技術了

可以看到這個最終執行的時候分分成了兩個 stage, 把小表 broeadcastExechage 到了大表上做 BroadcastHashJoin, 沒有進化 shuffle 操作,然後最後一步聚合的時候,先在 map 段進行了一次 HashAggregate sum 函數, 然後 Exchage 操作根據 name 把相同 key 的資料 shuffle 到同一個分區,然後做最終的 HashAggregate sum 操作,這裡有個 WholeStageCodegen 比較奇怪,這個是幹啥的呢,因為我們在執行 Filter ,Project 這些 operator 的時候,這些 operator 内部包含很多  Expression, 比如 SELECT sum(v),name, 這裡的 sum 和 v 都是 Expression,這裡面的 v 屬于 Attribute 變量表達式,表達式也是樹形資料結構,sum(v)  就是 sum 節點和 sum 的子節點 v 組成的一個樹形結構,這些表達式都是可以求值和生成代碼的,表達式最基本的功能就是求值,對輸入的 Row 進行計算 , Expression 需要實作 def eval(input: InternalRow = null): Any 函數來實作它的功能。

表達式是對 Row 進行加工,輸出的可以是任意類型,但是 Project 和 Filter 這些 Plan 輸出的類型是 def output: Seq[Attribute], 這個就是代表一組變量,比如我們例子中的 Filter (age >= 11) 這個plan, 裡面的 age>11 就是一個表達式,這個 > 表達式依賴兩個子節點, 一個Literal常量表達式求值出來就是 11, 另外一個是 Attribute 變量表達式 age, 這個變量在 analyze 階段轉變為了 AttributeReference 類型,但是它是Unevaluable,為了擷取屬性在輸入 Row 中對應的值, 還得根據 schema 關聯綁定一下這個變量在一行資料的 index, 生成 BoundReference,然後 BoundReference 這種表達式在 eval 的時候就可以根據 index 來擷取 Row 中的值。  age>11 這個表達式最終輸出類型為 boolean 類型,但是 Filter 這個 Plan 輸出類型是 Seq[Attribute] 類型。

可以想象到,資料在一個一個的 plan 中流轉,然後每個 plan 裡面表達式都會對資料進行處理,就相當于經過了一個個小函數的調用處理,這裡面就有大量的函數調用開銷,那麼我們是不是可以把這些小函數内聯一下,當成一個大函數,WholeStageCodegen 就是幹這事的。

[轉載] 是時候學習真正的 spark 技術了

可以看到最終執行計劃每個節點前面有個 * 号,說明整段代碼生成被啟用,在我們的例子中,Filter, Project,BroadcastHashJoin,Project,HashAggregate 這一段都啟用了整段代碼生成,級聯為了兩個大函數,有興趣可以使用 a.queryExecution.debug.codegen 看下生成後的代碼長什麼樣子。然而 Exchange 算子并沒有實作整段代碼生成,因為它需要通過網絡發送資料。

我今天的分享就到這裡,其實 spark sql 裡面有很多有意思的東西,但是因為問題的本質複雜度,導緻需要高度抽象才能把這一切理順,這樣就給代碼閱讀者帶來了了解困難, 但是你如果真正看進去了,就會有很多收獲。如果對本文有任何見解,歡迎在文末留言說出你的想法。

繼續閱讀