背景
EMR Spark提供的Relational Cache功能,可以通過對資料模型進行預計算和高效地存儲,加速Spark SQL,為客戶實作利用Spark SQL對海量資料進行即時查詢的目的。Relational Cache的工作原理類似物化視圖,在使用者送出SQL語句時對語句進行分析,并選出可用的預計算結果來加速查詢。為了實作高效地預計算結果複用,我們建構的預計算緩存一般都較為通用,是以對于使用者query,還需進行進一步的計算方能獲得最終結果。是以,如何快速地找出比對的緩存,并建構出準确的新執行計劃,就顯得尤為重要。
在Hive 3.x中支援的Materialized View,利用了Apache Calcite對執行計劃進行重寫。考慮到Spark SQL使用Catalyst進行執行計劃優化,引入Calcite太重,是以EMR Spark中的Relational Cache實作了自己的Catalyst規則,用于重寫執行計劃。本文将介紹執行計劃重寫的相關内容。
執行計劃重寫
準備工作
Spark會把使用者查詢語句進行解析,依次轉化為Unresolved Logical Plan(未綁定的邏輯計劃)、Resolved Logical Plan(綁定的邏輯計劃)、Optimized Logical Plan(優化的邏輯計劃)、Physical Plan(實體計劃)。其中,未優化的邏輯計劃根據使用者查詢語句不同,會有較大差別,而Relational Cache作為優化的一部分,放在邏輯計劃優化過程中也較為合适,是以我們拿到的使用者查詢計劃會是優化中的邏輯計劃。要與優化中的邏輯計劃比對,我們選擇把這個重寫過程放在Spark優化器比較靠後的步驟中,同時,預先将Relational Cache的邏輯計劃進行解析,獲得優化後的Cache計劃,減小比對時的複雜程度。這樣,我們隻需比對做完了謂詞下推、謂詞合并等等優化之後的兩個邏輯計劃。
基本過程
在比對時,我們希望能盡可能多得比對計算和IO操作,是以,我們對目标計劃進行前序周遊,依次進行比對,嘗試找到最多的比對節點。而在判斷兩個節點是否比對時,我們采用後序周遊的方式,希望盡快發現不比對的情況,減少計劃比對的執行時間。然後我們會根據比對結果,對計劃進行重寫,包括對于Cache資料進行進一步的Filter、Project、Sort甚至Aggregate等操作,使其與比對節點完全等價,然後更新邏輯計劃節點的引用綁定,無縫替換到邏輯計劃中,這樣就能輕松獲得最終的重寫後的計劃。
Join比對
Spark中的Join都是二進制操作,而實際的Join順序可能根據一些政策會有很大差別,是以對于Join節點,必須進行特殊處理。我們會首先将邏輯計劃進行處理,根據緩存計劃的Join順序進行Join重排。這一步在樹狀比對之前就進行了,避免不斷重複Join重排帶來的時間浪費。重排後的Join可以更大機率地被我們比對到。
為了實作Cache的通用性,根據星型資料模型的特點,我們引入了Record Preserve的概念。這和傳統資料庫中的Primary Key/Foreign Key的關系較為類似,當有主鍵的表與非空外鍵指向的表在外鍵上進行Join時,記錄的條數不會變化,不會膨脹某條記錄,也不會丢失某條記錄。PK/FK的語意在大資料處理架構中經常缺失,我們引入了新的DDL讓使用者自定義Record Preserve Join的關系。當使用者定義A Inner Join B是對于A表Record Preserve時,我們也會把A Inner Join B和A的關系比對起來。有了PK/FK的幫助,我們能比對上的情況大大增加了,一個Relational Cache可以被更多看似差別巨大的查詢共享,這可以很好的為使用者節約額外的存儲開銷和預計算開銷。
Aggregate比對
一般的Aggregate比對較為簡單,而Spark支援的Grouping Set操作,會建構出Expand邏輯計劃節點,相當于把一條記錄轉為多條,使用Grouping ID進行标記。由于Expand的子節點是所有Grouping的情況共用的,這裡我們隻對子節點進行一次比對,再分别進行上面的Grouping屬性和Aggregate屬性的比對。主要是驗證目标聚合所需的屬性或者聚合函數都能從某個Grouping ID對應的聚合結果中計算出來,比如粗粒度的Sum可以對細粒度的Sum進行二次Sum求和,而粗粒度的Count對細粒度的Count也應通過二次Sum求和,粗粒度的Average無法僅從細粒度的Average中還原出來等等。
計劃重寫
找出比對的邏輯計劃之後,就是重寫邏輯計劃的過程。對于無需二次聚合的邏輯計劃,直接根據緩存資料的schema,從緩存資料的Relation中選擇所需列,根據條件過濾後,進行後續操作。如果還需二次聚合,選擇所需列時需保留外部要用的所有列,以及聚合時需要的列,還有聚合函數需要的資料。二次聚合的聚合函數需要根據實際情況進行重寫,確定能使用Relational Cache中已經初步聚合的結果。這裡面需要根據聚合的語意判斷是否能夠二次聚合。如果時Grouping Set的聚合,二次聚合之前還需選擇正确的Grouping ID進行過濾。經過二次聚合後,步驟大體和普通的重寫一緻,隻需替換到目标計劃中即可。
結果
我們以一個例子來具體說明邏輯計劃的重寫結果。Star Schema Benchmark(
論文連結)是星型模型資料分析的一個标準Benchmark,其結構定義如圖所示:

