Spark是目前主流的大資料計算引擎,功能涵蓋了大資料領域的離線批處理、SQL類處理、流式/實時計算、機器學習、圖計算等各種不同類型的計算操作,應用範圍與前景非常廣泛。作為一種記憶體計算架構,Spark運算速度快,并能夠滿足UDF、大小表Join、多路輸出等多樣化的資料計算和處理需求。
作為國内專業的資料智能服務商,個推從早期的1.3版本便引入Spark,并基于Spark建設數倉,進行大規模資料的離線和實時計算。由于Spark 在2.x版本之前的優化重心在計算引擎方面,而在中繼資料管理方面并未做重大改進和更新。是以個推仍然使用Hive進行中繼資料管理,采用Hive中繼資料管理+ Spark計算引擎的大資料架構,以支撐自身大資料業務發展。個推還将Spark廣泛應用到報表分析、機器學習等場景中,為行業客戶和政府部門提供實時人口洞察、群體畫像建構等服務。

▲個推在實際業務場景中,分别使用SparkSQL 和 HiveSQL對一份3T資料進行了計算,上圖展示了跑數速度。資料顯示:在鎖死隊列(120G記憶體,<50core)前提下, SparkSQL2.3 的計算速度是Hive1.2 的5-10倍。
對企業來講,效率和成本始終是其進行海量資料處理和計算時所必須關注的問題。如何充分發揮Spark的優勢,在進行大資料作業時真正實作降本增效呢?個推将多年積累的Spark性能調優妙招進行了總結,與大家分享。
Spark性能調優-基礎篇
衆所周知,正确的參數配置對提升Spark的使用效率具有極大助力。是以,針對 不了解底層原理的Spark使用者,我們提供了可以直接抄作業的參數配置模闆,幫助相關資料開發、分析人員更高效地使用Spark進行離線批處理和SQL報表分析等作業。
推薦參數配置模闆如下:
Spark-submit 送出方式腳本
/xxx/spark23/xxx/spark-submit --master yarn-cluster \--name ${mainClassName} \--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \--conf spark.yarn.maxAppAttempts=2 \--conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC \--driver-memory 2g \--conf spark.sql.shuffle.partitions=1000 \--conf hive.metastore.schema.verification=false \--conf spark.sql.catalogImplementation=hive \--conf spark.sql.warehouse.dir=${warehouse} \--conf spark.sql.hive.manageFilesourcePartitions=false \--conf hive.metastore.try.direct.sql=true \--conf spark.executor.memoryOverhead=512M \--conf spark.yarn.executor.memoryOverhead=512 \--executor-cores 2 \--executor-memory 4g \--num-executors 50 \--class 啟動類 \${jarPath} \-M ${mainClassName}
spark-sql 送出方式腳本
option=/xxx/spark23/xxx/spark-sqlexport SPARK_MAJOR_VERSION=2${option} --master yarn-client \--driver-memory 1G \--executor-memory 4G \--executor-cores 2 \--num-executors 50 \--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties" \--conf spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER \--conf spark.sql.auto.repartition=true \--conf spark.sql.autoBroadcastJoinThreshold=104857600 \--conf "spark.sql.hive.metastore.try.direct.sql=true" \--conf spark.dynamicAllocation.enabled=true \--conf spark.dynamicAllocation.minExecutors=1 \--conf spark.dynamicAllocation.maxExecutors=200 \--conf spark.dynamicAllocation.executorIdleTimeout=10m \--conf spark.port.maxRetries=300 \--conf spark.executor.memoryOverhead=512M \--conf spark.yarn.executor.memoryOverhead=512 \--conf spark.sql.shuffle.partitions=10000 \--conf spark.sql.adaptive.enabled=true \--conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728 \--conf spark.sql.parquet.compression.codec=gzip \--conf spark.sql.orc.compression.codec=zlib \--conf spark.ui.showConsoleProgress=true-f pro.sqlpro.sql 為業務邏輯腳本
Spark性能調優-進階篇
針對有意願了解Spark底層原理的讀者,本文梳理了standalone、Yarn-client、Yarn-cluster等3種常見任務送出方式的互動圖,以幫助相關使用者更直覺地了解Spark的核心技術原理、為閱讀接下來的進階篇内容打好基礎。
standalone
1) spark-submit 送出,通過反射的方式構造出1個DriverActor 程序;
2) Driver程序執行編寫的application,構造sparkConf,構造sparkContext;
3) SparkContext在初始化時,構造DAGScheduler、TaskScheduler,jetty啟動webui;
4) TaskScheduler 有sparkdeployschedulebackend 程序,去和Master通信,請求注冊Application;
5) Master 接受通信後,注冊Application,使用資源排程算法,通知Worker,讓worker啟動Executor;
6) worker會為該application 啟動executor,executor 啟動後,會反向注冊到TaskScheduler;
7) 所有Executor 反向注冊到TaskScheduler 後,Driver 結束sparkContext 的初始化;
8) Driver繼續往下執行編寫的application,每執行到1個action,就會建立1個job;
9) job 會被送出給DAGScheduler,DAGScheduler 會對job 劃分為多個stage(stage劃分算法),每個stage建立1個taskSet;
10) taskScheduler會把taskSet裡每1個task 都送出到executor 上執行(task 配置設定算法);
11) Executor 每接受到1個task,都會用taskRunner來封裝task,之後從executor 的線程池中取出1個線程,來執行這個taskRunner。(task runner:把編寫的代碼/算子/函數拷貝,反序列化,然後執行task)。
Yarn-client
1) 發送請求到ResourceManager(RM),請求啟動ApplicationMaster(AM);
2) RM 配置設定container 在某個NodeManager(NM)上,啟動AM,實際是個ExecutorLauncher;
3) AM向RM 申請container;
4) RM給AM 配置設定container;
5) AM 請求NM 來啟動相應的Executor;
6) executor 啟動後,反向注冊到Driver程序;
7) 後序劃分stage,送出taskset 和standalone 模式類似。
Yarn-cluster
2) RM 配置設定container 在某個NodeManager(NM)上,啟動AM;
6) executor 啟動後,反向注冊到AM;
了解了以上3種常見任務的底層互動後,接下來本文從存儲格式、資料傾斜、參數配置等3個方面來展開,為大家分享個推進行Spark性能調優的進階姿勢。
存儲格式(檔案格式、壓縮算法)
衆所周知,不同的SQL引擎在不同的存儲格式上,其優化方式也不同,比如Hive更傾向于orc,Spark則更傾向于parquet。同時,在進行大資料作業時,點查、寬表查詢、大表join操作相對頻繁,這就要求檔案格式最好采用列式存儲,并且可分割。是以我們推薦以parquet、orc 為主的列式存儲檔案格式和以gzip、snappy、zlib 為主的壓縮算法。在組合方式上,我們建議使用parquet+gzip、orc+zlib的組合方式,這樣的組合方式兼顧了列式存儲與可分割的情況,相比txt+gz 這種行式存儲且不可分割的組合方式更能夠适應以上大資料場景的需求。
個推以線上500G左右的資料為例,在不同的叢集環境與SQL引擎下,對不同的存儲檔案格式和算法組合進行了性能測試。測試資料表明:相同資源條件下,parquet+gz 存儲格式較text+gz存儲格式在多值查詢、多表join上提速至少在60%以上。
結合測試結果,我們對不同的叢集環境與SQL引擎下所推薦使用的存儲格式進行了梳理,如下表:
同時,我們也對parquet+gz、orc+zlib的記憶體消耗進行了測試。以某表的單個曆史分區資料為例,parquet+gz、orc+zlib比txt+gz 分别節省26%和49%的存儲空間。
完整測試結果如下表:
可見,parquet+gz、orc+zlib确實在降本提效方面效果顯著。那麼,如何使用這兩種存儲格式呢?步驟如下:
➤hive 與 spark 開啟指定檔案格式的壓縮算法
spark:
set spark.sql.parquet.compression.codec=gzip;set spark.sql.orc.compression.codec=zlib;
hive:
set hive.exec.compress.output=true;set mapreduce.output.fileoutputformat.compress=true;set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
➤建表時指定檔案格式
parquet 檔案格式(序列化,輸入輸出類)
CREATE EXTERNAL TABLE `test`(rand_num double)PARTITIONED BY (`day` int)ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat';
orc 檔案格式(序列化,輸入輸出類)
ROW FORMAT SERDE'org.apache.hadoop.hive.ql.io.orc.OrcSerde'STORED AS INPUTFORMAT'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';
➤線上表調
ALTER TABLE db1.table1_std SET TBLPROPERTIES ('parquet.compression'='gzip');ALTER TABLE db2.table2_std SET TBLPROPERTIES ('orc.compression'='ZLIB');
➤ctas 建表
create table tablename stored as parquet as select ……;create table tablename stored as orc TBLPROPERTIES ('orc.compress'='ZLIB') as select ……;
資料傾斜
資料傾斜分為map傾斜和reduce傾斜兩種情況。本文着重介紹reduce 傾斜,如SQL 中常見的group by、join 等都可能是其重災區。資料傾斜發生時,一般表現為:部分task 顯著慢于同批task,task 資料量顯著大于其他task,部分taskOOM、spark shuffle 檔案丢失等。
如下圖示例,在duration 列和shuffleReadSize/Records列,我們能明顯發現部分task 處理資料量顯著升高,耗時變長,造成了資料傾斜:
如何解決資料傾斜?
我們總結了7種資料傾斜解決方案,能夠幫助大家解決常見的資料傾斜問題:
解決方案一:使用 Hive ETL預處理資料
即在資料血緣關系中,把傾斜問題前移處理,進而使下遊使用方無需再考慮資料傾斜問題。
⁕該方案适用于下遊互動性強的業務,如秒級/分鐘級别提數查詢。
解決方案二:過濾少數導緻傾斜的key
即剔除傾斜的大key,該方案一般結合百分位點使用,如99.99%的id 記錄數為100條以内,那麼100條以外的id 就可考慮予以剔除。
⁕該方案在統計型場景下較為實用,而在明細場景下,需要看過濾的大key 是否為業務所側重和關注。
解決方案三:提高shuffle操作的并行度
即對spark.sql.shuffle.partitions參數進行動态調整,通過增加shuffle write task寫出的partition數量,來達到key的均勻配置設定。SparkSQL2.3 在預設情況下,該值為200。開發人員可以在啟動腳本增加如下參數,對該值進行動态調整:
conf spark.sql.shuffle.partitions=10000conf spark.sql.adaptive.enabled=true conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728
⁕該方案非常簡單,但是對于key的均勻配置設定卻能起到較好的優化作用。比如,原本10個key,每個50條記錄,隻有1個partition,那麼後續的task需要處理500條記錄。通過增加partition 數量,可以使每個task 都處理50條記錄,10個task 并行跑數,耗時隻需要原來1個task 的1/10。但是該方案對于大key較難優化,比如,某個大key記錄數有百萬條,那麼大key 還是會被配置設定到1個task 中去。
解決方案四:将reducejoin轉為mapjoin
指的是在map端join,不走shuffle過程。以Spark為例,可以通過廣播變量的形式,将小RDD的資料下發到各個Worker節點上(Yarn 模式下是NM),在各個Worker節點上進行join。
⁕該方案适用于小表join大表場景(百G以上的資料體量)。此處的小表預設門檻值為10M,低于此門檻值的小表,可分發到worker節點。具體可調整的上限需要小于container配置設定的記憶體。
解決方案五:采樣傾斜key并分拆join操作
如下圖示例:A表 join B表,A表有大key、B表無大key,其中大key的id為1,有3條記錄。
如何進行分拆join操作呢?
首先将A表、B表中id1單獨拆分出來,剔除大key的A' 和 B' 先join,達到非傾斜的速度;
針對A表大key添加随機字首,B表擴容N倍,單獨join;join後剔除随機字首即可;
再對以上2部分union。
⁕該方案的本質還是減少單個task 處理過多資料時所引發的資料傾斜風險,适用于大key較少的情況。
解決方案六:使用随機字首和擴容RDD進行join
比如,A 表 join B表,以A表有大key、B表無大key為例:
對A表每條記錄打上[1,n] 的随機字首,B表擴容N倍,join。
join完成後剔除随機字首。
⁕該方案适用于大key較多的情況,但也會增加資源消耗。
解決方案七:combiner
即在map端做combiner操作,減少shuffle 拉取的資料量。
⁕該方案适合累加求和等場景。
在實際場景中,建議相關開發人員具體情況具體分析,針對複雜問題也可将以上方法進行組合使用。
Spark 參數配置
針對無資料傾斜的情況,我們梳理總結了參數配置參照表幫助大家進行Spark性能調優,這些參數的設定适用于2T左右資料的洞察與應用,基本滿足大多數場景下的調優需求。
總結
目前,Spark已經發展到了Spark3.x,最新版本為Spark 3.1.2 released (Jun 01, 2021)。Spark3.x的許多新特性,如動态分區修剪、Pandas API的重大改進、增強嵌套列的裁剪和下推等亮點功能,為進一步實作降本增效提供了好思路。未來,個推也将繼續保持對Spark演進的關注,并持續展開實踐和分享。