1、RDD Cache 緩存
1、介紹
RDD 通過 Cache 或者 Persist 方法将前面的計算結果緩存,預設情況下會把資料以序列化的形式緩存在 JVM 的堆記憶體中。但是并不是這兩個方法被調用時立即緩存,而是觸發後面的 action 算子時,該 RDD 将會被緩存在計算節點的記憶體中,并供後面重用。
2、代碼
package spark.core.lasting
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* RDD Cache 緩存
*/
object Spark_RDD_Lasting_Study1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().
setMaster("local[*]").
set("spark.driver.host", "localhost").
setAppName("rdd")
val sc = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sc.textFile("input/word.txt")
// cache 操作會增加血緣關系,不改變原有的血緣關系。
println(fileRDD.toDebugString)
println("------------------")
// 資料緩存
fileRDD.cache()
println(fileRDD.toDebugString)
fileRDD.collect()
}
}
運作結果:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLxEERNlXU65kMnpnT4VkMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL2UTNzIDOzMjMxEDOwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
3、存儲級别
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
緩存有可能丢失,或者存儲于記憶體的資料由于記憶體不足而被删除,RDD 的緩存容錯機制保證了即使緩存丢失也能保證計算的正确執行。通過基于 RDD 的一系列轉換,丢失的資料會被重算,由于 RDD 的各個 Partition 是相對獨立的,是以隻需要計算丢失的部分即可,并不需要重算全部 Partition。
2、RDD CheckPoint 檢查點
1、介紹
檢查點其實就是通過将 RDD 中間結果寫入磁盤,由于血緣依賴過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果檢查點之後有節點出現問題,可以從檢查點開始重做血緣,減少了開銷。對 RDD 進行 checkpoint 操作并不會馬上被執行,必須執行 Action 操作才能觸發。
2、代碼
package spark.core.lasting
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* RDD CheckPoint 檢查點
*/
object Spark_RDD_Lasting_Study2 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().
setMaster("local[*]").
set("spark.driver.host", "localhost").
setAppName("rdd")
val sc = new SparkContext(sparkConf)
// 1、設定檢查點路徑
sc.setCheckpointDir("check/checkpoint1")
// 2、建立一個RDD
val lineRdd: RDD[String] = sc.textFile("input/word.txt")
// 3、業務邏輯
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
word => {
(word, System.currentTimeMillis())
}
}
// 4、增加緩存,避免再重新跑一個 job 做 checkpoint
wordToOneRdd.cache()
// 5、資料檢查點:針對wordToOneRdd做檢查點計算
wordToOneRdd.checkpoint()
// 6、觸發執行邏輯
wordToOneRdd.collect().foreach(println)
}
}
3、緩存和檢查點差別
(1) Cache 緩存隻是将資料儲存起來,不切斷血緣依賴。Checkpoint 檢查點切斷血緣依賴。
(2) Cache緩存的資料通常存儲在磁盤、記憶體等地方,可靠性低。Checkpoint 的資料通常存儲在 HDFS 等容錯、高可用的檔案系統,可靠性高。
(3) 建議對 checkpoint() 的 RDD 使用 Cache 緩存,這樣 checkpoint 的 job 隻需從 Cache 緩存中讀取資料即可,否則需要再從頭計算一次 RDD。