作者:周克勇,花名一錘,阿裡巴巴計算平台事業部EMR團隊技術專家,大資料領域技術愛好者,對Spark有濃厚興趣和一定的了解,目前主要專注于EMR産品中開源計算引擎的優化工作。
背景和動機
SparkSQL多年來的性能優化集中在Optimizer和Runtime兩個領域。前者的目的是為了獲得最優的執行計劃,後者的目的是針對既定的計劃盡可能執行的更快。
相比于Runtime,Optimizer是更加通用的、跟實作無關的優化。無論是Java世界(Spark, Hive)還是C++世界(Impala, MaxCompute),無論是Batch-Based(Spark, Hive)還是MPP-Based(Impala, Presto),甚至無論是大資料領域還是傳統資料庫領域亦或HTAP領域(HyPer, ADB),在Optimizer層面考慮的都是非常類似的問題: Stats收集,Cost評估以及計劃選擇;采用的優化技術也比較類似,如JoinReorder, CTE, GroupKey Elimination等。盡管因為上下文不同(如是否有索引)在Cost Model的構造上會有不同,或者特定場景下采用不同的空間搜尋政策(如遺傳算法 vs. 動态規劃),但方法大體是相同的。
長期以來,Runtime的優化工作基本聚焦在解決當時的硬體瓶頸。如MapReduce剛出來時網絡帶寬是瓶頸,是以Google做了很多Locality方面的優化;Spark剛出來時解決的問題是磁盤IO,記憶體緩存的設計使得性能相比MapReduce有了數量級的提升;後來CPU成為了新的瓶頸[1],是以提升CPU性能成了近年來Runtime領域重要的優化方向。
提升CPU性能的兩個主流技術是以MonetDB/X100[2](如今演化為VectorWise[3])為代表的向量化(Vectorized Processing)技術和以HyPer[5][6]為代表的代碼生成(CodeGen)技術(其中Spark跟進的是CodeGen[9])。簡單來說,向量化技術沿用了火山模型,但與其讓SQL算子每次計算一條Record,向量化技術會積攢一批資料後再執行。逐批計算相比于逐條計算有了更大的優化空間,例如虛函數的開銷分攤,SIMD優化,更加Cache友好等。這個技術的劣勢在于算子之間傳遞的資料從條變成了批,是以增大了中間資料的物化開銷。CodeGen技術從另外一個角度解決虛函數開銷和中間資料物化問題:算子融合。簡單來說,CodeGen架構通過打破算子之間的界限把火山模型“壓平”了,把原來疊代器鍊壓縮成了大的for循環,同時生成語義相同的代碼(Java/C++/LLVM),緊接着用對應的工具鍊編譯生成的代碼,最後用編譯後的class(Java)或so(C++,LLVM)去執行,進而把解釋執行轉變成了編譯執行。此外,盡管還是逐條執行,由于抹去了函數調用,一條Record從(Stage内的)初始算子一直執行到結束算子都基本處于寄存器中,不會物化到記憶體。CodeGen技術的劣勢在于難以應用SIMD等優化。
兩個門派相愛相殺,在經曆了互相發論文驗證自家優于對方後[4][8]兩家走向了合作,合作産出了一系列項目和論文,而目前學界的主流看法也是兩者融合是最優解,一些采用融合做法的項目也應運而生,如進化版HyPer[6], Pelonton[7]等。
盡管學界已走到了融合,業界主流卻沒有很強的動力往融合的路子走,探究其主要原因一是目前融合的做法相比單獨的優化并沒有質的提升;二是融合技術目前沒有一個廣為接受的最優做法,還在探索階段;三是業界在單一的技術上還沒有發揮出最大潛力。以SparkSQL為例,從2015年SparkSQL首次露面自帶的Expression級别的Codegen,到後來參考HyPer實作的WholeStage Codegen,再經過多年的打磨,SparkSQL的Codegen技術已趨成熟,性能也獲得了兩次數量級的躍升。然而,也許是出于可維護性或開發者接受度的考慮,SparkSQL的Codegen一直限制在生成Java代碼,并沒有嘗試過NativeCode(C/C++, LLVM)。盡管Java的性能已經很優,但相比于Native Code還是有一定的Overhead,并缺乏SIMD(Java在做這方面feature),Prefetch等語義,更重要的是,Native Code直接操作裸金屬,易于極緻壓榨硬體性能,對一些加速器(如GPU)或新硬體(如AEP)的支援也更友善。
基于以上動機,EMR團隊探索并開發了SparkSQL Native Codegen架構,為SparkSQL換了引擎,新引擎帶來20%左右的性能提升,為EMR再次擷取世界第一立下汗馬功勞,本文講詳細介紹Native Codegen架構。
核心問題
做Native Codegen,核心問題有三個:
1.生成什麼?
2.怎麼生成?
3.如何內建到Spark?
生成什麼
針對生成什麼代碼,結合調研的結果以及開發同學的技術棧,有三個候選項:C/C++, LLVM, Weld IR。C/C++的優勢是實作相對簡單,隻需對照Spark生成的Java代碼邏輯改寫即可,劣勢是編譯時間過長,下圖是HyPer的測評資料,C++的編譯時間比LLVM高了一個數量級。

