1、MapPartitions提升Map類操作性能
spark中,最基本的原則,就是每個task處理一個RDD的partition。
1.1 MapPartitions的優缺點
MapPartitions操作的優點:
如果是普通的map,比如一個partition中有1萬條資料。ok,那麼你的function要執行和計算1萬次。
但是,使用MapPartitions操作之後,一個task僅僅會執行一次function,function一次接收所有的partition資料。隻要執行一次就可以了,性能比較高。
MapPartitions的缺點:
如果是普通的map操作,一次function的執行就處理一條資料。那麼如果記憶體不夠用的情況下,比如處理了1千條資料了,那麼這個時候記憶體不夠了,那麼就可以将已經處理完的1千條資料從記憶體裡面垃圾回收掉,或者用其他方法,騰出空間來吧。
是以說普通的map操作通常不會導緻記憶體的OOM異常。
但是MapPartitions操作,對于大量資料來說,比如甚至一個partition,100萬資料,一次傳入一個function以後,那麼可能一下子記憶體不夠,但是又沒有辦法去騰出記憶體空間來,可能就OOM,記憶體溢出。
1.2 MapPartitions使用場景
當分析的資料量不是特别大的時候,都可以用這種MapPartitions系列操作,性能還是非常不錯的,是有提升的。比如原來是15分鐘,(曾經有一次性能調優),12分鐘。10分鐘->9分鐘。
但是也有過出問題的經驗,MapPartitions隻要一用,直接OOM,記憶體溢出,崩潰。
在項目中,自己先去估算一下RDD的資料量,以及每個partition的量,還有自己配置設定給每個executor的記憶體資源。看看一下子記憶體容納所有的partition資料行不行。如果行,可以試一下,能跑通就好。性能肯定是有提升的。但是試了以後,發現OOM了,那就放棄吧。
2、filter過後使用coalesce減少分區
2.1 出現問題
預設情況下,經過了filter之後,RDD中的每個partition的資料量,可能都不太一樣了。(原本每個partition的資料量可能是差不多的)
可能出現的問題:
1、每個partition資料量變少了,但是在後面進行處理的時候,還是要跟partition數量一樣數量的task,來進行處理,有點浪費task計算資源。
2、每個partition的資料量不一樣,會導緻後面的每個task處理每個partition的時候,每個task要處理的資料量就不同,這樣就會導緻有些task運作的速度很快,有些task運作的速度很慢。這就是資料傾斜。
針對上述的兩個問題,我們希望應該能夠怎麼樣?
1、針對第一個問題,我們希望可以進行partition的壓縮吧,因為資料量變少了,那麼partition其實也完全可以對應的變少。比如原來是4個partition,現在完全可以變成2個partition。那麼就隻要用後面的2個task來處理即可。就不會造成task計算資源的浪費。(不必要,針對隻有一點點資料的partition,還去啟動一個task來計算)
2、針對第二個問題,其實解決方案跟第一個問題是一樣的,也是去壓縮partition,盡量讓每個partition的資料量差不多。那麼後面的task配置設定到的partition的資料量也就差不多。不會造成有的task運作速度特别慢,有的task運作速度特别快。避免了資料傾斜的問題。
2.2 解決問題方法
調用coalesce算子
主要就是用于在filter操作之後,針對每個partition的資料量各不相同的情況,來壓縮partition的數量,而且讓每個partition的資料量都盡量均勻緊湊。進而便于後面的task進行計算操作,在某種程度上,能夠一定程度的提升性能。
3、使用foreachPartition優化寫資料庫性能
3.1 預設的foreach的性能缺陷在哪裡?
首先,對于每條資料,都要單獨去調用一次function,task為每個資料,都要去執行一次function函數。
如果100萬條資料,(一個partition),調用100萬次。性能比較差。
另外一個非常非常重要的一點
如果每個資料,你都去建立一個資料庫連接配接的話,那麼你就得建立100萬次資料庫連接配接。
但是要注意的是,資料庫連接配接的建立和銷毀,都是非常非常消耗性能的。雖然我們之前已經用了資料庫連接配接池,隻是建立了固定數量的資料庫連接配接。
你還是得多次通過資料庫連接配接,往資料庫(MySQL)發送一條SQL語句,然後MySQL需要去執行這條SQL語句。如果有100萬條資料,那麼就是100萬次發送SQL語句。
以上兩點(資料庫連接配接,多次發送SQL語句),都是非常消耗性能的。
3.2 用了foreachPartition算子之後,好處在哪裡?
1、對于我們寫的function函數,就調用一次,一次傳入一個partition所有的資料。
2、主要建立或者擷取一個資料庫連接配接就可以。
3、隻要向資料庫發送一次SQL語句和多組參數即可。
注意,與mapPartitions操作一樣,如果一個partition的數量真的特别特别大,比如是100萬,那基本上就不太靠譜了。很有可能會發生OOM,記憶體溢出的問題。
4、使用repartition解決Spark SQL低并行度的性能問題
4.1 設定并行度
并行度:之前說過,并行度是設定的:
1、spark.default.parallelism
2、textFile(),傳入第二個參數,指定partition數量(比較少用)
在生産環境中,是最好設定一下并行度。官網有推薦的設定方式,根據你的application的總cpu core數量(在spark-submit中可以指定),自己手動設定spark.default.parallelism參數,指定為cpu core總數的2~3倍。
4.2 你設定的這個并行度,在哪些情況下會生效?什麼情況下不會生效?
如果你壓根兒沒有使用Spark SQL(DataFrame),那麼你整個spark application預設所有stage的并行度都是你設定的那個參數。(除非你使用coalesce算子縮減過partition數量)。
問題來了,用Spark SQL的情況下,stage的并行度沒法自己指定。Spark SQL自己會預設根據hive表對應的hdfs檔案的block,自動設定Spark SQL查詢所在的那個stage的并行度。你自己通過spark.default.parallelism參數指定的并行度,隻會在沒有Spark SQL的stage中生效。
比如你第一個stage,用了Spark SQL從hive表中查詢出了一些資料,然後做了一些transformation操作,接着做了一個shuffle操作(groupByKey)。下一個stage,在shuffle操作之後,做了一些transformation操作。hive表,對應了一個hdfs檔案,有20個block。你自己設定了spark.default.parallelism參數為100。
你的第一個stage的并行度,是不受你的控制的,就隻有20個task。第二個stage,才會變成你自己設定的那個并行度,100。
5、reduceByKey本地聚合介紹
reduceByKey,相較于普通的shuffle操作(比如groupByKey),它的一個特點,就是說,會進行map端的本地聚合。對map端給下個stage每個task建立的輸出檔案中,寫資料之前,就會進行本地的combiner操作,也就是說對每一個key,對應的values,都會執行你的算子函數(_ + _)
5.1 用reduceByKey對性能的提升
1、在本地進行聚合以後,在map端的資料量就變少了,減少磁盤IO。而且可以減少磁盤空間的占用。
2、下一個stage,拉取資料的量,也就變少了。減少網絡的資料傳輸的性能消耗。
3、在reduce端進行資料緩存的記憶體占用變少了。
4、reduce端,要進行聚合的資料量也變少了。
5.2 reduceByKey在什麼情況下使用呢?
1、非常普通的,比如說,就是要實作類似于wordcount程式一樣的,對每個key對應的值,進行某種資料公式或者算法的計算(累加、類乘)。
2、對于一些類似于要對每個key進行一些字元串拼接的這種較為複雜的操作,可以自己衡量一下,其實有時,也是可以使用reduceByKey來實作的。但是不太好實作。如果真能夠實作出來,對性能絕對是有幫助的。(shuffle基本上就占了整個spark作業的90%以上的性能消耗,主要能對shuffle進行一定的調優,都是有價值的)