直播回放: https://developer.aliyun.com/live/2894
以下是直播内容精華整理。
Spark3.0解決了超過3400個JIRAs,曆時一年多,是整個社群集體智慧的成果。Spark SQL和Spark Cores是其中的核心子產品,其餘子產品如PySpark等子產品均是建立在兩者之上。Spark3.0新增了太多的功能,無法一一列舉,下圖是其中24個相對來說比較重要的新功能,下文将會圍繞這些進行簡單介紹。

一、Performance
與性能相關的新功能主要有:
- Adaptive Query Execution
- Dynamic Partition Pruning
- Query Complication Speedup
- Join Hints
(一)Adaptive Query Execution
Adaptive Query Execution(AQE)在之前的版本裡已經有所實作,但是之前的架構存在一些缺陷,導緻使用不是很多,在Spark3.0中Databricks(Spark初創團隊建立的大資料與AI智能公司)和Intel的工程師合作,解決了相關的問題。
在Spark1.0中所有的Catalyst Optimizer都是基于規則 (rule) 優化的。為了産生比較好的查詢規則,優化器需要了解資料的特性,于是在Spark2.0中引入了基于代價的優化器 (cost-based optimizer),也就是所謂的CBO。然而,CBO也無法解決很多問題,比如:
- 資料統計資訊普遍缺失,統計資訊的收集代價較高;
- 儲存計算分離的架構使得收集到的統計資訊可能不再準确;
- Spark部署在某一單一的硬體架構上,cost很難被估計;
- Spark的UDF(User-defined Function)簡單易用,種類繁多,但是對于CBO來說是個黑盒子,無法估計其cost。
總而言之,由于種種限制,Spark的優化器無法産生最好的Plan。也正是因為上訴原因,運作期的自适應調整就變得相當重要,對于Spark更是如此,于是有了AQE,其基本方法也非常簡單易懂。如下圖所示,在執行完部分的查詢規劃後,Spark可以收集到結果的統計資訊,然後利用這些資訊再對查詢規劃重新進行優化。這個優化的過程不是一次性的,而是自适應的,也就是說随着查詢規劃的執行會不斷的進行優化, 而且盡可能地複用了現有優化器的已有優化規則。讓整個查詢優化變得更加靈活和自适應。
Spark3.0中AQE包括三個主要的運作期自适應功能:
- 可以基于運作期的統計資訊,将Sort Merge Join 轉換為Broadcast Hash Join;
- 可以基于資料運作中間結果的統計資訊,減少reducer數量,避免資料在shuffle期間的過量分區導緻性能損失;
- 可以處理資料分布不均導緻的skew join。
更多的資訊大家可以通過搜尋引擎查詢了解。
如果你是一個Spark的資深使用者,可能你讀了很多的調優寶典,其中第一條就是讓你的Join變得更快的方法就是盡可能地使用Broadcast Hash Join。比如你可以增加spark.sql.autoBroadcastJoinThreshold 門檻值,或者使用 broadcast HINT。但是這基本上屬于藝高人膽大。首先,這種方法很難調,一不小心就會Out of Memory,甚至性能變得更差,即使現在産生了一定效果,但是随着負載的變化可能調優會完全失敗。
也許你會想:Spark為什麼不解決這個問題呢?這裡有很多挑戰,比如:
- 統計資訊的缺失,統計資訊的不準确,那麼就是預設依據檔案大小來預估表的大小,但是檔案往往是壓縮的,尤其是列存儲格式,比如parquet 和 ORC,而Spark是基于行處理,如果資料連續重複,file size可能和真實的行存儲的真實大小,差别非常之大。這也是為何提高autoBroadcastJoinThreshold,即使不是太大也可能會導緻out of memory;
- Filter複雜、UDFs的使用都會使Spark無法準确估計Join輸入資料量的大小。當你的query plan異常大和複雜的時候,這點尤其明顯。
其中,Spark3.0中基于運作期的統計資訊,将Sort Merge Join 轉換為Broadcast Hash Join的過程如下圖所示。
也許你還會看到調優寶典告訴你調整shuffle産生的partitions的數量。而目前預設數量是200,但是這個200為什麼就不得而知了。然而,這個值設定為多少都不是最優的。其實在不同shuffle,資料的輸入大小和分布絕大多數都是不一樣。那麼簡單地用一個配置,讓所有的shuffle來遵循,顯然是不好的。要設得太小,每個partition的大小就會太大,那麼GC的壓力就會很大,aggregation和sort會更有可能的去spill資料到磁盤。但是,要是設太大,partition的大小就會太小,partition的數量會大。這個會導緻不必要的IO,也讓task排程器的壓力劇增。那麼排程器會導緻所有task都變慢。這一系列問題在query plan複雜的時候變得尤為突出,還可能會影響到其他性能,最後耗時耗力卻調優失敗。
對于這個問題的解決,AQE就有優勢了。如下圖所示,AQE可以在運作期動态的調整partition來達到性能最優。
此外,資料分布不均是Spark調優的一個疑難雜症,它的表現有多種,比如若幹task停滞不前,像是出現了bugs,又比如大量的disk spilling會導緻很多節點都無事可做。此外,你也許會看到out of memory這種異常。其解決方法也很多,比如找到skew values然後重寫query,或者在join的情況下增加skew keys來消除資料分布不均,但是無論哪種方法,都非常浪費時間,且後期難以維護。AQE解決問題的方式如下,其通過shuffle落地後的中間資料結果判斷哪些partition是skew的,如果partition過大,就将其分成若幹較小的partition,通過分而治之,總體性能大幅提升。
AQE的釋出可以說是一個時代的開始,未來将會更進一步發展,引入更多自适應規則,讓Spark可以随着資料分布和特性的變化自動改變Query plan,讓更多的query編譯靜态優化變成運作時的動态優化。
(二)Dynamic Partition Pruning
Dynamic Partition Pruning也是一個運作時的動态優化方法,簡單來說就是我們可以通過Query的某些分支的中間結果來避免不必要的partition讀取,這種方法是無法通過編譯期推測出來的,隻能在運作時根據結果來判斷,這種方法對資料倉庫的star-schema效果非常明顯,在TPC-DS獲得了非常明顯的加速,可以加速2-18倍。
(三)Join Hints
Join Hints是一個非常普遍的資料庫的優化政策,在3.0之前已經有了Broadcast hash join,3.0之後的版本加了Sort-merge join、Shuffle hash join和 Shuffle nested loop join,但是要注意謹慎使用,因為資料的特性不同,很難保證一直有效,即使有效,也不代表一直有效,随着時間的變化,你的資料變了,可能會讓你的query 變慢,變得不穩定。總體來說上面的四種Join的适用條件和特點如下所示,總而言之,使用Join Hints要謹慎。
二、Richer APIs
Spark3.0簡化了開發,不但增加了更多的新功能,也改善了衆多現有的功能,讓更多的用法成為可能,主要有:
- Accelerator-aware Scheduler
- Built-in Functions
- pandas UDF enhancements
- DELETE/UPDATE/MERGE in Catalyst
(一)pandas UDF enhancements
pandas UDF應該說是PySPark使用者中最喜愛的特性之一,對于其功能和性能的提升應該都是喜聞樂見的,其發展曆程如下圖所示。
最新的pandas UDF和之前的不同之處在于引入了Python Type Hints,現在使用者可以使用pandas中的資料類型比如pandas.Series等來表示pandas UDF的種類,不再需要記住原來的UDF類型,隻需要指定正确的輸入和輸出類型即可。此外,pandas UDF可以分為pandas UDF和pandas API。
(二)Accelerator-aware Scheduler
Accelerator-aware Scheduler是加速器的排程支援,狹義上也就是指GPU排程支援。加速器經常用來對特定負載做加速,目前,使用者還是需要指定什麼應用需要加速器資源,但是在将來我們會支援job或者stage級别的排程。Spark3.0中我們已經支援大多排程器,此外,我們還可以通過Web UI來監控GPU的使用,歡迎大家使用,更多詳細資料大家可以到社群學習。
(三)Built-in Functions
為了讓Spark3.0更友善實用,Spark社群按照其他的主流,比如資料庫廠商等,内嵌了如上圖所示的32個常用函數,這樣使用者就無須自己寫UDF,并且速度更快。比如針對map類型,Spark3.0新增加了map_keys和map_values,更加地友善易用。其他新增加的更多内嵌函數大家可以到社群具體了解。
三、Monitoring and Debuggability
Spark3.0也增加了一些對監控和調優的改進,主要有:
- Structured Streaming UI
- DDL/DML Enhancements
- Observable Metrics
- Event Log Rollover
(一)Structured Streaming UI
Structured Streaming是在Spark2.0中釋出的,在Spark3.0中加入了UI的配置。新的UI主要包括了兩種統計資訊:已完成的Streaming查詢聚合資訊和未完成的Streaming查詢的目前資訊,包括Input Rate、Process Rate、Batch Duration和Operate Duration。
(二)DDL/DML Enhancements
我們還增加了各種DDL/DML指令,比如EXPLAIN和。
EXPLAIN是性能調優的必備工具,讀取EXPLAIN是每個使用者的基本功,但是随着系統的運作,EXPLAIN的資訊越來越多,而且資訊多元、多樣,在新的版本中我們引入了新的FORMATTED模式,如下所示,在開頭處有一個非常精簡的樹狀圖,且之後的每個部分都有很詳細的解釋,更容易加更多的注意,這就從水準擴充變成了垂直擴充,更加的直覺。
(三)Observable Metrics
我們還引入了Observable Metrics用以觀測資料的品質。要知道資料品質對于很多Spark應用都是相當重要的,通常定義資料品質的Metrics還是非常容易的,比如用一些聚合參數,但是算出這個Metrics的值就非常麻煩,尤其對于流計算來說。
四、SQL Compatibility
SQL相容性也是Spark必不可提的話題,良好的相容性更友善使用者遷移到Spark平台,在Spark3.0中新增的主要功能有:
- ANSI Store Assignment
- Overflow Checking
- Reserved Keywords in Parser
- Proleptic Gregorian Calendar
也就是說,這個版本中我們讓insert遵守了ANSI Store Assignment,并且增加了運作時的overflow的檢查,還提供了一個模式讓SQL Parser來準确地遵守ANSI标準的保留字,還切換了Calendar,這樣更加符合ANSI的SQL标準。比如說我們想要插入兩列資料,類型是int和string,如果将int插入到了string中,還是可以的,不會發生資料精度的損失和資料丢失;但是如果我們嘗試将string類型插入到int類型中,就有可能發生資料損失甚至丢失。ANSI Store Assignment+Overflow Checking在輸入不合法的時候就會在運作時抛出異常,需要注意的是這個設定預設是關閉的,可以根據個人需要打開。
五、Built-in Data Sources
在這個版本中我們提升了預裝的資料源,比如Parquet table,我們可以對Nested Column做Column Pruning和Filter Pushdown,此外還支援了對CSV的Filter Pushdown,還引入了Binary Data Source來處理類似于二進制的圖檔檔案。
六、Extensibility and Ecosystem
Spark3.0繼續加強了對生态圈的建設:
- 對Data Source V2 API的持續改善和catalog支援;
- 支援Java 11;
- 支援Hadoop 3;
- 支援Hive 3。
(一)Data Source V2 API+Catalog Support
Spark3.0加上了對Catalog的支援來擴充Data Source API。Catalog plugin API可以讓使用者注冊自己的catalog來實作對中繼資料的處理,這樣可以讓Spark使用者更簡單友善的使用資料源的表。對于沒有實作Catalog plugin的資料源,使用者需要先注冊每個外部資料源的表才能通路,但是實作了Catalog plugin API之後我們隻需要注冊Catalog,然後就可以直接遠端通路和操作catalog的表。對于資料源的開發者來說,什麼時候支Data Source V2 API呢?下面是幾點建議:
不過這裡需要注意,Data Source V2還不是很穩定,開發者可能在未來還需要調整相關API的實作。
大資料的發展相當迅速,Spark3.0為了能更友善的部署,我們更新了對各個元件和環境版本的支援,但是要注意以下事項。
關于生态圈,這裡要提一下Koalas,它是一個純的Python庫,用Spark實作了絕大部分的pandas API,讓pandas使用者除了可以處理小資料,也可以處理大資料。Koalas對于pandas使用者來說可以将pandas的代碼擴充到大資料處理,使得學習PySpark變得更簡單;對于現有的PySpark使用者來說,多了更多的選擇,可以用pandas API來解決生産力問題。過去一年多,Koalas的下載下傳量是驚人的,在pip的下載下傳量單日已經超過了37000,而且還在不斷增長,5月的下載下傳量也達到了85萬。Koalas的代碼其實不多,主要是API的實作,執行還是由Spark來做,是以Spark性能的提升對于Koalas使用者來說是直接受益的。Koalas的釋出周期想當頻密,目前已經有33個釋出,歡迎大家下載下傳使用。
如何讀和了解Spark UI對大多數新使用者來說是一個很大的挑戰,尤其對SQL使用者來說,在Spark3.0中我們增加了自己的UI文檔
https://spark.apache.org/docs/latest/web-ui.html并且增加了SQL Reference ,
https://spark.apache.org/docs/latest/sql-ref.html等,更詳細的文檔使得使用者上手Spark的時候更加容易,歡迎大家去試一試Spark3.0,感受Spark的強大。
關鍵詞:Spark3.0、SQL、PySpark、Koalas、pandas、UDF、AQE
阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!