編譯時間過長對小query很不友好,極端case編譯時間比運作時間還要長。基于這個考慮,我們排除了C/C++選項。上圖看上去LLVM的編譯時間非常友好,而且很多Native CodeGen的引擎,如HyPer, Impala, 以及阿裡雲自研大資料引擎MaxCompute,ADB等,均采用了LLVM作為目标代碼。LLVM對我們來說(對你們則不一定:D)最大的劣勢就是過于底層,文法接近于彙編,試想用彙編重寫SparkSQL算子的工作量會有多酸爽。大多數引擎也不會用LLVM寫全量代碼,如HyPer僅把算子核心邏輯用LLVM生成,其他通用功能(如spill,複雜資料結構管理等)用C++編寫并提前編譯好。即使LLVM+C++節省了不少工作量,對我們來說依然不可接受,是以我們把目光轉向了第三個選項: Weld IR(Intermediate Representation)。
首先簡短介紹以下Weld。Weld的作者Shoumik Palkar是 Matei Zaharia的學生,後者大家一定很熟悉,Spark的作者。Weld最初想解決的問題是不同lib之間互相調用時資料傳輸的開銷,例如要在pandas裡調用numpy的接口,首先pandas把資料寫入記憶體,然後numpy讀取記憶體進行計算,對于極度優化的lib來說,記憶體的寫入和讀取的時間可能會遠超計算本身。針對這個問題,Weld開發了Common Runtime并配套提供了一組IR,再加上惰性求值的特性,隻需(簡單)修改lib使其符合Weld的規範,便可以做到不同lib共用Weld Runtime,Weld Runtime利用惰性求值實作跨lib的Pipeline,進而省去資料物化的開銷。Weld Runtime還做了若幹優化,如循環融合,循環展開,向量化,自适應執行等。此外,Weld支援調用C代碼,可以友善調用三方庫。
我們感興趣的是Weld提供的IR和對應的Runtime。Weld IR面向資料分析進行設計,是以語義上跟SQL非常接近,能較好的表達算子。資料結構層面,Weld IR最核心的資料結構是vec和struct,能較好地表達SparkSQL的UnsafeRow Batch;基于struct和vec可以構造dict,能較好的表達SQL裡重度使用的Hash結構。操作層面,Weld IR提供了類函數式語言的語義,如map, filter, iterator等,配合builder語義,能友善的表達Project, Filter, Agg, BroadCastJoin等算子語義。例如,以下IR表達了Filter + Project語義,具體含義是若第二列大于10,則傳回第一列:
|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b))
以下IR表達了groupBy的語義,具體含義是按照第一列做groupBy來計算第二列的sum:
|v: vec[{i32,i32}]| for(v,dictmerger[i32,i32,+],|b,i,n| merge(b,{n.$0,n.$1}))
具體的文法定義請參考Weld文檔(
https://github.com/weld-project/weld/blob/master/docs/language.md)。
Weld 開發者API提供了兩個核型接口:
- weld_module_compile, 把Weld IR編譯成可執行子產品(module)。
- weld_module_run, 執行編譯好的子產品。
基本流程如下圖所示,最終也是生成LLVM代碼。
由此,Weld IR的優勢就顯然易見了,既兼顧了性能(最終生成LLVM代碼),又兼顧了易用性(CodeGen Weld IR相比LLVM, C++友善很多)。基于這些考慮,我們最終選擇Weld IR作為目标代碼。
怎麼生成
SparkSQL原有的CodeGen架構之前簡單介紹過了,詳見
https://developer.aliyun.com/article/727277。我們參考了Spark原有的做法,支援了表達式級别,算子級别,以及WholeStage級别的Codegen。複用Producer-Consumer架構,每個算子負責生成自己的代碼,最後由WholeStageCodeGenExec負責組裝。
這個過程有兩個關鍵問題:
1.算子之間傳輸的媒體是什麼?
2.如何處理Weld不支援的算子?
傳輸媒體
不同于Java,Weld IR不提供循環結構,取而代之的是vec結構和其上的泛疊代器操作,是以Weld IR難以借鑒Java Codegen在Stage外層套個大循環,然後每個算子處理一條Record的模式,取而代之的做法是每個算子處理一批資料,IR層面做假物化,然後依賴Weld的Loop-Fusion優化去消除物化。例如前面提到的Filter後接Project,Filter算子生成的IR如下,過濾掉第二列<=10的資料:
|v:vec[{i32,i32}]| let res_fil = for(v,appender,|b,i,n| if(n.$1>10, merge(b,n), b)
Project算子生成的IR如下,傳回第一列資料:
let res_proj = for(res_fil,appender,|b,i,n| merge(b,n.$0))
表面上看上去Filter算子會把中間結果做物化,實際上Weld的Loop-Fusion優化器會消除此次物化,優化後代碼如下:
|v: vec[{i32,i32}]| for(v,appender,|b,i,n| if(n.$1 > 10, merge(b,n.$0), b))
盡管依賴Weld的Loop-Fusion優化可以極大簡化CodeGen的邏輯,但開發中我們發現Loop-Fusion過程非常耗時,對于複雜SQL(嵌套3層以上)甚至無法在有限時間給出結果。當時面臨兩個選擇:修改Weld的實作,或者修改CodeGen直接生成Loop-Fusion之後的代碼,我們選擇了後者。重構後生成的代碼如下,其中1,2,11行由Scan算子生成,3,4,5,6,8,9,10行由Filter算子生成,7行由Project算子生成。
|v: vec[{i32,i32}]|
for(v,appender,|b,i,n|
if(
n.$1 > 10,
merge(
b,
n.$0
),
b
)
)
這個優化使得編譯時間重回亞秒級别。
Fallback機制
受限于Weld目前的表達能力,一些算子無法用Weld實作,例如SortMergeJoin,Rollup等。即使是原版的Java CodeGen,一些算子如Outter Join也不支援CodeGen,是以如何做好Fallback是保證正确性的前提。我們采用的政策很直覺:若目前算子不支援Native CodeGen,則由Java CodeGen接管。這裡涉及的關鍵問題是Fallback的粒度:是算子級别還是Stage級别?
抛去實作難度不談,雖然直覺上算子粒度的Fallback更加合理,但實際上卻會導緻更嚴重的問題:Stage内部Pipeline的斷裂。如上文所述,CodeGen的一個優勢是把整個Stage的邏輯Pipeline化,打破算子之間的界限,單條Record從初始算子執行到結束算子,整個過程不存在物化。而算子粒度的Fallback則會導緻Stage内部一部分走Native Runtime,另一部分走Java Runtime,則兩者連接配接處無可避免存在中間資料物化,這個開銷通常會大于Native Runtime帶來的收益。
基于以上考慮,我們選擇了Stage級别的Fallback,在CodeGen階段一旦遇到不支援的算子,則整個Stage都Fallback到Java CodeGen。統計顯示,整個TPCDS Benchmark,命中Native CodeGen的Stage達到80%。
Spark內建
完成了代碼生成和Fallback機制,最後的問題就是如何跟Spark內建了。Spark的WholeStageCodegenExec的執行可以了解為一個黑盒,無論上遊是Table Scan,Shuffle Read,還是BroadCast,給到黑盒的輸入類型隻有兩種: RowBatch(上遊是Table Scan)或Row Iterator(上遊非Table Scan),而黑盒的輸出固定為Row Iterator,如下圖所示:
上文介紹我們選擇了Stage級别的Fallback,也就決定了黑盒要麼是Java Runtime,要麼是Native Runtime,不存在混合的情況,是以我們隻需要關心如何把Row Batch/Row Iterator轉化為Weld認識的記憶體布局,以及如何把Weld的輸出轉化成Row Iterator即可。為了進一步簡化問題,我們注意到,盡管Shuffle Reader/BroadCast的輸入是Row Iterator,但本質上遠端序列化的資料結構是Row Batch,隻不過Spark反序列化後轉換成Row Iterator後再喂給CodeGen Module,RowBatch包裝成Row Iterator非常簡易。是以Native Runtime的輸入輸出可以統一成RowBatch。
解決辦法呼之欲出了:把RowBatch轉換成Weld vec!但我們更進了一步,何不直接把Row Batch喂給Weld進而省去記憶體轉換呢?本質上Row Batch也是滿足某種規範的位元組流而已,Spark也提供了OffHeap模式把記憶體直接存堆外(僅針對Scan Stage。Shuffle資料和Broadcast資料需要讀到堆外),Weld可以直接通路。Spark UnsafeRow的記憶體布局大緻如下:
針對确定的schema,null bitmap和fixed-length data的結構是固定的,可以映射成struct,而針對var-length data我們的做法是把這些資料copy到連續的記憶體位址中。如此一來,針對無變長資料的RowBatch,我們直接把記憶體塊喂給Weld;針對有變長部分的資料,我們也隻需做大粒度的記憶體拷貝(把定長部分和變長部分分别拷出來),而無需做列級别的細粒度拷貝轉換。
繼續舉前文的Filter+Project的例子,一條Record包含兩個int列,其UnsafeRow的記憶體布局如下(為了對齊,Spark裡定長部分最少使用8位元組)。
顯而易見,這個結構可以很友善映射成Weld struct:
{i64,i64,i64}
而整個Row Batch便映射成Weld vec:
vec[{i64,i64,i64}]
如此便解決了Input的問題。而Weld Output轉RowBatch本質是以上過程的逆向操作,不再贅述。
解決了Java和Native之間的資料轉換問題,剩下的就是如何執行了。首先我們根據目前Stage的Mode來決定走Java Runtime還是Native Runtime。在Native分支,首先會執行StageInit做Stage級别的初始化工作,包括初始化Weld,加載編譯好的Weld Module,拉取Broadcast資料(若有)等;接着是一個循環,每個循環讀取一個RowBatch(來自Scan或Shuffle Reader)喂給Native Runtime執行,Output轉換并喂給Shuffle Writer。如下圖所示:
總結
本文介紹了EMR團隊在Spark Native Codegen方向的探索實踐,限于篇幅若幹技術點和優化沒有展開,後續可另開文詳解,例如:
1.極緻Native算子優化
2.資料轉換詳解
3.Weld Dict優化
大家感興趣的任何内容歡迎溝通: )
[1] Making Sense of Performance in Data Analytics Frameworks. Kay Ousterhout
[2] MonetDB/X100: Hyper-Pipelining Query Execution. Peter Boncz
[3] Vectorwise: a Vectorized Analytical DBMS. Marcin Zukowski
[4] Efficiently Compiling Efficient Query Plans for Modern Hardware. Thomas Neumann
[5] HyPer: A Hybrid OLTP&OLAP Main Memory Database System Based on Virtual Memory Snapshots. Alfons Kemper
[6] Data Blocks: Hybrid OLTP and OLAP on Compressed Storage using both Vectorization and Compilation. Harald Lang
[7] Relaxed Operator Fusion for In-Memory Databases: Making Compilation, Vectorization, and Prefetching Work Together At Last. Prashanth Menon
[8] Vectorization vs. Compilation in Query Execution. Juliusz Sompolski
[9]
https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html相關閱讀推薦:
EMR Spark-SQL性能極緻優化揭秘 RuntimeFilter Plus EMR Spark-SQL性能極緻優化揭秘 概覽篇阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。