摘要:為了解決過多依賴 Hive 的問題, SparkSQL 使用了一個新的 SQL 優化器替代 Hive 中的優化器, 這個優化器就是 Catalyst。
本文分享自華為雲社群《Spark 開源新特性:Catalyst 優化流程裁剪》,作者:hzjturbo 。
1. 問題背景

上圖是典型的Spark Catalyst優化器的布局,一條由使用者輸入的SQL,到真實可排程執行的RDD DAG任務,需要經曆以下五個階段:
- Parser: 将SQL解析成相應的抽象文法樹(AST),spark也稱為 Unresolved Logical Plan;
- Analyzer: 通過查找Metadata的Catalog資訊,将 Unresolved Logical Plan 變為 Resolved Logical Plan,這個過程會做表、列、資料類型等做校驗;
- Optimizer: 邏輯優化流程,通過一些優化規則對比對上的Plan做轉換,得到優化後的邏輯Plan
- Planner:根據Optimized Logical Plan的統計資訊等轉換成相應的Physical Plan
- Query Execution: 主要是執行前的一些preparations優化,比如AQE, Exchange Reuse, CodeGen stages合并等
上述的五個階段中,除了Parser (由Antlr實作),其他的每個階段都是由一個個規則(Rule)構成,總共大約有200+個,對于不同的規則,還可能需要跑多次,是以對于相對比較複雜的查詢,可能得到一個executed Plan都需要耗費數秒。
Databricks内部基準測試表明,對于TPC-DS查詢,每個查詢平均調用樹轉換函數約280k次,這遠遠超出了必要的範圍。是以,我們探索在每個樹節點中嵌入BitSet,以傳遞自身及其子樹的資訊,并利用計劃不變性來修剪不必要的周遊。通過原型實作驗證:在TPC-DS基準測試中,我們看到優化的速度約為50%,分析的速度約為30%,整個查詢編譯的速度約為34%(包括Hive元存儲RPC和檔案清單)[1]。
2. 設計實作
2.1 Tree Pattern Bits and Rule Id Bits
- Tree pattern bits
在TreeNode 增加nodePatterns屬性,所有繼承該類的節點可以通過複寫該屬性值來辨別自己的屬性。
/**
* @return a sequence of tree pattern enums in a TreeNode T. It does not include propagated
* patterns in the subtree of T.
*/
protected val nodePatterns: Seq[TreePattern] = Seq()
TreePattern 是一個枚舉類型, 對于每個節點/表達式都可以為其設定一個TreePattern友善辨別,具體可見 TreePatterns.scala 。
例如對于Join節點的nodePatterns:
override val nodePatterns : Seq[TreePattern] = {
var patterns = Seq(JOIN)
joinType match {
case _: InnerLike => patterns = patterns :+ INNER_LIKE_JOIN
case LeftOuter | FullOuter | RightOuter => patterns = patterns :+ OUTER_JOIN
case LeftSemiOrAnti(_) => patterns = patterns :+ LEFT_SEMI_OR_ANTI_JOIN
case NaturalJoin(_) | UsingJoin(_, _) => patterns = patterns :+ NATURAL_LIKE_JOIN
case _ =>
}
patterns
}
- Rule ID bits
将規則ID的緩存BitSet嵌入到每個樹/表達式節點T中,這樣我們就可以跟蹤規則R對于根植于T的子樹是有效還是無效。這樣,如果R在T上被調用,并且已知R無效,如果R再次應用于T(例如,R位于定點規則批進行中),我們可以跳過它。這個想法最初被用于Cascades optimizer,以加快探索性規劃。
Rule:
abstract class Rule[TreeType <: TreeNode[_]] extends SQLConfHelper with Logging {
// The integer id of a rule, for pruning unnecessary tree traversals.
protected lazy val ruleId = RuleIdCollection.getRuleId(this.ruleName)
TreeNode:
/**
* A BitSet of rule ids to record ineffective rules for this TreeNode and its subtree.
* If a rule R (which does not read a varying, external state for each invocation) is
* ineffective in one apply call for this TreeNode and its subtree, R will still be
* ineffective for subsequent apply calls on this tree because query plan structures are
* immutable.
*/
private val ineffectiveRules: BitSet = new BitSet(RuleIdCollection.NumRules)
2.2 Changes to The Transform Function Family
改造後的transform 方法相比之前的多了兩個判斷,如下所示
def transformDownWithPruning(
cond: TreePatternBits => Boolean, // 判斷是否存在可優化的節點,由規則設計者所提供
ruleId: RuleId = UnknownRuleId // 不會生效的規則ID,自動更新
)(rule: PartialFunction[BaseType, BaseType]): BaseType = {
// 如果上述兩個條件存在一個不滿足,直接跳過本次規則
if (!cond.apply(this) || isRuleIneffective(ruleId)) {
return this
}
// 執行rule的邏輯
val afterRule = CurrentOrigin.withOrigin(origin) {
rule.applyOrElse(this, identity[BaseType])
}
// Check if unchanged and then possibly return old copy to avoid gc churn.
if (this fastEquals afterRule) {
val rewritten_plan = mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
// 如果沒生效,把規則ID加入到不生效的BitSet裡
if (this eq rewritten_plan) {
markRuleAsIneffective(ruleId)
this
} else {
rewritten_plan
}
} else {
// If the transform function replaces this node with a new one, carry over the tags.
afterRule.copyTagsFrom(this)
afterRule.mapChildren(_.transformDownWithPruning(cond, ruleId)(rule))
}
}
2.3 Changes to An Individual Rule
規則的例子:
object OptimizeIn extends Rule[LogicalPlan] with SQLConfHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan transform ({
case q: LogicalPlan => q transformExpressionsDown ({
case In(v, list) if list.isEmpty => ...
case expr @ In(v, list) if expr.inSetConvertible => ...
}, _.containsPattern(IN), ruleId) // 必須包含IN
}, _.containsPattern(IN), ruleId) // 必須包含IN
}
3. 測試結果
在Delta中使用TPC-DS SF10對TPC-DS查詢編譯時間進行了基準測試。結果如下:
- 圖1顯示了查詢編譯速度;
- 表1顯示了幾個關鍵樹周遊函數的調用計數和CPU減少的細分。
我簡單運作了開版本的TPCDSQuerySuite,該測試會把TPCDS的語句解析優化,并且檢查下生成的代碼(CodeGen),平均耗時的時間為三次運作得到的最優值, 得到的結果如下:
- 合入PR前[2], 包含156個Tpcds查詢,平均總耗時~56s
- 最新Spark開源代碼,包含150個Tpcds查詢,平均總耗時~19s
之是以最新的Tpcds查詢比合入PR前的條數少6條,是因為後續有個減少重複TPCDS的PR。總時長優化前是優化後的兩倍多。
參考引用
[1]. [SPARK-34916] Tree Traversal Pruning for Catalyst Transform/Resolve Function Families. SISP
[2]. [SPARK-35544][SQL] Add tree pattern pruning to Analyzer rules.
[3]. Building a SIMD Supported Vectorized Native Engine for Spark SQL. link
點選關注,第一時間了解華為雲新鮮技術~