通常來說,我們在開發一個Spark作業時,首先是基于某個資料源(比如Hive表或HDFS檔案)建立一個初始的RDD;接着對這個RDD執行某個算子操作,然後得到下一個RDD;
以此類推,循環往複,直到計算出最終我們需要的結果。在這個過程中,多個RDD會通過不同的算子操作(比如map、reduce等)串起來,這個“RDD串”,就是RDD lineage,也就是“RDD的血緣關系鍊”。
我們在開發過程中要注意:對于同一份資料,隻應該建立一個RDD,不能建立多個RDD來代表同一份資料。
一些Spark初學者在剛開始開發Spark作業時,或者是有經驗的工程師在開發RDD lineage極其冗長的Spark作業時,可能會忘了自己之前對于某一份資料已經建立過一個RDD了,
進而導緻對于同一份資料,建立了多個RDD。這就意味着,我們的Spark作業會進行多次重複計算來建立多個代表相同資料的RDD,進而增加了作業的性能開銷。
除了要避免在開發過程中對一份完全相同的資料建立多個RDD之外,在對不同的資料執行算子操作時還要盡可能地複用一個RDD。
比如說,有一個RDD的資料格式是key-value類型的,另一個是單value類型的,這兩個RDD的value資料是完全一樣的。
那麼此時我們可以隻使用key-value類型的那個RDD,因為其中已經包含了另一個的資料。對于類似這種多個RDD的資料有重疊或者包含的情況,
我們應該盡量複用一個RDD,這樣可以盡可能地減少RDD的數量,進而盡可能減少算子執行的次數。
當你在Spark代碼中多次對一個RDD做了算子操作後,恭喜,你已經實作Spark作業第一步的優化了,也就是盡可能複用RDD。
此時就該在這個基礎之上,進行第二步優化了,也就是要保證對一個RDD執行多次算子操作時,這個RDD本身僅僅被計算一次。
Spark中對于一個RDD執行多次算子的預設原理是這樣的:每次你對一個RDD執行一個算子操作時,都會重新從源頭處計算一遍,
計算出那個RDD來,然後再對這個RDD執行你的算子操作。這種方式的性能是很差的。
是以對于這種情況,我們的建議是:對多次使用的RDD進行持久化。此時Spark就會根據你的持久化政策,
将RDD中的資料儲存到記憶體或者磁盤中。以後每次對這個RDD進行算子操作時,都會直接從記憶體或磁盤中提取持久化的RDD資料,然後執行算子,而不會從源頭處重新計算一遍這個RDD,再執行算子操作。
如果有可能的話,要盡量避免使用shuffle類算子。因為Spark作業運作過程中,最消耗性能的地方就是shuffle過程。
shuffle過程,簡單來說,就是将分布在叢集中多個節點上的同一個key,拉取到同一個節點上,進行聚合或join等操作。比如reduceByKey、join等算子,都會觸發shuffle操作。
shuffle過程中,各個節點上的相同key都會先寫入本地磁盤檔案中,然後其他節點需要通過網絡傳輸拉取各個節點上的磁盤檔案中的相同key。
而且相同key都拉取到同一個節點進行聚合操作時,還有可能會因為一個節點上處理的key過多,導緻記憶體不夠存放,進而溢寫到磁盤檔案中。
是以在shuffle過程中,可能會發生大量的磁盤檔案讀寫的IO操作,以及資料的網絡傳輸操作。磁盤IO和網絡資料傳輸也是shuffle性能較差的主要原因。
是以在我們的開發過程中,能避免則盡可能避免使用reduceByKey、join、distinct、repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子。
這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業,可以大大減少性能開銷.
如果因為業務需要,一定要使用shuffle操作,無法用map類的算子來替代,那麼盡量使用可以map-side預聚合的算子。
所謂的map-side預聚合,說的是在每個節點本地對相同的key進行一次聚合操作,類似于MapReduce中的本地combiner。map-side預聚合之後,
每個節點本地就隻會有一條相同的key,因為多條相同的key都被聚合起來了。其他節點在拉取所有節點上的相同key時,就會大大減少需要拉取的資料數量,
進而也就減少了磁盤IO以及網絡傳輸開銷。通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。
因為reduceByKey和aggregateByKey算子都會使用使用者自定義的函數對每個節點本地的相同key進行預聚合。而groupByKey算子是不會進行預聚合的,
全量的資料會在叢集的各個節點之間分發和傳輸,性能相對來說比較差。
比如如下兩幅圖,就是典型的例子,分别基于reduceByKey和groupByKey進行單詞計數。其中第一張圖是groupByKey的原理圖,
可以看到,沒有進行任何本地聚合時,所有資料都會在叢集節點之間傳輸;第二張圖是reduceByKey的原理圖,
可以看到,每個節點本地的相同key資料,都進行了預聚合,然後才傳輸到其他節點上進行全局聚合。

