天天看點

Spark 持久化算子

1.控制算子

  • 概念:

    控制算子有三種,cache,persist,checkpoint,以上算子都可以将 RDD 持久化,持久化的機關是 partition。cache 和 persist 都是懶執行的。 必須有一個 action 類算子觸發執行。checkpoint 算子不僅能将 RDD 持久化到磁盤,還能切斷 RDD 之間的依賴關系。

2. cache

預設将 RDD 的資料持久化到記憶體中。cache 是懶執行。

注意:chche () = persist()=persist(StorageLevel.Memory_Only)

scala測試代碼

package persist

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author yqq
 * @Date 2021/12/07 15:58
 * @Version 1.0
 */
object CacheTest {
  def main(args: Array[String]): Unit = {
    var lines = new SparkContext(
      new SparkConf()
        .setAppName("cache")
        .setMaster("local")
    ).textFile("data/persistData.txt")
    lines = lines.cache()
    val t1 = System.currentTimeMillis()
    val l = lines.count()
    val t2 = System.currentTimeMillis()
    println(s"從磁盤讀取資料:l = $l,time=${t2-t1}ms")
    val t3 = System.currentTimeMillis()
    val r = lines.count()
    val t4 = System.currentTimeMillis()
    println(s"從記憶體讀取資料:l = $r,time=${t4-t3}ms")
  }

}      
Spark 持久化算子
Spark 持久化算子

java測試代碼

package persists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * @Author yqq
 * @Date 2021/12/07 16:24
 * @Version 1.0
 */
public class CacheTest {
    public static void main(String[] args) {
        JavaRDD<String> lines = new JavaSparkContext(
                new SparkConf()
                        .setMaster("local")
                        .setAppName("cache")
        ).textFile("data/persistData.txt");
        lines.cache();
        long t1 = System.currentTimeMillis();
        long l = lines.count();
        long t2 = System.currentTimeMillis();
        System.out.println("從磁盤讀取資料:line="+l+",time:"+(t2-t1)+"ms");
        long t3 = System.currentTimeMillis();
        long r = lines.count();
        long t4 = System.currentTimeMillis();
        System.out.println("從磁盤讀取資料:line="+r+",time:"+(t4-t3)+"ms");
    }
}      
Spark 持久化算子
Spark 持久化算子

3. persist

可以指定持久化的級别。最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK。”_2”表示有副本數。

持久化級别如下:

Spark 持久化算子
Spark 持久化算子

scala測試代碼

package persists

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author yqq
 * @Date 2021/12/07 16:37
 * @Version 1.0
 */
object PersistTest {
  /**
   * persist :持久化算子,可以手動指定持久化的級别。懶執行算子,需要action算子進行觸發。
   *   persist() 預設将資料持久化到記憶體中,cache() = persist() = persist(StorageLevel.MEMORY_ONLY)
   *   常用的持久化級别:
   *     MEMORY_ONLY :資料放在記憶體中
   *     MEMORY_ONLY_SER:資料序列化之後存放在記憶體中,序列化之後的資料節省空間。時間換取空間。
   *     MEMORY_AND_DISK: 資料首先存放記憶體,記憶體存不下放磁盤
   *     MEMORY_AND_DISK_SER: 資料序列化之後,首先放記憶體,存不下放磁盤。
   *   注意:
   *     盡量不使用“_2”級别。
   *     盡量避免使用“DISK_ONLY”級别。
   *
   *  cache() 與 persist() 注意問題:
   *  1.cache() 與persist都是懶執行算子,需要action算子觸發執行。持久化資料的機關是partition
   *  2.可以對一個RDD使用cache或者persist之後指派給一個變量,在其他job中使用這個變量就是使用持久化的資料。建議不指派。
   *  3.如果采用第二種方式對RDD進行持久化,對RDD進行持久化之後不能緊跟action算子。
   */
  def main(args: Array[String]): Unit = {
    var lines = new SparkContext(
      new SparkConf()
        .setAppName("cache")
        .setMaster("local")
    ).textFile("data/persistData.txt")
    lines.persist(StorageLevel.MEMORY_ONLY)
    val t1 = System.currentTimeMillis()
    val l = lines.count()
    val t2 = System.currentTimeMillis()
    println(s"從磁盤讀取資料:l = $l,time=${t2-t1}ms")
    val t3 = System.currentTimeMillis()
    val r = lines.count()
    val t4 = System.currentTimeMillis()
    println(s"從記憶體讀取資料:l = $r,time=${t4-t3}ms")
  }

}      
Spark 持久化算子
Spark 持久化算子
  • cache 和 persist 的注意事項

    1 .cache 和 persist 都是懶執行,必須有一個 action 類算子觸發執行。

    2 .cache 和 persist 算子的傳回值可以指派給一個變量,在其他 job 中直 接使用這個變量就是使用持久化的資料了。持久化的機關是 partition。

    3 .cache 和 persist 算子後不能立即緊跟 action 算子。

    4 .cache 和 persist 算子持久化的資料當 applilcation 執行完成之後會被 清除。

    注意誤區:rdd.cache().count() 傳回的不是持久化的 RDD,而是一個數值 了。

4.checkpoint

  1. persist(StorageLevel.DISK_ONLY)與 Checkpoint 的差別?

    1 .checkpoint 需要指定額外的目錄存儲資料,checkpoint 資料是 由外部的存儲系統管理,不是 Spark 架構管理,當 application 完成之後,不會被清空。

    2 .cache() 和 persist() 持久化的資料是由 Spark 架構管理,當 application 完成之後,會被清空。

    3 .checkpoint 多用于儲存狀态。

  2. checkpoint 的執行原理:

    1 .當 RDD 的 job 執行完畢後,會從 finalRDD 從後往前回溯。

    2 .當回溯到某一個 RDD 調用了 checkpoint 方法,會對目前的 RDD 做一個标記。

  3. 優化:對 RDD 執行 checkpoint 之前,最好對這個 RDD 先執行 cache,這樣新啟動的 job 隻需要将記憶體中的資料拷貝到 HDFS 上 就可以,省去了重新計算這一步。
package persist

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author yqq
 * @Date 2021/12/07 17:21
 * @Version 1.0
 */
object CheckPointTest {
  /**
   * checkpoint: 将RDD資料持久化到磁盤中,需要指定磁盤目錄。懶執行,需要action算子觸發執行。
   * checkpoint & persist(StorageLevel.DISK_ONLY):
   *   1.checkpoint需要手動指定持久化的目錄,persist不需要手動指定持久化的目錄。
   *   2.checkpoint目錄由外部的存儲系統管理,不由Spark架構管理。當Spark application 執行完成之後,資料目錄不會被清空。
   *     persist 目錄由Spark架構管理,當application執行完成之後資料是被清空的。
   *   3.checkpoint可以切斷RDD的依賴關系,資料當application執行完成之後還會保留,多用于狀态管理。
   *
   *  checkpoint執行流程:
   *    當Spark job執行完成之後,Spark架構會從後往前回溯,找到checkpointRDD 做标記,當回溯完成之後,Spark 重新啟動一個job
   *    來計算checkpointRDD的結果,将結果持久化到指定的目錄中。
   *    優化:對哪個RDD進行checkpoint時,可以先對這個RDD進行cache()。
   *
   */
  def main(args: Array[String]): Unit = {
    val context = new SparkContext(
      new SparkConf()
        .setAppName("cache")
        .setMaster("local")
    )
    var lines = context.textFile("data/words")
    context.setCheckpointDir("data/ck")
    lines.checkpoint()
    println(lines.count())
  }

}      

繼續閱讀