天天看點

Spark 資料傾斜及其解決方案

本文從資料傾斜的危害、現象、原因等方面,由淺入深闡述Spark資料傾斜及其解決方案。

  • 本文首發于 vivo網際網路技術 微信公衆号 https://mp.weixin.qq.com/s/lqMu6lfk-Ny1ZHYruEeBdA
  • 作者簡介:鄭志彬,畢業于華南理工大學計算機科學與技術(雙語班)。先後從事過電子商務、開放平台、移動浏覽器、推薦廣告和大資料、人工智能等相關開發和架構。目前在vivo智能平台中心從事 AI中台建設以及廣告推薦業務。擅長各種業務形态的業務架構、平台化以及各種業務解決方案。

一、什麼是資料傾斜

對 Spark/Hadoop 這樣的分布式大資料系統來講,資料量大并不可怕,可怕的是資料傾斜。

對于分布式系統而言,理想情況下,随着系統規模(節點數量)的增加,應用整體耗時線性下降。如果一台機器處理一批大量資料需要120分鐘,當機器數量增加到3台時,理想的耗時為120 / 3 = 40分鐘。但是,想做到分布式情況下每台機器執行時間是單機時的1 / N,就必須保證每台機器的任務量相等。不幸的是,很多時候,任務的配置設定是不均勻的,甚至不均勻到大部分任務被配置設定到個别機器上,其它大部分機器所配置設定的任務量隻占總得的小部分。比如一台機器負責處理 80% 的任務,另外兩台機器各處理 10% 的任務。

『不患多而患不均』,這是分布式環境下最大的問題。意味着計算能力不是線性擴充的,而是存在短闆效應: 一個 Stage 所耗費的時間,是由最慢的那個 Task 決定。

由于同一個 Stage 内的所有 task 執行相同的計算,在排除不同計算節點計算能力差異的前提下,不同 task 之間耗時的差異主要由該 task 所處理的資料量決定。是以,要想發揮分布式系統并行計算的優勢,就必須解決資料傾斜問題。

二、資料傾斜的危害

當出現資料傾斜時,小量任務耗時遠高于其它任務,進而使得整體耗時過大,未能充分發揮分布式系統的并行計算優勢。  

另外,當發生資料傾斜時,部分任務處理的資料量過大,可能造成記憶體不足使得任務失敗,并進而引進整個應用失敗。  

三、資料傾斜的現象

當發現如下現象時,十有八九是發生資料傾斜了:

  • 絕大多數 task 執行得都非常快,但個别 task 執行極慢,整體任務卡在某個階段不能結束。
  • 原本能夠正常執行的 Spark 作業,某天突然報出 OOM(記憶體溢出)異常,觀察異常棧,是我們寫的業務代碼造成的。這種情況比較少見。

TIPS

在 Spark streaming 程式中,資料傾斜更容易出現,特别是在程式中包含一些類似 sql 的 join、group 這種操作的時候。因為 Spark Streaming 程式在運作的時候,我們一般不會配置設定特别多的記憶體,是以一旦在這個過程中出現一些資料傾斜,就十分容易造成 OOM。

四、資料傾斜的原因

在進行 shuffle 的時候,必須将各個節點上相同的 key 拉取到某個節點上的一個 task 來進行處理,比如按照 key 進行聚合或 join 等操作。此時如果某個 key 對應的資料量特别大的話,就會發生資料傾斜。比如大部分 key 對應10條資料,但是個别 key 卻對應了100萬條資料,那麼大部分 task 可能就隻會配置設定到10條資料,然後1秒鐘就運作完了;但是個别 task 可能配置設定到了100萬資料,要運作一兩個小時。

是以出現資料傾斜的時候,Spark 作業看起來會運作得非常緩慢,甚至可能因為某個 task 處理的資料量過大導緻記憶體溢出。

五、問題發現與定位

1、通過 Spark Web UI

