天天看點

spark算子調優

1、MapPartitions提升Map類操作性能

spark中,最基本的原則,就是每個task處理一個RDD的partition。

1.1  MapPartitions的優缺點

MapPartitions操作的優點:

如果是普通的map,比如一個partition中有1萬條資料。ok,那麼你的function要執行和計算1萬次。

但是,使用MapPartitions操作之後,一個task僅僅會執行一次function,function一次接收所有的partition資料。隻要執行一次就可以了,性能比較高。

MapPartitions的缺點:

如果是普通的map操作,一次function的執行就處理一條資料。那麼如果記憶體不夠用的情況下,比如處理了1千條資料了,那麼這個時候記憶體不夠了,那麼就可以将已經處理完的1千條資料從記憶體裡面垃圾回收掉,或者用其他方法,騰出空間來吧。

是以說普通的map操作通常不會導緻記憶體的OOM異常。

但是MapPartitions操作,對于大量資料來說,比如甚至一個partition,100萬資料,一次傳入一個function以後,那麼可能一下子記憶體不夠,但是又沒有辦法去騰出記憶體空間來,可能就OOM,記憶體溢出。

1.2  MapPartitions使用場景

當分析的資料量不是特别大的時候,都可以用這種MapPartitions系列操作,性能還是非常不錯的,是有提升的。比如原來是15分鐘,(曾經有一次性能調優),12分鐘。10分鐘->9分鐘。

但是也有過出問題的經驗,MapPartitions隻要一用,直接OOM,記憶體溢出,崩潰。

在項目中,自己先去估算一下RDD的資料量,以及每個partition的量,還有自己配置設定給每個executor的記憶體資源。看看一下子記憶體容納所有的partition資料行不行。如果行,可以試一下,能跑通就好。性能肯定是有提升的。但是試了以後,發現OOM了,那就放棄吧。

2、filter過後使用coalesce減少分區

2.1 出現問題

預設情況下,經過了filter之後,RDD中的每個partition的資料量,可能都不太一樣了。(原本每個partition的資料量可能是差不多的)

可能出現的問題:

1、每個partition資料量變少了,但是在後面進行處理的時候,還是要跟partition數量一樣數量的task,來進行處理,有點浪費task計算資源。

2、每個partition的資料量不一樣,會導緻後面的每個task處理每個partition的時候,每個task要處理的資料量就不同,這樣就會導緻有些task運作的速度很快,有些task運作的速度很慢。這就是資料傾斜。

針對上述的兩個問題,我們希望應該能夠怎麼樣?

1、針對第一個問題,我們希望可以進行partition的壓縮吧,因為資料量變少了,那麼partition其實也完全可以對應的變少。比如原來是4個partition,現在完全可以變成2個partition。那麼就隻要用後面的2個task來處理即可。就不會造成task計算資源的浪費。(不必要,針對隻有一點點資料的partition,還去啟動一個task來計算)

2、針對第二個問題,其實解決方案跟第一個問題是一樣的,也是去壓縮partition,盡量讓每個partition的資料量差不多。那麼後面的task配置設定到的partition的資料量也就差不多。不會造成有的task運作速度特别慢,有的task運作速度特别快。避免了資料傾斜的問題。

2.2  解決問題方法

調用coalesce算子

主要就是用于在filter操作之後,針對每個partition的資料量各不相同的情況,來壓縮partition的數量,而且讓每個partition的資料量都盡量均勻緊湊。進而便于後面的task進行計算操作,在某種程度上,能夠一定程度的提升性能。

3、使用foreachPartition優化寫資料庫性能

3.1  預設的foreach的性能缺陷在哪裡?

首先,對于每條資料,都要單獨去調用一次function,task為每個資料,都要去執行一次function函數。

如果100萬條資料,(一個partition),調用100萬次。性能比較差。

另外一個非常非常重要的一點

如果每個資料,你都去建立一個資料庫連接配接的話,那麼你就得建立100萬次資料庫連接配接。

