概述
本次我們将學習Spark core中的cache操作以及和 persist的差別。首先大家可能想到的是cache到底是什麼呢?他有什麼作用呢?我們可以帶着這兩個問題進行下面的學習。
本文結構:
1. cache的産生背景
2. cache的作用
3. 源碼解析cache于persist的差別,以及緩存級别詳解
1 cache的産生背景
我們先做一個簡單的測試讀取一個本地檔案做一次collect操作
val rdd=sc.textFile("file:///home/hadoop/data/input.txt")
val rdd=sc.textFile("file:///home/hadoop/data/input.txt")
上面我們進行了兩次相同的操作,觀察日志我們發現這樣一句話
Submitting ResultStage 0 (file:///home/hadoop/data/input.txt MapPartitionsRDD[1] at textFile at <console>:25), which has no missing parents
,每次都要去本地讀取input.txt檔案,這裡大家能想到存在什麼問題嗎? 如果我的檔案很大,每次都對相同的RDD進行同一個action操作,那麼每次都要到本地讀取檔案,得到相同的結果。不斷進行這樣的重複操作,耗費資源浪費時間啊。這時候我們可能想到能不能把RDD儲存在記憶體中呢?答案是可以的,這就是我們所要學習的cache。
2 cache的作用
通過上面的講解我們知道,有時候很多地方都會用到同一個RDD, 那麼每個地方遇到Action操作的時候都會對同一個算子計算多次,這樣會造成效率低下的問題。通過cache操作可以把RDD持久化到記憶體或者磁盤。
現在我們利用上面說的例子,把rdd進行cache操作
rdd.cache
這時候我們打開
192.168.137.130:4040
界面檢視storage界面中是否有我們的剛才cache的檔案,發現并沒有。這時候我們進行一個action操作
rdd.count
。繼續檢視storage是不是有東西了哈,
并且給我們列出了很多資訊,存儲級别(後面詳解),大小(會發現要比源檔案大,這也是一個調優點)等等。
說到這裡小夥伴能能想到什麼呢? cacha是一個Tranformation還是一個Action呢?相信大夥應該知道了。
cache這個方法也是個Tranformation,當第一次遇到Action算子的時才會進行持久化,是以說我們第一次進行了cache操作在ui中并沒有看到結果,進行了count操作才有。
3 源碼解析cache于persist的差別,以及緩存級别詳解
Spark版本:2.2.0
- 源碼分析
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
從源碼中可以明顯看出cache()調用了persist(), 想要知道二者的不同還需要看一下persist函數:
(這裡注釋cache的storage level)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
可以看到persist()内部調用了persist(StorageLevel.MEMORY_ONLY),是不是和上面對上了哈,這裡我們能夠得出cache和persist的差別了:cache隻有一個預設的緩存級别MEMORY_ONLY ,而persist可以根據情況設定其它的緩存級别。
我相信小夥伴們肯定很好奇這個緩存級别到底有多少種呢?我們繼續怼源碼看看:
**
* :: DeveloperApi ::
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
* or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
* ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
* to replicate the RDD partitions on multiple nodes.
*
* The [[org.apache.spark.storage.StorageLevel]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
* factory method of the singleton object (`StorageLevel(...)`).
*/
@DeveloperApi
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable
我們先來看看存儲類型,源碼中我們可以看出有五個參數,分别代表:
useDisk:使用硬碟(外存);
useMemory:使用記憶體;
useOffHeap:使用堆外記憶體,這是Java虛拟機裡面的概念,堆外記憶體意味着把記憶體對象配置設定在Java虛拟機的堆以外的記憶體,這些記憶體直接受作業系統管理(而不是虛拟機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。這部分記憶體也會被頻繁的使用而且也可能導緻OOM,它是通過存儲在堆中的DirectByteBuffer對象進行引用,可以避免堆和堆外資料進行來回複制;
deserialized:反序列化,其逆過程式列化(Serialization)是java提供的一種機制,将對象表示成一連串的位元組;而反序列化就表示将位元組恢複為對象的過程。序列化是對象永久化的一種機制,可以将對象及其屬性儲存起來,并能在反序列化後直接恢複這個對象;
replication:備份數(在多個節點上備份,預設為1)。
我們接着看看緩存級别:
/**
* Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
* new storage levels.
*/
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
可以看到這裡列出了12種緩存級别,**但這些有什麼差別呢?**可以看到每個緩存級别後面都跟了一個StorageLevel的構造函數,裡面包含了4個或5個參數,和上面說的存儲類型是相對應的,四個參數是因為有一個是有預設值的。
好吧這裡我又想問小夥伴們一個問題了,這幾種存儲方式什麼意思呢?該如何選擇呢?
官網上進行了詳細的解釋。我這裡介紹一個有興趣的同學可以去官網看看哈。
-
MEMORY_ONLY
使用反序列化的Java對象格式,将資料儲存在記憶體中。如果記憶體不夠存放所有的資料,某些分區将不會被緩存,并且将在需要時重新計算。這是預設級别。
-
MEMORY_AND_DISK
使用反序列化的Java對象格式,優先嘗試将資料儲存在記憶體中。如果記憶體不夠存放所有的資料,會将資料寫入磁盤檔案中,下次對這個RDD執行算子時,持久化在磁盤檔案中的資料會被讀取出來使用。
-
MEMORY_ONLY_SER((Java and Scala))
基本含義同MEMORY_ONLY。唯一的差別是,會将RDD中的資料進行序列化,RDD的每個partition會被序列化成一個位元組數組。這種方式更加節省記憶體,但是會加大cpu負擔。
- 一個簡單的案例感官行的認識存儲級别的差别
19M page_views.dat
val rdd1=sc.textFile("file:///home/hadoop/data/page_views.dat")
rdd1.persist().count
ui檢視緩存大小
是不是明顯變大了,我們先删除緩存
rdd1.unpersist()
- 使用MEMORY_ONLY_SER級别
import org.apache.spark.storage.StorageLevel
rdd1.persist(StorageLevel.MEMORY_ONLY_SER)
rdd1.count
這裡我就用這兩種方式進行對比,大家可以試試其他方式。
那如何選擇呢?哈哈官網也說了。
你可以在記憶體使用和CPU效率之間來做出不同的選擇不同的權衡。
- 預設情況下,性能最高的當然是MEMORY_ONLY,但前提是你的記憶體必須足夠足夠大,可以綽綽有餘地存放下整個RDD的所有資料。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的後續算子操作,都是基于純記憶體中的資料的操作,不需要從磁盤檔案中讀取資料,性能也很高;而且不需要複制一份資料副本,并遠端傳送到其他節點上。但是這裡必須要注意的是,在實際的生産環境中,恐怕能夠直接用這種政策的場景還是有限的,如果RDD中資料比較多時(比如幾十億),直接用這種持久化級别,會導緻JVM的OOM記憶體溢出異常。
- 如果使用MEMORY_ONLY級别時發生了記憶體溢出,那麼建議嘗試使用MEMORY_ONLY_SER級别。該級别會将RDD資料序列化後再儲存在記憶體中,此時每個partition僅僅是一個位元組數組而已,大大減少了對象數量,并降低了記憶體占用。這種級别比MEMORY_ONLY多出來的性能開銷,主要就是序列化與反序列化的開銷。但是後續算子可以基于純記憶體進行操作,是以性能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的資料量過多的話,還是可能會導緻OOM記憶體溢出的異常。
- 不要洩漏到磁盤,除非你在記憶體中計算需要很大的花費,或者可以過濾大量資料,儲存部分相對重要的在記憶體中。否則存儲在磁盤中計算速度會很慢,性能急劇降低。
- 字尾為_2的級别,必須将所有資料都複制一份副本,并發送到其他節點上,資料複制以及網絡傳輸會導緻較大的性能開銷,除非是要求作業的高可用性,否則不建議使用。
4 删除緩存中的資料
spark自動監視每個節點上的緩存使用,并以最近最少使用的(LRU)方式丢棄舊資料分區。如果您想手動删除RDD,而不是等待它從緩存中掉出來,請使用 RDD.unpersist()方法。