通過 Spark Web UI 來檢視目前運作的 stage 各個 task 配置設定的資料量(Shuffle Read Size/Records),進而進一步确定是不是 task 配置設定的資料不均勻導緻了資料傾斜。

知道資料傾斜發生在哪一個 stage 之後,接着我們就需要根據 stage 劃分原理,推算出來發生傾斜的那個 stage 對應代碼中的哪一部分,這部分代碼中肯定會有一個 shuffle 類算子。可以通過 countByKey 檢視各個 key 的分布。

資料傾斜隻會發生在 shuffle 過程中。這裡給大家羅列一些常用的并且可能會觸發 shuffle 操作的算子: distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。出現資料傾斜時,可能就是你的代碼中使用了這些算子中的某一個所導緻的。

2、通過 key 統計

也可以通過抽樣統計 key 的出現次數驗證。

由于資料量巨大,可以采用抽樣的方式,對資料進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個:

df.select("key").sample(false, 0.1)           // 資料采樣
    .(k => (k, 1)).reduceBykey(_ + _)         // 統計 key 出現的次數
    .map(k => (k._2, k._1)).sortByKey(false)  // 根據 key 出現次數進行排序
    .take(10)                                 // 取前 10 個。           

如果發現多數資料分布都較為平均,而個别資料比其他資料大上若幹個數量級,則說明發生了資料傾斜。

六、如何緩解資料傾斜

基本思路

  • 業務邏輯: 我們從業務邏輯的層面上來優化資料傾斜,比如要統計不同城市的訂單情況,那麼我們單獨對這一線城市來做 count,最後和其它城市做整合。
  • 程式實作: 比如說在 Hive 中,經常遇到 count(distinct)操作,這樣會導緻最終隻有一個 reduce,我們可以先 group 再在外面包一層 count,就可以了;在 Spark 中使用 reduceByKey 替代 groupByKey 等。
  • 參數調優: Hadoop 和 Spark 都自帶了很多的參數和機制來調節資料傾斜,合理利用它們就能解決大部分問題。

思路1. 過濾異常資料

如果導緻資料傾斜的 key 是異常資料,那麼簡單的過濾掉就可以了。

首先要對 key 進行分析,判斷是哪些 key 造成資料傾斜。具體方法上面已經介紹過了,這裡不贅述。

然後對這些 key 對應的記錄進行分析:

  1. 空值或者異常值之類的,大多是這個原因引起
  2. 無效資料,大量重複的測試資料或是對結果影響不大的有效資料
  3. 有效資料,業務導緻的正常資料分布

解決方案

對于第 1,2 種情況,直接對資料進行過濾即可。

第3種情況則需要特殊的處理,具體我們下面詳細介紹。

思路2. 提高 shuffle 并行度

Spark 在做 Shuffle 時,預設使用 HashPartitioner(非 Hash Shuffle)對資料進行分區。如果并行度設定的不合适,可能造成大量不相同的 Key 對應的資料被配置設定到了同一個 Task 上,造成該 Task 所處理的資料遠大于其它 Task,進而造成資料傾斜。

如果調整 Shuffle 時的并行度,使得原本被配置設定到同一 Task 的不同 Key 發配到不同 Task 上處理,則可降低原 Task 所需處理的資料量,進而緩解資料傾斜問題造成的短闆效應。

(1)操作流程

RDD 操作 可在需要 Shuffle 的操作算子上直接設定并行度或者使用 spark.default.parallelism 設定。如果是 Spark SQL,還可通過 SET spark.sql.shuffle.partitions=[num_tasks] 設定并行度。預設參數由不同的 Cluster Manager 控制。

dataFrame 和 sparkSql 可以設定 spark.sql.shuffle.partitions=[num_tasks] 參數控制 shuffle 的并發度,預設為200。

(2)适用場景

大量不同的 Key 被配置設定到了相同的 Task 造成該 Task 資料量過大。

(3)解決方案

調整并行度。一般是增大并行度,但有時如減小并行度也可達到效果。

(4)優勢

