天天看點

Spark性能調優:RDD的複用及持久化

避免建立重複的 RDD

通常來說,開發一個 Spark 作業時,首先是基于某個資料源(比如 Hive 表或 HDFS 檔案)建立一個初始的 RDD;接着對這個 RDD 執行某個算子操作,然後得到下一個 RDD;以此類推,循環往複,直到計算出最終我們需要的結果。在這個過程中,多個 RDD 會通過不同的算子操作(比如 map、reduce 等)串起來,這個 “RDD 串”,就是 RDD lineage,也就是 “RDD 的血緣關系鍊”。

我們在開發過程中要注意:對于同一份資料,隻應該建立一個 RDD,不能建立多個 RDD 來代表同一份資料。

在開發 RDD lineage 極其冗長的 Spark 作業時,可能會忘了自己之前對于某一份資料已經建立過一個 RDD 了,進而導緻對于同一份資料,建立了多個 RDD。這就意味着,我們的 Spark 作業會進行多次重複計算來建立多個代表相同資料的 RDD,進而增加了作業的性能開銷。

一個簡單的例子:

<span style="color:#000000"><code>// 需要對名為“hello.txt”的HDFS檔案進行一次map操作,再進行一次reduce操作。也就是說,需要對一份資料執行兩次算子操作。

// 錯誤的做法:對于同一份資料執行多次算子操作時,建立多個RDD。
// 這裡執行了兩次textFile方法,針對同一個HDFS檔案,建立了兩個RDD出來,然後分别對每個RDD都執行了一個算子操作。
// 這種情況下,Spark需要從HDFS上兩次加載hello.txt檔案的内容,并建立兩個單獨的RDD;第二次加載HDFS檔案以及建立RDD的性能開銷,很明顯是白白浪費掉的。
val rdd1 = sc.textFile(<span style="color:#009900 !important">"hdfs://192.168.0.0:8020/hello.txt"</span>)
rdd1.map(<span style="color:#000088 !important">...</span>)
val rdd2 = sc.textFile(<span style="color:#009900 !important">"hdfs://192.168.0.0:8020/hello.txt"</span>)
rdd2.reduce(<span style="color:#000088 !important">...</span>)

// 正确的用法:對于一份資料執行多次算子操作時,隻使用一個RDD。
// 這種寫法很明顯比上一種寫法要好多了,因為我們對于同一份資料隻建立了一個RDD,然後對這一個RDD執行了多次算子操作。
// 但是要注意到這裡為止優化還沒有結束,由于rdd1被執行了兩次算子操作,第二次執行reduce操作的時候,還會再次從源頭處重新計算一次rdd1的資料,是以還是會有重複計算的性能開銷。
// 要徹底解決這個問題,必須結合“原則三:對多次使用的RDD進行持久化”,才能保證一個RDD被多次使用時隻被計算一次。
val rdd1 = sc.textFile(<span style="color:#009900 !important">"hdfs://192.168.0.0:8020/hello.txt"</span>)
rdd1.map(<span style="color:#000088 !important">...</span>)
rdd1.reduce(<span style="color:#000088 !important">...</span>)</code></span>
           

盡可能複用同一個 RDD

除了要避免在開發過程中對一份完全相同的資料建立多個 RDD 之外,在對不同的資料執行算子操作時還要盡可能地複用一個 RDD。比如說,有一個 RDD 的資料格式是 key-value 類型的,另一個是單 value 類型的,這兩個 RDD 的 value 資料是完全一樣的。那麼此時我們可以隻使用 key-value 類型的那個 RDD,因為其中已經包含了另一個的資料。對于類似這種多個 RDD 的資料有重疊或者包含的情況,我們應該盡量複用一個 RDD,這樣可以盡可能地減少 RDD 的數量,進而盡可能減少算子執行的次數。

一個簡單的例子

<span style="color:#000000"><code>// 錯誤的做法。

// 有一個<Long, String>格式的RDD,即rdd1。
// 接着由于業務需要,對rdd1執行了一個map操作,建立了一個rdd2,而rdd2中的資料僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = <span style="color:#000088 !important">...</span>
JavaRDD<String> rdd2 = rdd1.map(<span style="color:#000088 !important">...</span>)