reduceByKey有預聚合,groupbyKey沒有預聚合
mapPartitions類的算子,一次函數調用會處理一個partition所有的資料,而不是一次函數調用處理一條,性能相對來說會高一些。
但是有的時候,使用mapPartitions會出現OOM(記憶體溢出)的問題。因為單次函數調用就要處理掉一個partition所有的資料,
如果記憶體不夠,垃圾回收時是無法回收掉太多對象的,很可能出現OOM異常。是以使用這類操作時要慎重
原理類似于“使用mapPartitions替代map”,也是一次函數調用處理一個partition的所有資料,而不是一次函數調用處理一條資料。
在實踐中發現,foreachPartitions類的算子,對性能的提升還是很有幫助的。
比如在foreach函數中,将RDD中所有資料寫MySQL,那麼如果是普通的foreach算子,就會一條資料一條資料地寫,每次函數調用可能就會建立一個資料庫連接配接,
此時就勢必會頻繁地建立和銷毀資料庫連接配接,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個partition的資料,那麼對于每個partition,
隻要建立一個資料庫連接配接即可,然後執行批量插入操作,此時性能是比較高的。實踐中發現,對于1萬條左右的資料量寫MySQL,性能可以提升30%以上。
coalesce中的shuffle是false。
通常對一個RDD執行filter算子過濾掉RDD中較多資料後(比如30%以上的資料),建議使用coalesce算子,手動減少RDD的partition數量,将RDD中的資料壓縮到更少的partition中去。
因為filter之後,RDD的每個partition中都會有很多資料被過濾掉,此時如果照常進行後續的計算,
其實每個task處理的partition中的資料量并不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。
是以用coalesce減少partition數量,将RDD中的資料壓縮到更少的partition之後,隻要使用更少的task即可處理完所有的partition。在某些場景下,對于性能的提升會有一定的幫助。
repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,如果需要在repartition重分區之後,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。
因為該算子可以一邊進行重分區的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。
有時在開發過程中,會遇到需要在算子函數中使用外部變量的場景(尤其是大變量,比如100M以上的大集合),那麼此時就應該使用Spark的廣播(Broadcast)功能來提升性能。
在算子函數中使用到外部變量時,預設情況下,Spark會将該變量複制多個副本,通過網絡傳輸到task中,此時每個task都有一個變量副本。
如果變量本身比較大的話(比如100M,甚至1G),那麼大量的變量副本在網絡中傳輸的性能開銷,以及在各個節點的Executor中占用過多記憶體導緻的頻繁GC,都會極大地影響性能。
是以對于上述情況,如果使用的外部變量比較大,建議使用Spark的廣播功能,對該變量進行廣播。
廣播後的變量,會保證每個Executor的記憶體中,隻駐留一份變量副本,而Executor中的task執行時共享該Executor中的那份變量副本。
這樣的話,可以大大減少變量副本的數量,進而減少網絡傳輸的性能開銷,并減少對Executor記憶體的占用開銷,降低GC的頻率。
樣例類
在Spark中,主要有三個地方涉及到了序列化:
在算子函數中使用到外部變量時,該變量會被序列化後進行網絡傳輸(見“原則七:廣播大變量”中的講解)。
将自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。是以這種情況下,也要求自定義的類必須實作Serializable接口。
使用可序列化的持久化政策時(比如MEMORY_ONLY_SER),Spark會将RDD中的每個partition都序列化成一個大的位元組數組。
對于這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的性能。
Spark預設使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。
但是Spark同時支援使用Kryo序列化庫,Kryo序列化類庫的性能比Java序列化類庫的性能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,性能高10倍左右。
Spark之是以預設沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要注冊所有需要進行序列化的自定義類型,是以對于開發者來說,這種方式比較麻煩。
以下是使用Kryo的代碼示例,我們隻要設定序列化類,再注冊要序列化的自定義類型即可(比如算子函數中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):
能用簡單類型盡量用簡單類型
Java中,有三種類型比較耗費記憶體:
對象,每個Java對象都有對象頭、引用等額外的資訊,是以比較占用記憶體空間。
字元串,每個字元串内部都有一個字元數組以及長度等額外資訊。
集合類型,比如HashMap、LinkedList等,因為集合類型内部通常會使用一些内部類來封裝集合元素,比如Map.Entry。
是以Spark官方建議,在Spark編碼實作中,特别是對于算子函數中的代碼,盡量不要使用上述三種資料結構,
盡量使用字元串替代對象,使用原始類型(比如Int、Long)替代字元串,使用數組替代集合類型,這樣盡可能地減少記憶體占用,進而降低GC頻率,提升性能。