spark性能優化: shuffle調優,資料傾斜調優
在日常開發spark任務的過程中,我們有時會發現在某個Stage中一個task或是幾個task相對于其它的task的執行速度慢,那這個stage的執行時間就取決于最慢的task的執行時間,這個時候可能就發生了資料傾斜。
何謂資料傾斜?資料傾斜指的是,并行處理的資料集中,某一部分(如Spark或Kafka的一個Partition)的資料顯著多于其它部分,進而使得該部分的處理速度成為整個資料集處理的瓶頸。
shuffle過程産生資料傾斜的原理:在shuffle的過程中,會把各個節點上相同的key拉取到某個節點上的某個分區中,在對應的生成task,此時如果某個key對應的資料量比較大的話,就會導緻資料傾斜。在spark UI中我們可以觀察到每個stage的task的運作情況,如果發現了資料傾斜的情況我們可以使用些簡單的手段定位到是那些key導緻了資料傾斜下面是個例子,取出key對應資料量最大的前十個key。
// 取樣 檢視導緻資料傾斜的key
val countSkewRDD = bigRDD.mapValues(_ => 1L).reduceByKey(_ + _)
val skewArr: Array[Int] = countSkewRDD.sortBy(item => {
item._2
}, false).take(10).map(item => {
item._1
})
下面介紹下資料傾斜的幾種解決方案
資料的預處理
根據業務情況,如果任務的資料源是Hive的話,在Hive中某些key資料量過大,業務場景是需要頻繁的進行spark計算,可以考慮這種方案。具體實作是在Hive中先進行聚合操作或是join操作,說白了也就是要在spark中進行的會導緻資料傾斜的運算提前到了Hive中,spark隻需要讀取hive處理好的資料。但是這樣的優化方案也隻是治标不治本,在hive中仍然會發生資料傾斜,雖然緩解了spark中資料傾斜的問題。
過濾掉導緻資料傾斜的key
這個方案呢,前提是通過取樣或是一些統計手段觀察到了導緻資料傾斜的key,并且這些key對計算結果并沒有影響,才可以使用,具體實作方案也很簡單就是過濾掉一部分資料就可以了,但是适用的場景很少。
提高shuffle的并行度
如果實際的項目中真的發生了資料傾斜的情況,那麼這可能是最簡單的一種緩解資料傾斜的方案了,也就是提高并行度。
在會導緻shuffle的算子中指定numPartitions,例如groupByKey(100),這個參數是指定Partition的數量這個數量也決定了task的數量,也可以通過指定spark.default.parallelism參數來提高并行度,在spark sql 中的 group by join 等可以通過spark.sql.shuffle.partitions來配置。這種方案的原理也很簡單,增加shuffle read task的數量,可以讓原本配置設定給一個task的多個key配置設定給多個task,進而讓每個task處理比原來更少的資料。但是這種方案的弊端是隻能緩解資料傾斜的問題。并不能夠從根本上解決資料傾斜。
自定義Partitioner
使用自定義的Partitioner(預設為HashPartitioner),将原本被配置設定到同一個Task的不同Key配置設定到不同Task。但缺點也很明顯,适用場景有限,隻能将不同Key分散開,對于同一Key對應資料集非常大的場景不适用。效果與調整并行度類似,隻能緩解資料傾斜而不能完全消除資料傾斜。而且需要根據資料特點自定義專用的Partitioner,不夠靈活。
groupByKey(new TestPartitioner(12))
/**
* 自定義分區
*/
class TestPartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
var id = key.toString.toInt
if (id == 6) {
id = (new scala.util.Random).nextInt(10000 * 100 * 6)
}
id % numParts
}
}
将Reduce side Join轉變為Map side Join
在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的資料量比較小(比如幾百M或者一兩G),比較适用此方案。不使用join算子進行連接配接操作,而使用Broadcast變量與map類算子實作join操作,進而完全規避掉shuffle類的操作,徹底避免資料傾斜的發生和出現。将較小RDD中的資料直接通過collect算子拉取到Driver端的記憶體中來,然後對其建立一個Broadcast變量;接着對另外一個RDD執行map類算子,在算子函數内,從Broadcast變量中擷取較小RDD的全量資料,與目前RDD的每一條資料按照連接配接key進行比對,如果連接配接key相同的話,那麼就将兩個RDD的資料用你需要的方式連接配接起來。這種方案的弊端是需要對小表進行廣播,每個Executor也會駐留一份資料,并且在對大表進行map的過程中可能要循環比對,實踐下來優化效果不明顯而且使用場景較少。
/**
*
* 對于join的資料傾斜的情況把小表廣播出去 -> 大表再map比對
*
* @param spark
*/
def calForBroadcast(spark: SparkSession): Unit = {
val personDFBig = readData(spark, dataDirBig)
val personDFSmall = readData(spark, dataDirSmall)
val skewRDD = mapForSkew(personDFBig, sourceCountBig)
val kvRDD = mapForKV(personDFSmall)
val broadCast = spark.sparkContext.broadcast(kvRDD.collect())
def function(tuple: (Int, Row)): (Int, Row) = {
for (value <- broadCast.value) {
if (value._1.equals(tuple._1))
return (tuple._1, tuple._2)
}
(tuple._1, tuple._2)
}
val resultRDD = skewRDD.map(function)
resultRDD.count()
}
兩階段聚合(局部聚合+全局聚合)
對于spark的聚合算子,如各種by key,先給每個key前面加個随機的字首(注意:後面很多解決方案都是加上個随機字首),這時候大量相同key的資料的key就不一樣了 例如 (a,person1),(a,person2),(a,person3) 變成了(1_a,person1),(3_a,person2),(1_a,person3)。這樣大量相同key的資料就被随機的拆分開了,之後在進行一次局部的聚合,例如将key都為1_a的資料聚合起來,然後在将各個key的随機字首去除,在進行一次全局的聚合。這個方案會減少相同key大量聚合到同一個task的情況,将資料拆分開來,減少shuffle壓力。但是僅僅适用于聚合類的shuffle操作,适用範圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。
skewRDD.map(item =>
((new Random).nextInt(200) + "_" + item._1, item._2)
).groupByKey(
).map(item => {
(item._1.split("_", -1)(0), item._2)
}).groupByKey(
).count()
采樣傾斜key并分拆join操作
在兩個表進行join過程中,如果兩個表資料量都比較大,其中一張表的資料是均勻的而另外一張表中某幾個key資料量比較大可以采用如下方法。具體實作如下,對包含幾個資料量比較大的RDD,通過一些統計方法取出資料量大的幾個key,然後将這些key對應的資料從原來的RDD過濾出來,形成一個單獨的RDD,在對這些資料的key都加n以内的随機數,剩下的key在做成另外一個RDD。接着将另外一個資料均勻的RDD也把這些key過濾出來形成一個單獨的RDD,将這個RDD中的每條資料都膨脹成n條,這n條資料都按照順序添加一個從0到n的字首,這樣的話就可以保證join的過程中key能比對上,而且相同的key也被拆分開來了,剩下的key在做成另外一個RDD。在将附加随機字首的RDD和擴容後的RDD進行join,join之後在将字首去掉,剩下的key作成的兩個RDD進行join即可。最後将兩次join的結果在union起來,實踐下來優化效果還是很明顯。
// 取樣 檢視導緻資料傾斜的key
val countSkewRDD = bigRDD.mapValues(_ => 1L).reduceByKey(_ + _)
val skewArr: Array[Int] = countSkewRDD.sortBy(item => {
item._2
}, false).take(10).map(item => {
item._1
})
// 過濾出來資料傾斜的RDD
val bigSkewRDD = bigRDD.filter(item => {
skewArr.contains(item._1)
})
// 過濾出來沒有傾斜的RDD
val bigCommonRDD = bigRDD.filter(item => {
!skewArr.contains(item._1)
})
// 過濾出來資料傾斜的RDD
val smallSkewRDD = smallRDD.filter(item => {
skewArr.contains(item._1)
}).flatMap(item => {
(1 to 10).map(i => {
(i + "_" + item._1, item._2)
})
})
// 過濾出來沒有傾斜的RDD
val smallCommonRDD = smallRDD.filter(item => {
!skewArr.contains(item._1)
})
val skewResult = bigSkewRDD.map(item => {
((new Random).nextInt(10) + "_" + item._1, item._2)
}).join(smallSkewRDD).map(item => {
(item._1.split("_")(0), item._2._1)
})
val commonResult = bigCommonRDD.join(smallCommonRDD).map(item => {
(item._1.toString, item._2._1)
})
skewResult.union(commonResult).count()
使用随機字首和擴容RDD進行JOIN
對于有大量的key傾斜的情況下,可以采取上面的方法不同的是不要在采樣和過濾掉傾斜的key,直接将有傾斜的key添加随機字首,在對另一個RDD進行其它操作同上面的方法類似,隻是這種方案的弊端是需要對全量的RDD進行擴容,對記憶體要求比較高。也确實是個行之有效的方案。