實作簡單,隻需要參數調優。可用最小的代價解決問題。一般如果出現資料傾斜,都可以通過這種方法先試驗幾次,如果問題未解決,再嘗試其它方法。

(5)劣勢

适用場景少,隻是讓每個 task 執行更少的不同的key。無法解決個别key特别大的情況造成的傾斜,如果某些 key 的大小非常大,即使一個 task 單獨執行它,也會受到資料傾斜的困擾。并且該方法一般隻能緩解資料傾斜,沒有徹底消除問題。從實踐經驗來看,其效果一般。

TIPS 可以把資料傾斜類比為 hash 沖突。提高并行度就類似于 提高 hash 表的大小。

思路3. 自定義 Partitioner

(1)原理

使用自定義的 Partitioner(預設為 HashPartitioner),将原本被配置設定到同一個 Task 的不同 Key 配置設定到不同 Task。

例如,我們在 groupByKey 算子上,使用自定義的 Partitioner:

.groupByKey(new Partitioner() {
  @Override
  public int numPartitions() {
    return 12;
  }
 
  @Override
  public int getPartition(Object key) {
    int id = Integer.parseInt(key.toString());
    if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
      return (id - 9500000) / 12;
    } else {
      return id % 12;
    }
  }
})           
TIPS 這個做法相當于自定義 hash 表的 哈希函數。

使用自定義的 Partitioner 實作類代替預設的 HashPartitioner,盡量将所有不同的 Key 均勻配置設定到不同的 Task 中。

不影響原有的并行度設計。如果改變并行度,後續 Stage 的并行度也會預設改變,可能會影響後續 Stage。

适用場景有限,隻能将不同 Key 分散開,對于同一 Key 對應資料集非常大的場景不适用。效果與調整并行度類似,隻能緩解資料傾斜而不能完全消除資料傾斜。而且需要根據資料特點自定義專用的 Partitioner,不夠靈活。

思路4. Reduce 端 Join 轉化為 Map 端 Join

通過 Spark 的 Broadcast 機制,将 Reduce 端 Join 轉化為 Map 端 Join,這意味着 Spark 現在不需要跨節點做 shuffle 而是直接通過本地檔案進行 join,進而完全消除 Shuffle 帶來的資料傾斜。

Spark 資料傾斜及其解決方案
from pyspark.sql.functions import broadcast
result = broadcast(A).join(B, ["join_col"], "left")           

其中 A 是比較小的 dataframe 并且能夠整個存放在 executor 記憶體中。

(1)适用場景

參與Join的一邊資料集足夠小,可被加載進 Driver 并通過 Broadcast 方法廣播到各個 Executor 中。

(2)解決方案

在 Java/Scala 代碼中将小資料集資料拉取到 Driver,然後通過 Broadcast 方案将小資料集的資料廣播到各 Executor。或者在使用 SQL 前,将 Broadcast 的門檻值調整得足夠大,進而使 Broadcast 生效。進而将 Reduce Join 替換為 Map Join。

(3)優勢

避免了 Shuffle,徹底消除了資料傾斜産生的條件,可極大提升性能。

(4)劣勢

因為是先将小資料通過 Broadcase 發送到每個 executor 上,是以需要參與 Join 的一方資料集足夠小,并且主要适用于 Join 的場景,不适合聚合的場景,适用條件有限。

NOTES

使用Spark SQL時需要通過 SET spark.sql.autoBroadcastJoinThreshold=104857600 将 Broadcast 的門檻值設定得足夠大,才會生效。

思路5. 拆分 join 再 union