我們建構Relational Cache的SQL語句如下:
SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUM
FROM supplier, p_lineorder, dates, customer, part
WHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkey
GROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))
我們從中選出一條查詢作為示例。具體查詢語句:
select c_city, s_city, d_year, sum(lo_revenue) as revenue
from customer, lineorder, supplier, dates
where lo_custkey = c_custkey
and lo_suppkey = s_suppkey
and lo_orderdate = d_datekey
and c_nation = 'UNITED KINGDOM'
and (c_city='UNITED KI1' or c_city='UNITED KI5')
and (s_city='UNITED KI1' or s_city='UNITED KI5')
and s_nation = 'UNITED KINGDOM'
and d_yearmonth = 'Dec1997'
group by c_city, s_city, d_year
order by d_year asc, revenue desc
原始邏輯計劃如下所示:
Sort [d_year#39 ASC NULLS FIRST, revenue#0L DESC NULLS LAST], true
+- Aggregate [c_city#6, s_city#31, d_year#39], [c_city#6, s_city#31, d_year#39, sum(lo_revenue#23L) AS revenue#0L]
+- Project [c_city#6, lo_revenue#23L, s_city#31, d_year#39]
+- Join Inner, (lo_orderdate#16 = d_datekey#35)
:- Project [c_city#6, lo_orderdate#16, lo_revenue#23L, s_city#31]
: +- Join Inner, (lo_suppkey#15 = s_suppkey#28)
: :- Project [c_city#6, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
: : +- Join Inner, (lo_custkey#13 = c_custkey#3)
: : :- Project [c_custkey#3, c_city#6]
: : : +- Filter (((isnotnull(c_nation#7) && (c_nation#7 = UNITED KINGDOM)) && ((c_city#6 = UNITED KI1) || (c_city#6 = UNITED KI5))) && isnotnull(c_custkey#3))
: : : +- HiveTableRelation `ssb`.`customer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c_custkey#3, c_name#4, c_address#5, c_city#6, c_nation#7, c_region#8, c_phone#9, c_mktsegment#10]
: : +- Project [lo_custkey#13, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
: : +- Filter ((isnotnull(lo_custkey#13) && isnotnull(lo_suppkey#15)) && isnotnull(lo_orderdate#16))
: : +- HiveTableRelation `ssb`.`lineorder`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [lo_orderkey#11L, lo_linenumber#12L, lo_custkey#13, lo_partkey#14, lo_suppkey#15, lo_orderdate#16, lo_orderpriotity#17, lo_shippriotity#18, lo_quantity#19L, lo_extendedprice#20L, lo_ordtotalprice#21L, lo_discount#22L, lo_revenue#23L, lo_supplycost#24L, lo_tax#25L, lo_commitdate#26, lo_shipmode#27]
: +- Project [s_suppkey#28, s_city#31]
: +- Filter (((isnotnull(s_nation#32) && ((s_city#31 = UNITED KI1) || (s_city#31 = UNITED KI5))) && (s_nation#32 = UNITED KINGDOM)) && isnotnull(s_suppkey#28))
: +- HiveTableRelation `ssb`.`supplier`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [s_suppkey#28, s_name#29, s_address#30, s_city#31, s_nation#32, s_region#33, s_phone#34]
+- Project [d_datekey#35, d_year#39]
+- Filter ((isnotnull(d_yearmonth#41) && (d_yearmonth#41 = Dec1997)) && isnotnull(d_datekey#35))
+- HiveTableRelation `ssb`.`dates`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [d_datekey#35, d_date#36, d_dayofweek#37, d_month#38, d_year#39, d_yearmonthnum#40, d_yearmonth#41, d_daynuminweek#42, d_daynuminmonth#43, d_daynuminyear#44, d_monthnuminyear#45, d_weeknuminyear#46, d_sellingseason#47, d_lastdayinweekfl#48, d_lastdayinmonthfl#49, d_holidayfl#50, d_weekdayfl#51]
重寫後的一個邏輯計劃如下:
Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true
+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L]
+- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322))
+- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet
由此可見,執行計劃大大簡化,我們可以做到亞秒級響應使用者的命中查詢。
進一步優化
在實際測試過程中,我們發現當多個Relational Cache存在時,比對時間線性增長明顯。由于我們在metastore中存儲的是Cache的SQL語句,取SQL語句和再次解析的時間都不容小觑,這就使得比對過程明顯增長,背離了我們追求亞秒級響應的初衷。是以我們在Spark中建構了邏輯計劃緩存,将解析過的Relational Cache的計劃緩存在記憶體中,每個Relational Cache隻緩存一份,計劃本身占用空間有限,是以我們可以緩存住幾乎所有的Relational Cache的優化後的邏輯計劃,進而在第一次查詢之後,所有查詢都不再收到取SQL語句和再次解析的延遲困擾。經過這樣的優化,比對時間大幅減少到100ms的量級。
總結與思考
Relational Cache實作了一種基于Cache的優化方案,讓Spark SQL能夠用于即時查詢的場景下,滿足使用者對海量資料秒級查詢的需求。通過對使用者查詢的動态改寫,可以大大提高緩存的使用率,擴充緩存的命中場景,有效提高查詢性能。現有方案也有很多可優化的地方,比如重複的回溯周遊時間複雜度較高,不如在邏輯計劃節點内部更新維護可比對的資訊。考慮到對Spark的侵入性,我們暫時選擇了現有方案,後續根據實際的使用情況,還會進一步優化我們的邏輯計劃重寫過程。而重寫的邏輯計劃還涉及到基于不同的Relational Cache Plan會有不同的重寫方式,在這些重寫結果中如何根據執行代價選擇最優的重寫方案,将會在後續文章中進行揭秘,敬請期待!