天天看點

極光筆記丨Spark SQL 在極光的建設實踐

​​

極光筆記丨Spark SQL 在極光的建設實踐

極光進階工程師——蔡祖光

前言

Spark在2018開始在極光大資料平台部署使用,曆經多個版本的疊代,逐漸成為離線計算的核心引擎。目前在極光大資料平台每天運作的Spark任務有20000+,執行的Spark SQL平均每天42000條,本文主要介紹極光資料平台在使用Spark SQL的過程中總結的部分實踐經驗,包括以下方面内容:

  • Spark Extension的應用實踐
  • Spark Bucket Table的改造優化
  • 從Hive遷移到Spark SQL的實踐方案

一、Spark Extension應用實踐

Spark Extension作為Spark Catalyst擴充點在SPARK-18127中被引入,Spark使用者可以在SQL處理的各個階段擴充自定義實作,非常強大高效

1.1 血緣關系解析

在極光我們有自建的中繼資料管理平台,相關中繼資料由各資料元件進行資訊收集,其中對Spark SQL的血緣關系解析和收集就是通過自定義的Spark Extension實作的。

Spark Catalyst的SQL處理分成parser,analyzer,optimizer以及planner等多個步驟,其中analyzer,optimizer等步驟内部也分為多個階段,為了擷取最有效的血緣關系資訊,我們選擇最終的planner階段作為切入點,為此我們專門實作了一個planner strategy進行Spark SQL實體執行計劃的解析,并提取出讀寫表等中繼資料資訊并存儲到中繼資料管理平台

極光筆記丨Spark SQL 在極光的建設實踐

1.2 權限校驗

在資料安全方面,極光選擇用Ranger作為權限管理等元件,但在實際使用的過程中我們發現目前社群版本的Ranger主要提供的還是HDFS、HBase、Hive、Yarn的相關接入插件,在Spark方面需要自己去實作相關功能,對于以上問題我們同樣選擇用Spark Extension去幫助我們進行權限方面的二次開發,在實作的過程中我們借助了Ranger Hive-Plugin的實作原理,對Spark SQL通路Hive進行了權限校驗功能的實作。

1.3 參數控制

随着資料平台使用Spark SQL的業務同學越來越多,我們發現每個業務同學對于Spark的熟悉程度都有所不同,對Spark配置參數的了解也有好有壞,為了保障叢集整體運作的穩定性,我們對業務同學送出的Spark任務的進行了攔截處理,提取任務設定的配置參數,對其中配置不合理的參數進行屏蔽,并給出風險提示,有效的引導業務同學進行合理的線上操作。

二、Spark Bucket Table的改造優化

在Spark的實踐過程中,我們也積極關注業内其它公司優秀方案,在2020年我們參考位元組跳動對于Spark Bucket Table的優化思路,在此基礎上我們對極光使用的Spark進行了二次改造,完成如下優化項:

  • Spark Bucket Table和Hive Bucket Table的互相相容
  • Spark支援Bucket Num是整數倍的Bucket Join
  • Spark支援Join字段和Bucket字段是包含關系的Bucket Join

上述三點的優化,豐富了Bucket Join的使用場景,可以讓更多Join、Aggregate操作避免産生Shuffle,有效的提高了Spark SQL的運作效率.在完成相關優化以後,如何更好的進行業務改造推廣,成為了我們關心的問題。

通過對資料平台過往SQL執行記錄的分析,我們發現使用者ID和裝置ID的關聯查詢是十分高頻的一項操作,在此基礎上,我們通過之前SQL血緣關系解析收集到的中繼資料資訊,對每張表進行Join、Aggregate操作的高頻字段進行了分析整理,統計出最為合适的Bucket Cloumn,并在這些中繼資料的支撐下輔助我們進行Bucket Table的推廣改造。

三、Hive遷移Spark

随着公司業務的高速發展,在資料平台上送出的SQL任務持續不斷增長,對任務的執行時間和計算資源的消耗都提出了新的挑戰,出于上述原因,我們提出了Hive任務遷移到Spark SQL的工作目标,由此我們總結出了如下問題需求:

  • 如何更好的定位哪些Hive任務可以遷移,哪些不可以
  • 如何讓業務部門無感覺的從Hive遷移到Spark SQL
  • 如何進行對比分析,确認任務遷移前後的運作效果

3.1 Hive遷移分析程式的實作

在遷移業務job時,我們需要知道這個部門有哪些人,由于Azkaban在執行具體job時會有執行人資訊,是以我們可以根據執行人來推測有哪些job。分析程式使用了中繼資料系統的某些表資料和azkaban相關的一些庫表資訊,用來幫助我們收集遷移的部門下有多少hive job,以及該hive job有多少sql,sql文法通過率是多少,當然在遷移時還需要檢視Azkaban的具體執行耗時等資訊,用于幫助我們在精細化調參的時候大緻判斷消耗的資源是多少。

