天天看點

資料治理實踐 | 網易某業務線的計算資源治理

作者:一個資料人的自留地

本文從計算資源治理實踐出發,帶大家清楚認識計算資源治理到底該如何進行,并如何應用到其他項目中。

01前言

由于資料治理層面可以分多個層面且内容繁多(包括模型合規、資料品質、資料安全、計算/存儲資源、資料價值等治理内容),是以需要單獨拆分為6個子產品單獨去闡述其中内容。

筆者作為數倉開發經常會收到大量叢集資源滿載、任務産出延時等消息/郵件,甚至下遊數分及其他同學也會詢問任務運作慢的情況,在這裡很多數倉同學遇到這類問題第一想到的都是加資源解決,但事實真不一定是缺少資源,而是需要優化目前問題任務。是以本期從團隊做計算資源治理視角出發,帶大家清楚認識計算資源治理到底該如何進行。

02問題出現

在做計算治理之前(2022.12)我們團隊盤點了下目前計算資源存在的幾個問題:

(1)30+高消耗任務:由于數倉前中期業務擴張,要覆寫大量場景應用,存在大量問題代碼運作時資料傾斜,在消耗大量叢集計算資源下,産出時間也久;

(2)200w+的小檔案:目前任務存在未合并小檔案、任務Reduce數量過多、上遊資料源接入(尤其是API資料接入)會造成過多小檔案出現,小檔案過多會開啟更多資料讀取,執行會浪費大量的資源,嚴重影響性能;

(3)任務排程安排不合理:多數任務集中在淩晨2-5點執行且該區間CPU滿載,導緻該時間段資源消耗成了重災區,所有核心/非核心任務都在争搶資源,部分核心任務不能按時産出一直在等待階段;

(4)線上無效DQC(資料品質監控)&監控配置資源過小:存在部分曆史任務沒下線表及DQC場景,每日都在空跑無意義DQC浪費資源,同時DQC資源過少導緻DQC需要運作過長時間;

(5)重複開發任務/無用任務:早期協助下遊做了較多煙囪資料模型,因為種種原因,部分任務不再被使用,煙囪模型分散加工導緻資源複用率降低;

(6)任務缺少調優參數&部分任務仍然使用MapReduce/Spark2計算引擎:任務缺少調優參數導緻資源不能适配及動态調整,甚至線上仍有早期配置MapReduce/Spark2計算引擎導緻運作效率較低。

03思考與行動

3.1 治理前的思考:

在治理之前我想到一個問題,切入點該從哪裡開始最合适?

經過與團隊多次腦暴對目前治理優先級/改動成本大小/難度做了一個排序,我們先選擇從簡單的參數調優&任務引擎切換開始->小檔案治理->DQC治理->高消耗任務治理->排程安排->下線無用模型及沉澱名額到其他資料資産,同時在初期我們完成各類中繼資料接入搭建治理看闆以及團隊治理産出統計資料模型,并通過網易數帆提供的資料治理平台解決具體細節問題。

資料治理實踐 | 網易某業務線的計算資源治理

(資料治理平台截圖)

3.2 治理行動:

(1)大部分任務切換至Spark3計算引擎&補充任務調優參數

補充Spark調優參數(參數内容詳見文末),任務統一使用Spark3引擎加速,并充分利用Spark3的AQE特性及Z-Order排序算法特性。

AQE解釋:Spark 社群在 DAG Scheduler 中,新增了一個 API 在支援送出單個 Map 階段,以及在運作時修改 shuffle 分區數等等,而這些就是 AQE,在 Spark 運作時,每當一個 Shuffle、Map 階段進行完畢,AQE 就會統計這個階段的資訊,并且基于規則進行動态調整并修正還未執行的任務邏輯計算與實體計劃(在條件運作的情況下),使得 Spark 程式在接下來的運作過程中得到優化。

Z-Order解釋:Z-Order 是一種可以将多元資料壓縮到一維的技術,在時空索引以及圖像方面使用較廣,比如我們常用order by a,b,c 會面臨索引覆寫的問題,Z-Order by a,b,c 效果對每個字段是對等的

(2)小檔案治理

在這裡我們使用内部資料治理平台-資料治理360對存在小檔案較多表提供内容展示(本質采集HDFS對應路徑下檔案數的日志去顯示)

目前小檔案處理:

對于分區較多使用Spark3進行動态分區重新整理,(Spark3具備小檔案自動合并功能,如未使用Spark3可配置Spark3/Hive小檔案合并參數重新整理,參數詳見文末),代碼如下:

set hive.exec.dynamic.partition.mode=nonstrict;              insert overwrite table xxx.xxx partition (ds)              select column              ,ds              from xxx.xxx           

對于分區較少或未分區的表采用重建表,補資料方法回刷。

小檔案預防:

  • 使用Spark3引擎,自動合并小檔案
  • 減少Reduce的數量(可以使用參數進行控制)
  • 用Distribute By Rand控制分區中資料量
  • 添加合并小檔案參數
  • 将資料源抽取後的表做一個任務(本質也是回刷分區合并小檔案任務)去處理小檔案保障從資料源開始小檔案不向下遊流去

(3)DQC治理

無效DQC下線:難點在于需要查找所有DQC對應的線上任務,檢視該DQC任務是否與線上任務一一比對,進而找到無效DQC任務下線,内容繁雜耗時較多。

DQC資源:由于之前DQC配置資源為叢集預設參數,效率極低導緻所有DQC運作時長均超過10min,進而使得整體任務鍊路運作時長過久,調整Driver記憶體為2048M,Executor個數為2,Executor記憶體為4096M