思路很簡單,就是将一個 join 拆分成 傾斜資料集 Join 和 非傾斜資料集 Join,最後進行 union:

  1. 對包含少數幾個資料量過大的 key 的那個 RDD (假設是 leftRDD),通過 sample 算子采樣出一份樣本來,然後統計一下每個 key 的數量,計算出來資料量最大的是哪幾個 key。具體方法上面已經介紹過了,這裡不贅述。
  2. 然後将這 k 個 key 對應的資料從 leftRDD 中單獨過濾出來,并給每個 key 都打上 1~n 以内的随機數作為字首,形成一個單獨的 leftSkewRDD;而不會導緻傾斜的大部分 key 形成另外一個 leftUnSkewRDD。
  3. 接着将需要 join 的另一個 rightRDD,也過濾出來那幾個傾斜 key 并通過 flatMap 操作将該資料集中每條資料均轉換為 n 條資料(這 n 條資料都按順序附加一個 0~n 的字首),形成單獨的 rightSkewRDD;不會導緻傾斜的大部分 key 也形成另外一個 rightUnSkewRDD。
  4. 現在将 leftSkewRDD 與 膨脹 n 倍的 rightSkewRDD 進行 join,且在 Join 過程中将随機字首去掉,得到傾斜資料集的 Join 結果 skewedJoinRDD。注意到此時我們已經成功将原先相同的 key 打散成 n 份,分散到多個 task 中去進行 join 了。
  5. 對 leftUnSkewRDD 與 rightUnRDD 進行Join,得到 Join 結果 unskewedJoinRDD。
  6. 通過 union 算子将 skewedJoinRDD 與 unskewedJoinRDD 進行合并,進而得到完整的 Join 結果集。
  1. rightRDD 與傾斜 Key 對應的部分資料,需要與随機字首集 (1~n) 作笛卡爾乘積 (即将資料量擴大 n 倍),進而保證無論資料傾斜側傾斜 Key 如何加字首,都能與之正常 Join。
  2. skewRDD 的 join 并行度可以設定為 n * k (k 為 topSkewkey 的個數)。
  3. 由于傾斜Key與非傾斜Key的操作完全獨立,可并行進行。

兩張表都比較大,無法使用 Map 端 Join。其中一個 RDD 有少數幾個 Key 的資料量過大,另外一個 RDD 的 Key 分布較為均勻。

将有資料傾斜的 RDD 中傾斜 Key 對應的資料集單獨抽取出來加上随機字首,另外一個 RDD 每條資料分别與随機字首結合形成新的RDD(相當于将其資料增到到原來的N倍,N即為随機字首的總個數),然後将二者Join并去掉字首。然後将不包含傾斜Key的剩餘資料進行Join。最後将兩次Join的結果集通過union合并,即可得到全部Join結果。

相對于 Map 則 Join,更能适應大資料集的 Join。如果資源充足,傾斜部分資料集與非傾斜部分資料集可并行進行,效率提升明顯。且隻針對傾斜部分的資料做資料擴充,增加的資源消耗有限。

如果傾斜 Key 非常多,則另一側資料膨脹非常大,此方案不适用。而且此時對傾斜 Key 與非傾斜 Key 分開處理,需要掃描資料集兩遍,增加了開銷。

思路6. 大表 key 加鹽,小表擴大 N 倍 jion

如果出現資料傾斜的 Key 比較多,上一種方法将這些大量的傾斜 Key 分拆出來,意義不大。此時更适合直接對存在資料傾斜的資料集全部加上随機字首,然後對另外一個不存在嚴重資料傾斜的資料集整體與随機字首集作笛卡爾乘積(即将資料量擴大N倍)。

其實就是上一個方法的特例或者簡化。少了拆分,也就沒有 union。

一個資料集存在的傾斜 Key 比較多,另外一個資料集資料分布比較均勻。

(2)優勢

對大部分場景都适用,效果不錯。

(3)劣勢

需要将一個資料集整體擴大 N 倍,會增加資源消耗。

思路7. map 端先局部聚合

在 map 端加個 combiner 函數進行局部聚合。加上 combiner 相當于提前進行 reduce ,就會把一個 mapper 中的相同 key 進行聚合,減少 shuffle 過程中資料量 以及 reduce 端的計算量。這種方法可以有效的緩解資料傾斜問題,但是如果導緻資料傾斜的 key 大量分布在不同的 mapper 的時候,這種方法就不是很有效了。

TIPS 使用 reduceByKey 而不是 groupByKey。