由于線上直接檢測某條sql是否合乎spark語義需要具有相關的讀寫權限,直接開放權限給分析程式不安全。是以實作的思路是通過使用中繼資料系統存儲的庫表結構資訊,以及azkaban上有采集業務job執行的sql資訊。隻要擁有某條sql所需要的全部庫表資訊,我們就能在本地通過重建庫表結構分析該條sql是否合乎spark語義(當然線上環境和本地是有不同的,比如函數問題,但大多情況下是沒有問題的)。

極光筆記丨Spark SQL 在極光的建設實踐

                                                            圖3-1-1

以下為某資料部通過分析程式得到的SQL通過率

​​​​ 

3.2 SQL執行引擎的無感覺切換

目前業務方使用Hive的主要方式是通過beeline去連接配接hiveserver2,由于livy也提供了thriftserver子產品,是以beeline也可以直接連接配接livy。遷移的政策就是先把合乎Spark文法的SQL發往livy執行,如果執行失敗再切換到Hive進行兜底執行。

beeline可擷取使用者SQL,啟動beeline時通過thrift接口建立livy session,擷取使用者sql發送給livy 執行,期間執行進度等資訊可以查詢livy獲得,同時一個job對應一個session,以及每啟動一次 beeline對應一個session,當job執行完畢或者beeline被關閉時,關閉livy session。(如果spark不能成功執行則走之前hive的邏輯)

 ​​

極光筆記丨Spark SQL 在極光的建設實踐

                                                            圖3-2-1

有了以上切換思路以後,我們開始着手beeline程式的修改設計

beeline重要類圖如圖3-2-2所示, Beeline類是啟動類,擷取使用者指令行輸入并調用Commands類去 執行,Commands負責調用JDBC接口去執行和擷取結果, 單向調用流程如圖3-2-3所示。

極光筆記丨Spark SQL 在極光的建設實踐

                                                        圖3-2-2

極光筆記丨Spark SQL 在極光的建設實踐

                                                           圖3-2-3

由圖3-2-2和圖3-2-3可知,所有的操作都是通過DatabaseConnection這個對象去完成的,持有這個 對象的是DatabaseConnections這個對象,是以多計算引擎切換,通過政策适配

DatabaseConnections對象,這樣就能在不修改其他代碼的情況下切換執行引擎(即擷取不同的 connection)

極光筆記丨Spark SQL 在極光的建設實踐

                                                           圖3-2-4

3.3 任務遷移黑名單

前文有說到,當一個Hive任務用SQL分析程式走通,并且在遷移程式用livy進行Spark任務送出以後,還是會有可能執行失敗,這個時候我們會用Hive進行兜底執行保障任務穩定性。但是失敗的SQL會有多種原因,有的SQL确實用Hive執行穩定性更好,如果每次都先用Spark SQL執行失敗以後再用Hive執行會影響任務效率,基于以上目的,我們對遷移程式開發了黑名單功能,用來保障每個SQL可以找到它真正适合的執行引擎,考慮到beeline是輕量級用戶端,識别的功能應該放在livy-server側來做,開發一個類似HBO的功能來将這樣的異常SQL加入黑名單,節省遷移任務執行時間。

目标: 基于HBE(History-Based Executing)的異常SQL識别

有了上述目标以後我們主要通過如下方式進行了SQL黑名單的識别切換

  • SQL識别限定在相同appName中(縮小識别範圍避免識别錯誤)
  • 得到SQL抽象文法樹的後續周遊内容後生成md5值作為該sql的唯一性辨別
  • 把執行失敗超過N次的SQL資訊寫入黑名單
  • 下次執行時根據指派規則比較兩條SQL的結構樹特征
  • 對于在黑名單中的SQL不進行Spark SQL切換

3.4 遷移成果

今年經過遷移程式的遷移改造,HSQL最大降幅為50%+(後随今年業務增長有所回升)

​​​​

極光筆記丨Spark SQL 在極光的建設實踐

四、Spark3.0的應用

目前極光使用的Spark預設版本已經從2.X版本更新到了3.X版本,Spark3.X的AQE特性也輔助我們更好的使用Spark

實踐配置優化:

#spark3.0.0參數

#動态合并shuffle partitions

spark.sql.adaptive.coalescePartitions.enabled true

spark.sql.adaptive.coalescePartitions.minPartitionNum 1

spark.sql.adaptive.coalescePartitions.initialPartitionNum 500

spark.sql.adaptive.advisoryPartitionSizeInBytes 128MB

#動态優化資料傾斜,通過實際的資料特性考慮,skewedPartitionFactor我們設定成了1

spark.sql.adaptive.skewJoin.enabled true

spark.sql.adaptive.skewJoin.skewedPartitionFactor 1

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 512MB

繼續閱讀