天天看點

spark RDD Join 性能調優選擇join的類型選擇執行過程

閱讀本篇博文時,請先了解RDD的描述及作業排程:《深入了解Spark 2.1 Core (一):RDD的原理與源碼分析 》

Join資料是我們在Spark操作中的很重要的一部分。Spark Core 和Spark SQL的基本類型都支援join操作。雖然join很常用而且功能很強大,但是我們使用它的時候,我們不得不考慮網絡傳輸和所處理的資料集過大的問題。在Spark Core中,DAG優化器不像SQL優化器,它不能夠重指令或者下壓過濾。是以,Spark操作順序對于Spark Core顯得尤為重要。

spark RDD Join 性能調優選擇join的類型選擇執行過程

這裡寫圖檔描述

這篇博文,我們将介紹RDD類型的join操作。通常來說,join是一中非常昂貴的操作。對于每個child RDD 的partition,它從各個parent RDD 的 各個 partition中,擷取到與該child RDD partition上的key相對應的所有值。

假設有兩個parent RDD,它們都沒有已知的partitioner(可以了解為該RDD到其child RDD 重分區函數),那麼它們就需要shuffle,使得它們共享同一個partitioner,并且有着相同key的資料會在同一個partition裡面。如上圖“join with inputs not co-partitioned”。

若parent RDD有已知的partitioner(若已知的partitioner相同,兩個RDD會協同,那麼就能避免網絡傳輸,兩個parent RDD 的相同partition會在同一個節點上),那麼可能如上圖的“join with inputs co-partitioned”,隻能産生窄依賴。

和大多數的K/V操作一樣,随着key的數量的增加和記錄之間的距離的增加(需要尋找其所在的partition),join的花費會越來越高。

選擇join的類型

預設情況下,Spark隻會對兩個RDD的key的值進行join。在有多個相同key值的情況下,會生成所有的K/V對。是以,标準join的最好的情況是,兩個RDD有相同的key集合,而且該key集合中的key都是互斥的。若有重複的key,資料量會急劇的擴大以至于導緻性能問題。若有個key隻在一個RDD中出現了,那麼你将失去那行資料。是以,有以下幾條建議:

  1. 若兩個RDD都有有重複的key,join操作會使得資料量會急劇的擴大。所有,最好先使用distinct或者combineByKey操作來減少key空間或者用cogroup來處理重複的key,而不是産生所有的交叉結果。在combine時,進行機智的分區,可以避免第二次shuffle。
  2. 如果隻在一個RDD出現,那你将在無意中丢失你的資料。是以使用外連接配接會更加安全,這樣你就能確定左邊的RDD或者右邊的RDD的資料完整性,在join之後再過濾資料。
  3. 如果我們容易得到RDD的可以的有用的子集合,那麼我們可以先用filter或者reduce,如何在再用join。

總之,join通常是你在使用Spark時最昂貴的操作,需要在join之前應盡可能的先縮小你的資料。

假設,你有一個RDD存着(熊貓id,分數),另外一個RDD存着(熊貓id,郵箱位址)。若你想給每隻可愛的熊貓的郵箱發送她所得的最高的分數,你可以将RDD根據id進行join,然後計算最高的分數,如下:

def joinScoresWithAddress1( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
    val joinedRDD = scoreRDD.join(addressRDD)
    joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
}
           

然而,這可能不會比先減少分數資料的方案快。先計算最高的分數,那麼每個熊貓的分數資料就隻有一行,接下來再join位址資料:

def joinScoresWithAddress2( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
    val bestScoreData = scoreRDD.reduceByKey((x, y) => if(x > y) x else y)
    bestScoreData.join(addressRDD)
}
           

若每個熊貓有1000個不同的分數,那麼這種做法的shuffle量就比上一種的小了1000倍。

如果你想要左外連接配接,保留分數資料中位址資料所沒有的熊貓,那麼你可以用leftOuterJoin來替代join。Spark還有fullOuterJoin和rightOuter,可以根據你想保留的記錄選擇使用。丢失的值會以None的形式存在:

def outerJoinScoresWithAddress( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, Option[String]))]= {
    val joinedRDD = scoreRDD.leftOuterJoin(addressRDD)
    joinedRDD.reduceByKey( (x, y) => if(x._1 > y._1) x else y )
}
           

