1.控制算子
-
概念:
控制算子有三種,cache,persist,checkpoint,以上算子都可以将 RDD 持久化,持久化的機關是 partition。cache 和 persist 都是懶執行的。 必須有一個 action 類算子觸發執行。checkpoint 算子不僅能将 RDD 持久化到磁盤,還能切斷 RDD 之間的依賴關系。
2. cache
預設将 RDD 的資料持久化到記憶體中。cache 是懶執行。
注意:chche () = persist()=persist(StorageLevel.Memory_Only)
scala測試代碼
package persist
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author yqq
* @Date 2021/12/07 15:58
* @Version 1.0
*/
object CacheTest {
def main(args: Array[String]): Unit = {
var lines = new SparkContext(
new SparkConf()
.setAppName("cache")
.setMaster("local")
).textFile("data/persistData.txt")
lines = lines.cache()
val t1 = System.currentTimeMillis()
val l = lines.count()
val t2 = System.currentTimeMillis()
println(s"從磁盤讀取資料:l = $l,time=${t2-t1}ms")
val t3 = System.currentTimeMillis()
val r = lines.count()
val t4 = System.currentTimeMillis()
println(s"從記憶體讀取資料:l = $r,time=${t4-t3}ms")
}
}

java測試代碼
package persists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* @Author yqq
* @Date 2021/12/07 16:24
* @Version 1.0
*/
public class CacheTest {
public static void main(String[] args) {
JavaRDD<String> lines = new JavaSparkContext(
new SparkConf()
.setMaster("local")
.setAppName("cache")
).textFile("data/persistData.txt");
lines.cache();
long t1 = System.currentTimeMillis();
long l = lines.count();
long t2 = System.currentTimeMillis();
System.out.println("從磁盤讀取資料:line="+l+",time:"+(t2-t1)+"ms");
long t3 = System.currentTimeMillis();
long r = lines.count();
long t4 = System.currentTimeMillis();
System.out.println("從磁盤讀取資料:line="+r+",time:"+(t4-t3)+"ms");
}
}
3. persist
可以指定持久化的級别。最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK。”_2”表示有副本數。
持久化級别如下:
scala測試代碼
package persists
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author yqq
* @Date 2021/12/07 16:37
* @Version 1.0
*/
object PersistTest {
/**
* persist :持久化算子,可以手動指定持久化的級别。懶執行算子,需要action算子進行觸發。
* persist() 預設将資料持久化到記憶體中,cache() = persist() = persist(StorageLevel.MEMORY_ONLY)
* 常用的持久化級别:
* MEMORY_ONLY :資料放在記憶體中
* MEMORY_ONLY_SER:資料序列化之後存放在記憶體中,序列化之後的資料節省空間。時間換取空間。
* MEMORY_AND_DISK: 資料首先存放記憶體,記憶體存不下放磁盤
* MEMORY_AND_DISK_SER: 資料序列化之後,首先放記憶體,存不下放磁盤。
* 注意:
* 盡量不使用“_2”級别。
* 盡量避免使用“DISK_ONLY”級别。
*
* cache() 與 persist() 注意問題:
* 1.cache() 與persist都是懶執行算子,需要action算子觸發執行。持久化資料的機關是partition
* 2.可以對一個RDD使用cache或者persist之後指派給一個變量,在其他job中使用這個變量就是使用持久化的資料。建議不指派。
* 3.如果采用第二種方式對RDD進行持久化,對RDD進行持久化之後不能緊跟action算子。
*/
def main(args: Array[String]): Unit = {
var lines = new SparkContext(
new SparkConf()
.setAppName("cache")
.setMaster("local")
).textFile("data/persistData.txt")
lines.persist(StorageLevel.MEMORY_ONLY)
val t1 = System.currentTimeMillis()
val l = lines.count()
val t2 = System.currentTimeMillis()
println(s"從磁盤讀取資料:l = $l,time=${t2-t1}ms")
val t3 = System.currentTimeMillis()
val r = lines.count()
val t4 = System.currentTimeMillis()
println(s"從記憶體讀取資料:l = $r,time=${t4-t3}ms")
}
}
-
cache 和 persist 的注意事項
1 .cache 和 persist 都是懶執行,必須有一個 action 類算子觸發執行。
2 .cache 和 persist 算子的傳回值可以指派給一個變量,在其他 job 中直 接使用這個變量就是使用持久化的資料了。持久化的機關是 partition。
3 .cache 和 persist 算子後不能立即緊跟 action 算子。
4 .cache 和 persist 算子持久化的資料當 applilcation 執行完成之後會被 清除。
注意誤區:rdd.cache().count() 傳回的不是持久化的 RDD,而是一個數值 了。
4.checkpoint
-
persist(StorageLevel.DISK_ONLY)與 Checkpoint 的差別?
1 .checkpoint 需要指定額外的目錄存儲資料,checkpoint 資料是 由外部的存儲系統管理,不是 Spark 架構管理,當 application 完成之後,不會被清空。
2 .cache() 和 persist() 持久化的資料是由 Spark 架構管理,當 application 完成之後,會被清空。
3 .checkpoint 多用于儲存狀态。
-
checkpoint 的執行原理:
1 .當 RDD 的 job 執行完畢後,會從 finalRDD 從後往前回溯。
2 .當回溯到某一個 RDD 調用了 checkpoint 方法,會對目前的 RDD 做一個标記。
- 優化:對 RDD 執行 checkpoint 之前,最好對這個 RDD 先執行 cache,這樣新啟動的 job 隻需要将記憶體中的資料拷貝到 HDFS 上 就可以,省去了重新計算這一步。
package persist
import org.apache.spark.{SparkConf, SparkContext}
/**
* @Author yqq
* @Date 2021/12/07 17:21
* @Version 1.0
*/
object CheckPointTest {
/**
* checkpoint: 将RDD資料持久化到磁盤中,需要指定磁盤目錄。懶執行,需要action算子觸發執行。
* checkpoint & persist(StorageLevel.DISK_ONLY):
* 1.checkpoint需要手動指定持久化的目錄,persist不需要手動指定持久化的目錄。
* 2.checkpoint目錄由外部的存儲系統管理,不由Spark架構管理。當Spark application 執行完成之後,資料目錄不會被清空。
* persist 目錄由Spark架構管理,當application執行完成之後資料是被清空的。
* 3.checkpoint可以切斷RDD的依賴關系,資料當application執行完成之後還會保留,多用于狀态管理。
*
* checkpoint執行流程:
* 當Spark job執行完成之後,Spark架構會從後往前回溯,找到checkpointRDD 做标記,當回溯完成之後,Spark 重新啟動一個job
* 來計算checkpointRDD的結果,将結果持久化到指定的目錄中。
* 優化:對哪個RDD進行checkpoint時,可以先對這個RDD進行cache()。
*
*/
def main(args: Array[String]): Unit = {
val context = new SparkContext(
new SparkConf()
.setAppName("cache")
.setMaster("local")
)
var lines = context.textFile("data/words")
context.setCheckpointDir("data/ck")
lines.checkpoint()
println(lines.count())
}
}