// 分别對rdd1和rdd2執行了不同的算子操作。
rdd1.reduceByKey(<span style="color:#000088 !important">...</span>)
rdd2.map(<span style="color:#000088 !important">...</span>)

//-------------------------------------------
// 正确的做法。

// 上面這個case中,其實rdd1和rdd2的差別無非就是資料格式不同而已,rdd2的資料完全就是rdd1的子集而已,卻建立了兩個rdd,并對兩個rdd都執行了一次算子操作。
// 此時會因為對rdd1執行map算子來建立rdd2,而多執行一次算子操作,進而增加性能開銷。

// 其實在這種情況下完全可以複用同一個RDD。
// 我們可以使用rdd1,既做reduceByKey操作,也做map操作。
// 在進行第二個map操作時,隻使用每個資料的tuple._2,也就是rdd1中的value值,即可。
JavaPairRDD<Long, String> rdd1 = <span style="color:#000088 !important">...</span>
rdd1.reduceByKey(<span style="color:#000088 !important">...</span>)
rdd1.map(tuple._2...)

// 第二種方式相較于第一種方式而言,很明顯減少了一次rdd2的計算開銷。
// 但是到這裡為止,優化還沒有結束,對rdd1我們還是執行了兩次算子操作,rdd1實際上還是會被計算兩次。
// 是以還需要配合“對多次使用的RDD進行持久化”進行使用,才能保證一個RDD被多次使用時隻被計算一次。
</code></span>
           

對多次使用的 RDD 進行持久化

Spark 中對于一個 RDD 執行多次算子的預設原理是這樣的:每次你對一個 RDD 執行一個算子操作時,都會重新從源頭處計算一遍,計算出那個 RDD 來,然後再對這個 RDD 執行你的算子操作。這種方式的性能是很差的。

是以對于這種情況,我們的建議是:對多次使用的 RDD 進行持久化。此時 Spark 就會根據你的持久化政策,将 RDD 中的資料儲存到記憶體或者磁盤中。以後每次對這個 RDD 進行算子操作時,都會直接從記憶體或磁盤中提取持久化的 RDD 資料,然後執行算子,而不會從源頭處重新計算一遍這個 RDD,再執行算子操作。

<span style="color:#000000"><code>// 如果要對一個RDD進行持久化,隻要對這個RDD調用cache()和persist()即可。

// 正确的做法。
// cache()方法表示:使用非序列化的方式将RDD中的資料全部嘗試持久化到記憶體中。
// 此時再對rdd1執行兩次算子操作時,隻有在第一次執行map算子時,才會将這個rdd1從源頭處計算一次。
// 第二次執行reduce算子時,就會直接從記憶體中提取資料進行計算,不會重複計算一個rdd。
val rdd1 = sc.textFile(<span style="color:#009900 !important">"hdfs://192.168.0.1:9000/hello.txt"</span>).cache()
rdd1.map(<span style="color:#000088 !important">...</span>)
rdd1.reduce(<span style="color:#000088 !important">...</span>)

// persist()方法表示:手動選擇持久化級别,并使用指定的方式進行持久化。
// 比如說,StorageLevel.MEMORY_AND_DISK_SER表示,記憶體充足時優先持久化到記憶體中,記憶體不充足時持久化到磁盤檔案中。
// 而且其中的_SER字尾表示,使用序列化的方式來儲存RDD資料,此時RDD中的每個partition都會序列化成一個大的位元組數組,然後再持久化到記憶體或磁盤中。
// 序列化的方式可以減少持久化的資料對記憶體/磁盤的占用量,進而避免記憶體被持久化資料占用過多,進而發生頻繁GC。
val rdd1 = sc.textFile(<span style="color:#009900 !important">"hdfs://192.168.0.1:9000/hello.txt"</span>).persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(<span style="color:#000088 !important">...</span>)
rdd1.reduce(<span style="color:#000088 !important">...</span>)</code></span>
           

對于 persist () 方法而言,我們可以根據不同的業務場景選擇不同的持久化級别。