選擇執行過程

由于join資料時,Spark需要被join的資料在相同的分區。所有,預設實作是進行shuffled hash join。shuffled hash join會将第二個資料集按照第一個資料分區,這麼一來有着相同hash值的key就會在相同的分區中了。雖然這種方法有用,但由于它需要shuffle,是以很昂貴。而以下情況可以避免shuffle:

  1. 連個RDD都有已知的partitioner
  2. 其中一個資料足夠小到儲存到記憶體這麼一來我們就可以使用廣播 hash join

注意,若RDD協同了,那麼網絡傳輸和shuffle都能避免。

通過配置設定已知Partitioner來加速Join

Spark是一個分布式的計算引擎,可以通過分區的形式将大批量的資料劃分成n份較小的資料集進行并行計算。這種思想應用到Join上便是Shuffle Hash Join了。利用key相同必然分區相同的這個原理,Spark将較大表的join分而治之,先将表劃分成n個分區,再對兩個表中相對應分區的資料分别進行Hash Join。其原理如下圖:
spark RDD Join 性能調優選擇join的類型選擇執行過程
這裡寫圖檔描述

而這僅僅适用于

join with inputs co-partitions

的情況。當在

join with inputs not co-partitions

首先将兩張表按照join keys進行了重新shuffle,保證join keys值相同的記錄會被分在相應的分區。分區後對每個分區内的資料進行排序,排序後再對相應的分區内的記錄進行連接配接。

但是shuffle操作十分昂貴,為了避免shuffle,我們可以通過使用下面的方案:

如果你在做join之前,其中一個RDD(RDD_A)不得不先進行一個shuffle操作,比如說aggregateByKey或者reduceByKey。你可以将該shuffle操作所使用的partitioner設定為另外一個RDD(RDD_B)的partitioner(若RDD_B的partitioner為None,則根據RDD_Bd的分區數new一個HashPartitioner),并持久化RDD_A,這樣一來就可以避免兩個RDD join時的shuffle了:

def joinScoresWithAddress3( scoreRDD : RDD[(Long, Double)],
addressRDD : RDD[(Long, String )]) : RDD[(Long, (Double, String))]= {
    val addressDataPartitioner = addressRDD.partitioner match {
      case (Some(p)) => p
      case (None) => new HashPartitioner(addressRDD.partitions.length)
    }
    val bestScoreData = scoreRDD.reduceByKey(addressDataPartitioner, (x, y) => if(x > y) x else y)
    bestScoreData.cache
    bestScoreData.join(addressRDD)
}
           

建議在重分區後總是進行持久化。

廣播 Hash Join

spark RDD Join 性能調優選擇join的類型選擇執行過程

這裡寫圖檔描述

若RDD_B小到足以存到記憶體,那麼我們可以使用廣播變量将它push到各個節點。這種方法不需要shuffle,而是RDD_A的各個分區,分别會被RDD_B中相關的值join上,形成Child RDD 對應的分區。有時候Spark會很機智的自動幫你做這件事。

部分手動廣播 Hash Join

有時候,我們的RDD_B并不能足夠小到都能裝進記憶體,但是有些RDD_A中的key會重複很多次,這時候你就可以想着隻廣播RDD_B中在RDD_A中出現最頻繁的那些值。當一種key值在RDD_A中多到一個partition都裝不下時,這種方法會非常有用。在這種情況下,你可以對RDD_A使用countByKeyApprox來近似得到哪些key需要廣播。然後,你将從RDD_B中filter出來需要廣播的RDD_B_0和不要廣播的RDD_B_1,将RDD_B_0 collect成本地的HashMap。使用sc.broadcast廣播該HashMap,使得每個節點都有一個備份,與RDD_A手動的執行join,得到結果RDD_C_1。再根據HashMap将RDD_A中多次重複的key值去掉,生成RDD_A_1。對RDD_A_1和RDD_B_1進行标準的join,得到結果RDD_C_0,并unoin上RDD_C_1,得到最終的結果。

這種方法雖然有點複雜,但是在對高度傾斜的資料進行處理時的效果很好。

繼續閱讀