天天看點

Spark core中的cache、persist差別,以及緩存級别詳解

概述

本次我們将學習Spark core中的cache操作以及和 persist的差別。首先大家可能想到的是cache到底是什麼呢?他有什麼作用呢?我們可以帶着這兩個問題進行下面的學習。

本文結構:

1. cache的産生背景

2. cache的作用

3. 源碼解析cache于persist的差別,以及緩存級别詳解

1 cache的産生背景

我們先做一個簡單的測試讀取一個本地檔案做一次collect操作

val rdd=sc.textFile("file:///home/hadoop/data/input.txt")
val rdd=sc.textFile("file:///home/hadoop/data/input.txt")
           

上面我們進行了兩次相同的操作,觀察日志我們發現這樣一句話

Submitting ResultStage 0 (file:///home/hadoop/data/input.txt MapPartitionsRDD[1] at textFile at <console>:25), which has no missing parents

,每次都要去本地讀取input.txt檔案,這裡大家能想到存在什麼問題嗎? 如果我的檔案很大,每次都對相同的RDD進行同一個action操作,那麼每次都要到本地讀取檔案,得到相同的結果。不斷進行這樣的重複操作,耗費資源浪費時間啊。這時候我們可能想到能不能把RDD儲存在記憶體中呢?答案是可以的,這就是我們所要學習的cache。

2 cache的作用

通過上面的講解我們知道,有時候很多地方都會用到同一個RDD, 那麼每個地方遇到Action操作的時候都會對同一個算子計算多次,這樣會造成效率低下的問題。通過cache操作可以把RDD持久化到記憶體或者磁盤。

現在我們利用上面說的例子,把rdd進行cache操作

rdd.cache

這時候我們打開

192.168.137.130:4040

界面檢視storage界面中是否有我們的剛才cache的檔案,發現并沒有。這時候我們進行一個action操作

rdd.count

。繼續檢視storage是不是有東西了哈,

Spark core中的cache、persist差別,以及緩存級别詳解

并且給我們列出了很多資訊,存儲級别(後面詳解),大小(會發現要比源檔案大,這也是一個調優點)等等。

說到這裡小夥伴能能想到什麼呢? cacha是一個Tranformation還是一個Action呢?相信大夥應該知道了。

cache這個方法也是個Tranformation,當第一次遇到Action算子的時才會進行持久化,是以說我們第一次進行了cache操作在ui中并沒有看到結果,進行了count操作才有。

3 源碼解析cache于persist的差別,以及緩存級别詳解

Spark版本:2.2.0

  • 源碼分析
/**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()
           

從源碼中可以明顯看出cache()調用了persist(), 想要知道二者的不同還需要看一下persist函數:

(這裡注釋cache的storage level)

/**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
           

可以看到persist()内部調用了persist(StorageLevel.MEMORY_ONLY),是不是和上面對上了哈,這裡我們能夠得出cache和persist的差別了:cache隻有一個預設的緩存級别MEMORY_ONLY ,而persist可以根據情況設定其它的緩存級别。

我相信小夥伴們肯定很好奇這個緩存級别到底有多少種呢?我們繼續怼源碼看看:

**
 * :: DeveloperApi ::
 * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
 * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
 * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
 * to replicate the RDD partitions on multiple nodes.
 *
 * The [[org.apache.spark.storage.StorageLevel]] singleton object contains some static constants
 * for commonly useful storage levels. To create your own storage level object, use the
 * factory method of the singleton object (`StorageLevel(...)`).
 */
@DeveloperApi
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable 
           

我們先來看看存儲類型,源碼中我們可以看出有五個參數,分别代表:

useDisk:使用硬碟(外存);

useMemory:使用記憶體;

useOffHeap:使用堆外記憶體,這是Java虛拟機裡面的概念,堆外記憶體意味着把記憶體對象配置設定在Java虛拟機的堆以外的記憶體,這些記憶體直接受作業系統管理(而不是虛拟機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。這部分記憶體也會被頻繁的使用而且也可能導緻OOM,它是通過存儲在堆中的DirectByteBuffer對象進行引用,可以避免堆和堆外資料進行來回複制;

deserialized:反序列化,其逆過程式列化(Serialization)是java提供的一種機制,将對象表示成一連串的位元組;而反序列化就表示将位元組恢複為對象的過程。序列化是對象永久化的一種機制,可以将對象及其屬性儲存起來,并能在反序列化後直接恢複這個對象;

replication:備份數(在多個節點上備份,預設為1)。

我們接着看看緩存級别:

/**
 * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
 * new storage levels.
 */
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

           

可以看到這裡列出了12種緩存級别,**但這些有什麼差別呢?**可以看到每個緩存級别後面都跟了一個StorageLevel的構造函數,裡面包含了4個或5個參數,和上面說的存儲類型是相對應的,四個參數是因為有一個是有預設值的。

好吧這裡我又想問小夥伴們一個問題了,這幾種存儲方式什麼意思呢?該如何選擇呢?

官網上進行了詳細的解釋。我這裡介紹一個有興趣的同學可以去官網看看哈。

  • MEMORY_ONLY

    使用反序列化的Java對象格式,将資料儲存在記憶體中。如果記憶體不夠存放所有的資料,某些分區将不會被緩存,并且将在需要時重新計算。這是預設級别。

  • MEMORY_AND_DISK

    使用反序列化的Java對象格式,優先嘗試将資料儲存在記憶體中。如果記憶體不夠存放所有的資料,會将資料寫入磁盤檔案中,下次對這個RDD執行算子時,持久化在磁盤檔案中的資料會被讀取出來使用。

  • MEMORY_ONLY_SER((Java and Scala))

    基本含義同MEMORY_ONLY。唯一的差別是,會将RDD中的資料進行序列化,RDD的每個partition會被序列化成一個位元組數組。這種方式更加節省記憶體,但是會加大cpu負擔。

  • 一個簡單的案例感官行的認識存儲級别的差别
19M     page_views.dat

val rdd1=sc.textFile("file:///home/hadoop/data/page_views.dat")

rdd1.persist().count
           

ui檢視緩存大小

Spark core中的cache、persist差別,以及緩存級别詳解

是不是明顯變大了,我們先删除緩存

rdd1.unpersist()

  • 使用MEMORY_ONLY_SER級别
import org.apache.spark.storage.StorageLevel
rdd1.persist(StorageLevel.MEMORY_ONLY_SER)
rdd1.count
           
Spark core中的cache、persist差別,以及緩存級别詳解

這裡我就用這兩種方式進行對比,大家可以試試其他方式。

那如何選擇呢?哈哈官網也說了。

你可以在記憶體使用和CPU效率之間來做出不同的選擇不同的權衡。

  1. 預設情況下,性能最高的當然是MEMORY_ONLY,但前提是你的記憶體必須足夠足夠大,可以綽綽有餘地存放下整個RDD的所有資料。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的後續算子操作,都是基于純記憶體中的資料的操作,不需要從磁盤檔案中讀取資料,性能也很高;而且不需要複制一份資料副本,并遠端傳送到其他節點上。但是這裡必須要注意的是,在實際的生産環境中,恐怕能夠直接用這種政策的場景還是有限的,如果RDD中資料比較多時(比如幾十億),直接用這種持久化級别,會導緻JVM的OOM記憶體溢出異常。
  2. 如果使用MEMORY_ONLY級别時發生了記憶體溢出,那麼建議嘗試使用MEMORY_ONLY_SER級别。該級别會将RDD資料序列化後再儲存在記憶體中,此時每個partition僅僅是一個位元組數組而已,大大減少了對象數量,并降低了記憶體占用。這種級别比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是後續算子可以基于純記憶體進行操作,是以性能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的資料量過多的話,還是可能會導緻OOM記憶體溢出的異常。
  3. 不要洩漏到磁盤,除非你在記憶體中計算需要很大的花費,或者可以過濾大量資料,儲存部分相對重要的在記憶體中。否則存儲在磁盤中計算速度會很慢,性能急劇降低。
  4. 字尾為_2的級别,必須将所有資料都複制一份副本,并發送到其他節點上,資料複制以及網絡傳輸會導緻較大的性能開銷,除非是要求作業的高可用性,否則不建議使用。

4 删除緩存中的資料

spark自動監視每個節點上的緩存使用,并以最近最少使用的(LRU)方式丢棄舊資料分區。如果您想手動删除RDD,而不是等待它從緩存中掉出來,請使用 RDD.unpersist()方法。

繼續閱讀