天天看點

Hive、Inceptor資料傾斜詳解及解決

Hive、Inceptor資料傾斜詳解及解決

一、傾斜造成的原因

正常的資料分布理論上都是傾斜的,就是我們所說的20-80原理:80%的财富集中在20%的人手中, 80%的使用者隻使用20%的功能 , 20%的使用者貢獻了80%的通路量。

俗話是,一個人累死,其他人閑死的局面

這也違背了并行計算的初衷,首先一個節點要承受着巨大的壓力,而其他節點計算完畢後要一直等待這個忙碌的節點,也拖累了整體的計算時間,可以說效率是十分低下的。

下面舉個簡單的例子:

Hive、Inceptor資料傾斜詳解及解決

舉個 word count 的入門例子:

它的map 階段就是形成 (“aaa”,1)的形式,然後在reduce 階段進行 value 相加,得出 “aaa” 出現的次數。

若進行 word count 的文本有100G,其中 80G 全部是 “aaa” 剩下 20G 是其餘單詞,那就會形成 80G 的資料量交給一個 reduce 進行相加,其餘 20G 根據 key 不同分散到不同 reduce 進行相加的情況。如此就造成了資料傾斜,臨床反應就是 reduce 跑到 99%然後一直在原地等着 那80G 的reduce 跑完。

表現如下:

有一個多幾個reduce卡住

各種container報錯OOM

讀寫的資料量極大,至少遠遠超過其它正常的reduce

伴随着資料傾斜,會出現任務被kill等各種詭異的表現。

二、傾斜原理

原理剖析

在進行shuffle的時候,必須将各個節點上相同的Key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或者join操作。如果某個key對應的資料量特别大的話,會發生資料傾斜。

三、傾斜解決

  1. 将reduce join 轉為map join-----一般用于直接sql查詢的場景
什麼是MapJoin?

MapJoin顧名思義,就是在Map階段進行表之間的連接配接。而不需要進入到Reduce階段才進行連接配接。這樣就節省了在Shuffle階段時要進行的大量資料傳輸。進而起到了優化作業的作用。

MapJoin的原理:

通常情況下,要連接配接的各個表裡面的資料會分布在不同的Map中進行處理。即同一個Key對應的Value可能存在不同的Map中。這樣就必須等到Reduce中去連接配接。

要使MapJoin能夠順利進行,那就必須滿足這樣的條件:除了一份表的資料分布在不同的Map中外,其他連接配接的表的資料必須在每個Map中有完整的拷貝。

MapJoin适用的場景:

通過上面分析你會發現,并不是所有的場景都适合用MapJoin. 它通常會用在如下的一些情景:在二個要連接配接的表中,有一個很大,有一個很小,這個小表可以存放在記憶體中而不影響性能。

這樣我們就把小表檔案複制到每一個Map任務的本地,再讓Map把檔案讀到記憶體中待用。

MapJoin的實作方法:

     1)在Map-Reduce的驅動程式中使用靜态方法DistributedCache.addCacheFile()增加要拷貝的小表檔案,。JobTracker在作業啟動之前會擷取這個URI清單,并将相應的檔案拷貝到各個TaskTracker的本地磁盤上。

     2)在Map類的setup方法中使用DistributedCache.getLocalCacheFiles()方法擷取檔案目錄,并使用标準的檔案讀寫API讀取相應的檔案。

Hive内置提供的優化機制之一就包括MapJoin。

在Hive v0.7之前,需要使用hint提示 /*+ mapjoin(table) */才會執行MapJoin  。Hive v0.7之後的版本已經不需要給出MapJoin的訓示就進行優化。它是通過如下配置參數來控制的:

hive> set hive.auto.convert.join=true;

否則需要通過sql代碼進行修改

select /*+ mapjoin(A)*/ f.a,f.b from A t join B f  on ( f.a=t.a and f.ftime=20110802)

Hive還提供另外一個參數--表檔案的大小作為開啟和關閉MapJoin的門檻值。

hive.mapjoin.smalltable.filesize=25000000 即25M

實作原理:

普通的join是會走shuffle過程的,而一旦shuffle,就相當于會将相同key的資料拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量資料+map算子來實作與join同樣的效果,也就是mao join ,而此時不會發生shuffle操作,也就不會發生資料傾斜。

方案優點:

對join操作導緻的資料傾斜,效果非常好,因為根本就不會發生shuffle,也就根本不會發生資料傾斜。

方案缺點:

适用場景較少,因為這個方案隻适用于一個大表和一個小表的情況。畢竟我們需要将小表進行廣播,此時會比較消耗記憶體資源,driver和每個Executor記憶體中都會駐留一份小RDD的全量資料。如果我們廣播出去的RDD資料比較大,比如10G以上,那麼就可能發生記憶體溢出了。是以并不适合兩個都是大表的情況。

    2.提高shuffle操作的并行度

方案使用場景:

若我們必須要面對資料傾斜問題,要這麼使用。

思路:

在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,如reduceByKey(1000),該參數設定了這個shuffle算子執行時shuffle read task 的數量。對于Spark SQL中的shuffle類語句,如 groupBy 、join 等需要設定一個參數,即spark.sql.shuffle.partitions。該參數代表了shuffle read task 的并行度,預設值是200。

原理:

增加shuffle read task 的數量,可以讓原本配置設定給一個task的多個key配置設定給多個task,進而讓每個task處理比原來更少的資料。舉例來說,如果原本有5個key,每個key對應10條資料,這5個key都是配置設定給一個task的,那麼這個task就要處理50條資料。而增加了shuffle read task以後,每個task就配置設定到一個key,即每個task就處理10條資料,那麼自然每個task的執行時間都會變短了。

Hive、Inceptor資料傾斜詳解及解決

實作起來比較簡單,可以有效緩解和減輕資料傾斜的影響。

隻是緩解了資料傾斜而已,沒有徹底根除問題,根據實踐經驗來看,其效果有限。

3. 兩階段聚合(局部聚合+全局聚合)

方案使用場景:

對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較适用這種方案。

思路:

這個方案的核心實作思路就是進行兩階段聚合。第一次是局部聚合,先給每個key都打上一個随機數,比如10以内的随機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上随機數後的資料,執行reduceByKey等聚合操作,進行局部聚合,那麼局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然後将各個key的字首給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如(hello, 4)。

Hive、Inceptor資料傾斜詳解及解決

方案優點:

對于聚合類的shuffle操作導緻的資料傾斜,效果是非常不錯的。通常都可以解決掉資料傾斜,或者至少是大幅度緩解資料傾斜,将Spark作業的性能提升數倍以上。

方案缺點:

僅僅适用于聚合類的shuffle操作,适用範圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

作者:少帥

繼續閱讀