(4)高消耗任務調優

這裡存在2個難點:優化效果不可控、高消耗任務調整到何種程度算合适,針對這個這個難點我們取所有核心資料資産任務均值,保障單個任務消耗小于平均消耗,同時我們針對目前高消耗任務列舉出如下可優化的方式:

  • 關聯表過多,需拆分
  • 關聯時一對多,資料膨脹
  • 資源配置過多,運作時資源嚴重浪費,需要将配置調小(包括Driver記憶體、Executor個數、Executor記憶體)
  • 代碼結尾添加Distribute By Rand(),用來控制Map輸出結果的分發
  • 查詢中列和行未裁剪、分區未限定、Where條件未限定
  • SQL中Distinct切換為Group by(Distinct會被hive翻譯成一個全局唯一Reduce任務來做去重操作,Group by則會被hive翻譯成分組聚合運算,會有多個Reduce任務并行處理,每個Reduce對收到的一部分資料組,進行每組聚合(去重))
  • 關聯後計算切換為子查詢計算好後再關聯
  • 使用Map Join(Map Join會把小表全部讀入記憶體中,在Map階段直接拿另外一個表的資料和記憶體中表資料做比對,由于在Map是進行了Join操作,省去了Reduce運作的效率也會高很多)可用參數代替

(5)任務排程合理優化

對于排程優化一開始會無從下手,統計淩晨2-5點區間下大概600+任務難梳理,同時存在任務依賴,修改起來可能會對下遊整體有大的影響,是以我們選擇循序漸進先梳理再改善。

  • 找到所有表的輸出輸入點即啟始ODS與末尾ADS
  • 劃分其中核心表/非核心表,及對應任務開始時間與結束時間
  • 按照梳理内容把非核心的任務穿插在目前叢集資源非高峰時期(2點前與5點後),同時把核心任務排程提前,保障CDM層任務及時産出
  • 對實踐後内容再度調優,達到資源最大使用率

(6)煙囪任務下沉&無用任務下線

煙囪表過多,需下沉名額到DWS中提升複用性,對于無用任務也需要及時下線(這裡需要拿到中繼資料血緣最好到報表層級的資料血緣,防止任務下線後導緻可視化内容問題産生),減少開發資源消耗。

04治理效果

(1)Hive與Spark2任務更新Spark3.1,總計更新任務137個,更新任務後總體任務執行效率提升43%,cpu資源消耗降低41%,記憶體資源消耗降低46%

(2)治理小檔案數大于10000+以上的數倉表總計30+張,小檔案總數由216w下降至67w

(3)下線無效DQC任務總計50+,修改DQC配置資源降低運作時長,由原來10min優化至3min内

(4)完成線上20+個任務優化及10+個任務下線及10+表名額下沉,優化後節省任務耗時146分鐘,減少CPU損耗800w+,降低記憶體消耗2600w+(相當于節省了8個200+字段1億資料量任務消耗)

(5)排程重新配置設定後2-5點資源使用率由90+%降低至50+%,保障日用資源趨勢圖無大突刺波動

05小結

計算資源治理核心在于降本增效,用有限資源去運作更多任務,通過一系列治理操作也讓數倉同學積累技術經驗同時規範化自身開發标準,讓治理反推進組内技術進步。

計算資源治理是一件長久之事,并不能因為資源緊張才去治理,而要将計算治理常态化,可通過周/月資源掃描内容及時推送給每個同學,并為之打分,讓每個任務都有源可循,有方法可優化。

參數内容

參數并不是設定越多任務性能越好,根據資料量、消耗、運作時間進行調整達到合理效果。

Hive:

(1)set hive.auto.convert.join = true; (是否自動轉化成Map Join)

(2)set hive.map.aggr=true; (用于控制負載均衡,頂層的聚合操作放在Map階段執行,進而減輕清洗階段資料傳輸和Reduce階段的執行時間,提升總體性能,該設定會消耗更多的記憶體)

(3)set hive.groupby.skewindata=true; (用于控制負載均衡,當資料出現傾斜時,如果該變量設定為true,那麼Hive會自動進行負載均衡)

(4)set hive.merge.mapfiles=true; (用于hive引擎合并小檔案使用)

(5)set mapreduce.map.memory.mb=4096; (設定Map記憶體大小,解決Memory占用過大/小)

(6)set mapreduce.reduce.memory.mb=4096;(設定Reduce記憶體大小,解決Memory占用過大/小)

(7)set hive.exec.dynamic.partition.mode=nonstrict;(動态分區開啟)

Spark:

(1)set spark.sql.legacy.parquet.datetimeRebaseModeInRead=LEGACY;(用于spark3中字段類型不比對(例如datetime無法轉換成date),消除sql中時間歧義,将Spark .sql. LEGACY . timeparserpolicy設定為LEGACY來恢複Spark 3.0之前的狀态來轉化)

(2)set spark.sql.adaptive.enabled=true;(是否開啟調整Partition功能,如果開啟,spark.sql.shuffle.partitions設定的Partition可能會被合并到一個Reducer裡運作。平台預設開啟,同時強烈建議開啟。理由:更好利用單個Executor的性能,還能緩解小檔案問題)

(3)set spark.sql.hive.convertInsertingPartitionedTable=false;(解決資料無法同步Impala問題,使用Spark3引擎必填)

(4)set spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes=2048M;(Spark小檔案合并)

作者簡介

語興,網易資料開發工程師。

繼續閱讀