How to create a checkpoint
First, use the SparkContext to set a checkpoint directory on HDFS (if you call checkpoint without setting one, Spark throws an exception: throw new SparkException("Checkpoint directory has not been set in the SparkContext")):
scala> sc.setCheckpointDir("hdfs://lijie:9000/checkpoint0727")
After running the line above, HDFS creates a directory:
/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20
Then mark the RDD for checkpointing:
scala> val rdd1 = sc.parallelize(1 to 10000)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> rdd1.checkpoint
At this point HDFS still contains no data; only after an action (collect, or sum below) runs does the data appear. This shows that checkpoint is lazy, like a transformation: it only takes effect once an action triggers a job.
scala> rdd1.sum
res2: Double = 5.0005E7
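The laziness described above can be checked directly with isCheckpointed. Below is a minimal sketch in local mode; the local /tmp path and app name are made up for illustration, standing in for the hdfs://lijie:9000/checkpoint0727 directory used above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("checkpoint-lazy-demo").setMaster("local[2]"))
sc.setCheckpointDir("/tmp/checkpoint-demo")   // local stand-in for the HDFS path

val rdd1 = sc.parallelize(1 to 10000)
rdd1.checkpoint()                  // only marks the RDD; nothing is written yet

val before = rdd1.isCheckpointed   // false: no action has run
val total  = rdd1.sum()            // action triggers the job, then the checkpoint job
val after  = rdd1.isCheckpointed   // true: data now materialized in the checkpoint dir

println(s"checkpointed before action: $before, after action: $after, sum = $total")
```

Running this prints false before the action and true after it, matching the behavior observed in the shell session above.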
# The resulting files on HDFS:
[root@lijie hadoop]# hadoop dfs -ls /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
Found 2 items
-rw-r--r-- 3 root supergroup 53404 2017-07-24 14:26 /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000
-rw-r--r-- 3 root supergroup 53404 2017-07-24 14:26 /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00001
Note, however, that this effectively runs the computation twice: once for the sum action, and once more when the checkpoint job recomputes the RDD in order to write it out. For this reason it is common to cache the RDD first and then checkpoint it, so the lineage is computed only once; the checkpoint job then reads the data straight from the in-memory cache and writes it to HDFS, like this:
rdd.cache()
rdd.checkpoint()
rdd.collect
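To see that cache really avoids the second pass, you can count evaluations with an accumulator. This is a sketch in local mode; the path, app name, and accumulator name are made up for illustration:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Count how many times each element is actually computed. With cache()
// before checkpoint(), the checkpoint job reads the cached partitions
// instead of recomputing the lineage, so the map function runs only once
// per element.
val sc = new SparkContext(
  new SparkConf().setAppName("cache-then-checkpoint").setMaster("local[2]"))
sc.setCheckpointDir("/tmp/checkpoint-demo2")

val evals = sc.longAccumulator("evaluations")
val rdd = sc.parallelize(1 to 1000).map { x => evals.add(1); x }

rdd.cache()        // keep the computed partitions in memory
rdd.checkpoint()   // mark for checkpointing
rdd.collect()      // the action runs the job; the checkpoint job then reads the cache

println(evals.value)   // 1000 with cache(); without it the checkpoint job
                       // would recompute and the count would double
```

If you comment out the rdd.cache() line, the accumulator climbs to 2000, confirming that the whole lineage was executed a second time by the checkpoint job.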
When checkpointing, it is strongly recommended to cache first. In addition, once the checkpoint succeeds, all of the RDD's upstream lineage (its dependencies on parent RDDs) is discarded, as follows:
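One way to observe this truncation is through the RDD's dependencies and toDebugString. A sketch in local mode, with made-up path and names:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// After the checkpoint is materialized, the RDD's dependencies point at a
// ReliableCheckpointRDD (backed by the files on disk) instead of the
// original parent RDDs.
val sc = new SparkContext(
  new SparkConf().setAppName("lineage-truncation").setMaster("local[2]"))
sc.setCheckpointDir("/tmp/checkpoint-demo3")

val mapped = sc.parallelize(1 to 100).map(_ * 2)
mapped.checkpoint()

val parentBefore = mapped.dependencies.head.rdd.toString  // the original parent RDD
mapped.count()                                            // action materializes the checkpoint
val parentAfter  = mapped.dependencies.head.rdd.toString  // now a ReliableCheckpointRDD

println(parentBefore)
println(parentAfter)
println(mapped.toDebugString)   // lineage now rooted at the checkpoint files
```

Before the action, the parent is the original ParallelCollectionRDD; afterwards it is a CheckpointRDD, showing that the old dependency chain has been dropped.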