但是要注意的是,資料庫連接配接的建立和銷毀,都是非常非常消耗性能的。雖然我們之前已經用了資料庫連接配接池,隻是建立了固定數量的資料庫連接配接。

你還是得多次通過資料庫連接配接,往資料庫(MySQL)發送一條SQL語句,然後MySQL需要去執行這條SQL語句。如果有100萬條資料,那麼就是100萬次發送SQL語句。

以上兩點(資料庫連接配接,多次發送SQL語句),都是非常消耗性能的。

3.2  用了foreachPartition算子之後,好處在哪裡?

1、對于我們寫的function函數,就調用一次,一次傳入一個partition所有的資料。

2、主要建立或者擷取一個資料庫連接配接就可以。

3、隻要向資料庫發送一次SQL語句和多組參數即可。

注意,與mapPartitions操作一樣,如果一個partition的數量真的特别特别大,比如是100萬,那基本上就不太靠譜了。很有可能會發生OOM,記憶體溢出的問題。

4、使用repartition解決Spark SQL低并行度的性能問題

4.1  設定并行度

并行度:之前說過,并行度是設定的:

1、spark.default.parallelism

2、textFile(),傳入第二個參數,指定partition數量(比較少用)

在生産環境中,是最好設定一下并行度。官網有推薦的設定方式,根據你的application的總cpu core數量(在spark-submit中可以指定),自己手動設定spark.default.parallelism參數,指定為cpu core總數的2~3倍。

4.2 你設定的這個并行度,在哪些情況下會生效?什麼情況下不會生效?

如果你壓根兒沒有使用Spark SQL(DataFrame),那麼你整個spark application預設所有stage的并行度都是你設定的那個參數。(除非你使用coalesce算子縮減過partition數量)。

問題來了,用Spark SQL的情況下,stage的并行度沒法自己指定。Spark SQL自己會預設根據hive表對應的hdfs檔案的block,自動設定Spark SQL查詢所在的那個stage的并行度。你自己通過spark.default.parallelism參數指定的并行度,隻會在沒有Spark SQL的stage中生效。

比如你第一個stage,用了Spark SQL從hive表中查詢出了一些資料,然後做了一些transformation操作,接着做了一個shuffle操作(groupByKey)。下一個stage,在shuffle操作之後,做了一些transformation操作。hive表,對應了一個hdfs檔案,有20個block。你自己設定了spark.default.parallelism參數為100。

你的第一個stage的并行度,是不受你的控制的,就隻有20個task。第二個stage,才會變成你自己設定的那個并行度,100。

5、reduceByKey本地聚合介紹

reduceByKey,相較于普通的shuffle操作(比如groupByKey),它的一個特點,就是說,會進行map端的本地聚合。對map端給下個stage每個task建立的輸出檔案中,寫資料之前,就會進行本地的combiner操作,也就是說對每一個key,對應的values,都會執行你的算子函數(_ + _)

5.1  用reduceByKey對性能的提升

1、在本地進行聚合以後,在map端的資料量就變少了,減少磁盤IO。而且可以減少磁盤空間的占用。

2、下一個stage,拉取資料的量,也就變少了。減少網絡的資料傳輸的性能消耗。

3、在reduce端進行資料緩存的記憶體占用變少了。

4、reduce端,要進行聚合的資料量也變少了。

5.2  reduceByKey在什麼情況下使用呢?

1、非常普通的,比如說,就是要實作類似于wordcount程式一樣的,對每個key對應的值,進行某種資料公式或者算法的計算(累加、類乘)。

2、對于一些類似于要對每個key進行一些字元串拼接的這種較為複雜的操作,可以自己衡量一下,其實有時,也是可以使用reduceByKey來實作的。但是不太好實作。如果真能夠實作出來,對性能絕對是有幫助的。(shuffle基本上就占了整個spark作業的90%以上的性能消耗,主要能對shuffle進行一定的調優,都是有價值的)

繼續閱讀