天天看點

Spark 中 RDD 持久化

1、RDD Cache 緩存

1、介紹

RDD 通過 Cache 或者 Persist 方法将前面的計算結果緩存,預設情況下會把資料以序列化的形式緩存在 JVM 的堆記憶體中。但是并不是這兩個方法被調用時立即緩存,而是觸發後面的 action 算子時,該 RDD 将會被緩存在計算節點的記憶體中,并供後面重用。

2、代碼

package spark.core.lasting

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * RDD Cache 緩存
 */
object Spark_RDD_Lasting_Study1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().
      setMaster("local[*]").
      set("spark.driver.host", "localhost").
      setAppName("rdd")
    val sc = new SparkContext(sparkConf)

    val fileRDD: RDD[String] = sc.textFile("input/word.txt")

    // cache 操作會增加血緣關系,不改變原有的血緣關系。
    println(fileRDD.toDebugString)
    println("------------------")

    // 資料緩存
    fileRDD.cache()
    println(fileRDD.toDebugString)

    fileRDD.collect()
  }
}
           

運作結果:

Spark 中 RDD 持久化

3、存儲級别

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
           
Spark 中 RDD 持久化

緩存有可能丢失,或者存儲于記憶體的資料由于記憶體不足而被删除,RDD 的緩存容錯機制保證了即使緩存丢失也能保證計算的正确執行。通過基于 RDD 的一系列轉換,丢失的資料會被重算,由于 RDD 的各個 Partition 是相對獨立的,是以隻需要計算丢失的部分即可,并不需要重算全部 Partition。

2、RDD CheckPoint 檢查點

1、介紹

檢查點其實就是通過将 RDD 中間結果寫入磁盤,由于血緣依賴過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果檢查點之後有節點出現問題,可以從檢查點開始重做血緣,減少了開銷。對 RDD 進行 checkpoint 操作并不會馬上被執行,必須執行 Action 操作才能觸發。

2、代碼

package spark.core.lasting

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * RDD CheckPoint 檢查點
 */
object Spark_RDD_Lasting_Study2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().
      setMaster("local[*]").
      set("spark.driver.host", "localhost").
      setAppName("rdd")
    val sc = new SparkContext(sparkConf)

    // 1、設定檢查點路徑
    sc.setCheckpointDir("check/checkpoint1")

    // 2、建立一個RDD
    val lineRdd: RDD[String] = sc.textFile("input/word.txt")

    // 3、業務邏輯
    val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))

    val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
      word => {
        (word, System.currentTimeMillis())
      }
    }

    // 4、增加緩存,避免再重新跑一個 job 做 checkpoint
    wordToOneRdd.cache()

    // 5、資料檢查點:針對wordToOneRdd做檢查點計算
    wordToOneRdd.checkpoint()

    // 6、觸發執行邏輯
    wordToOneRdd.collect().foreach(println)
  }
}
           

3、緩存和檢查點差別

(1) Cache 緩存隻是将資料儲存起來,不切斷血緣依賴。Checkpoint 檢查點切斷血緣依賴。

(2) Cache緩存的資料通常存儲在磁盤、記憶體等地方,可靠性低。Checkpoint 的資料通常存儲在 HDFS 等容錯、高可用的檔案系統,可靠性高。

(3) 建議對 checkpoint() 的 RDD 使用 Cache 緩存,這樣 checkpoint 的 job 隻需從 Cache 緩存中讀取資料即可,否則需要再從頭計算一次 RDD。