持久化級别 含義
MEMORY_ONLY 使用未序列化的 Java 對象格式,将資料儲存在記憶體中。如果記憶體不夠存放所有的資料,則資料可能就不會進行持久化。那麼下次對這個 RDD 執行算子操作時,那些沒有被持久化的資料,需要從源頭處重新計算一遍。這是預設的持久化政策,使用 cache () 方法時,實際就是使用的這種持久化政策。
MEMORY_AND_DISK 使用未序列化的 Java 對象格式,優先嘗試将資料儲存在記憶體中。如果記憶體不夠存放所有的資料,會将資料寫入磁盤檔案中,下次對這個 RDD 執行算子時,持久化在磁盤檔案中的資料會被讀取出來使用。
MEMORY_ONLY_SER 基本 含義同 MEMORY_ONLY。唯一的差別是,會将 RDD 中的資料進行序列化,RDD 的每個 partition 會被序列化成一個位元組數組。這種方式更加節省記憶體,進而可以避免持久化的資料占用過多記憶體導緻頻繁 GC。
MEMORY_AND_DISK_SER 基本含義同 MEMORY_AND_DISK。唯一的差別是,會将 RDD 中的資料進行序列化,RDD 的每個 partition 會被序列化成一個位元組數組。這種方式更加節省記憶體,進而可以避免持久化的資料占用過多記憶體導緻頻繁 GC。
DISK_ONLY 使用未序列化的 Java 對象格式,将資料全部寫入磁盤檔案中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. 對于上述任意一種持久化政策,如果加上字尾_2,代表的是将每個持久化的資料,都複制一份副本,并将副本儲存到其他節點上。這種基于副本的持久化機制主要用于進行容錯。假如某個節點挂掉,節點的記憶體或磁盤中的持久化資料丢失了,那麼後續對 RDD 計算時還可以使用該資料在其他節點上的副本。如果沒有副本的話,就隻能将這些資料從源頭處重新計算一遍了。

如何選擇一種最合适的持久化政策

  • 預設情況下,性能最高的當然是 MEMORY_ONLY,但前提是你的記憶體必須足夠足夠大,可以綽綽有餘地存放下整個 RDD 的所有資料。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個 RDD 的後續算子操作,都是基于純記憶體中的資料的操作,不需要從磁盤檔案中讀取資料,性能也很高;而且不需要複制一份資料副本,并遠端傳送到其他節點上。但是這裡必須要注意的是,在實際的生産環境中,恐怕能夠直接用這種政策的場景還是有限的,如果 RDD 中資料比較多時(比如幾十億),直接用這種持久化級别,會導緻 JVM 的 OOM 記憶體溢出異常。
  • 如果使用 MEMORY_ONLY 級别時發生了記憶體溢出,那麼建議嘗試使用 MEMORY_ONLY_SER 級别。該級别會将 RDD 資料序列化後再儲存在記憶體中,此時每個 partition 僅僅是一個位元組數組而已,大大減少了對象數量,并降低了記憶體占用。這種級别比 MEMORY_ONLY 多出來的性能開銷,主要就是序列化與反序列化的開銷。但是後續算子可以基于純記憶體進行操作,是以性能總體還是比較高的。此外,可能發生的問題同上,如果 RDD 中的資料量過多的話,還是可能會導緻 OOM 記憶體溢出的異常。
  • 如果純記憶體的級别都無法使用,那麼建議使用 MEMORY_AND_DISK_SER 政策,而不是 MEMORY_AND_DISK 政策。因為既然到了這一步,就說明 RDD 的資料量很大,記憶體無法完全放下。序列化後的資料比較少,可以節省記憶體和磁盤的空間開銷。同時該政策會優先盡量嘗試将資料緩存在記憶體中,記憶體緩存不下才會寫入磁盤。
  • 通常不建議使用 DISK_ONLY 和字尾為_2 的級别:因為完全基于磁盤檔案進行資料的讀寫,會導緻性能急劇降低,有時還不如重新計算一次所有 RDD。字尾為_2 的級别,必須将所有資料都複制一份副本,并發送到其他節點上,資料複制以及網絡傳輸會導緻較大的性能開銷,除非是要求作業的高可用性,否則不建議使用。

繼續閱讀