思路8. 加鹽局部聚合 + 去鹽全局聚合

這個方案的核心實作思路就是進行兩階段聚合。第一次是局部聚合,先給每個 key 都打上一個 1~n 的随機數,比如 3 以内的随機數,此時原先一樣的 key 就變成不一樣的了,比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成 (1_hello, 1) (3_hello, 1) (2_hello, 1) (1_hello, 1) (2_hello, 1)。接着對打上随機數後的資料,執行 reduceByKey 等聚合操作,進行局部聚合,那麼局部聚合結果,就會變成了 (1_hello, 2) (2_hello, 2) (3_hello, 1)。然後将各個 key 的字首給去掉,就會變成 (hello, 2) (hello, 2) (hello, 1),再次進行全局聚合操作,就可以得到最終結果了,比如 (hello, 5)。

def antiSkew(): RDD[(String, Int)] = {
    val SPLIT = "-"
    val prefix = new Random().nextInt(10)
    pairs.map(t => ( prefix + SPLIT + t._1, 1))
        .reduceByKey((v1, v2) => v1 + v2)
        .map(t => (t._1.split(SPLIT)(1), t2._2))
        .reduceByKey((v1, v2) => v1 + v2)
}           

不過進行兩次 mapreduce,性能稍微比一次的差些。

七、Hadoop 中的資料傾斜

Hadoop 中直接貼近使用者使用的是 Mapreduce 程式和 Hive 程式,雖說 Hive 最後也是用 MR 來執行(至少目前 Hive 記憶體計算并不普及),但是畢竟寫的内容邏輯差別很大,一個是程式,一個是Sql,是以這裡稍作區分。

Hadoop 中的資料傾斜主要表現在 ruduce 階段卡在99.99%,一直99.99%不能結束。

這裡如果詳細的看日志或者和監控界面的話會發現:

  • 有一個多幾個 reduce 卡住
  • 各種 container報錯 OOM
  • 讀寫的資料量極大,至少遠遠超過其它正常的 reduce
  • 伴随着資料傾斜,會出現任務被 kill 等各種詭異的表現。

經驗: Hive的資料傾斜,一般都發生在 Sql 中 Group 和 On 上,而且和資料邏輯綁定比較深。

優化方法

這裡列出來一些方法和思路,具體的參數和用法在官網看就行了。

  1. map join 方式
  2. count distinct 的操作,先轉成 group,再 count
  3. 參數調優

    set hive.map.aggr=true

    set hive.groupby.skewindata=true

  4. left semi jion 的使用
  5. 設定 map 端輸出、中間結果壓縮。(不完全是解決資料傾斜的問題,但是減少了 IO 讀寫和網絡傳輸,能提高很多效率)

說明

hive.map.aggr=true: 在map中會做部分聚集操作,效率更高但需要更多的記憶體。

hive.groupby.skewindata=true: 資料傾斜時負載均衡,當選項設定為true,生成的查詢計劃會有兩個MRJob。第一個MRJob 中,Map的輸出結果集合會随機分布到Reduce中,每個Reduce做部分聚合操作,并輸出結果,這樣處理的結果是相同的GroupBy Key有可能被分發到不同的Reduce中,進而達到負載均衡的目的;第二個MRJob再根據預處理的資料結果按照GroupBy Key分布到Reduce中(這個過程可以保證相同的GroupBy Key被分布到同一個Reduce中),最後完成最終的聚合操作。

八、參考文章

  1. Spark性能優化之道——解決Spark資料傾斜(Data Skew)的N種姿勢
  2. 漫談千億級資料優化實踐:資料傾斜(純幹貨)
  3. 解決spark中遇到的資料傾斜問題

更多内容敬請關注 vivo 網際網路技術 微信公衆号

Spark 資料傾斜及其解決方案

注:轉載文章請先與微信号:labs2020 聯系。

分享 vivo 網際網路技術幹貨與沙龍活動,推薦最新行業動态與熱門會議。

繼續閱讀