天天看點

Spark中的checkpoint用法

怎麼建立checkpoint

首先需要用sparkContext設定hdfs的checkpoint的目錄(如果不設定使用checkpoint會抛出異常:throw new SparkException(“Checkpoint directory has not been set in the SparkContext”):

scala> sc.setCheckpointDir("hdfs://lijie:9000/checkpoint0727")

執行了上面的代碼,hdfs裡面會建立一個目錄:

/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20

然後執行checkpoint

scala> val rdd1 = sc.parallelize(1 to 10000)

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> rdd1.checkpoint

發現hdfs中還是沒有資料,通過collect然後hdfs就有資料了,說明checkpoint也是個transformation的算子

scala> rdd1.sum

res2: Double = 5.0005E7  

#其中hdfs

[[email protected] hadoop]# hadoop dfs -ls /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0

DEPRECATED: Use of this script to execute hdfs command is deprecated.

Instead use the hdfs command for it.

Found 2 items

-rw-r--r--   3 root supergroup      53404 2017-07-24 14:26 /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000

-rw-r--r--   3 root supergroup      53404 2017-07-24 14:26 /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00001

但是執行的時候相當于走了兩次流程,sum的時候前面計算了一遍,然後checkpoint又會計算一次,是以一般我們先進行cache然後做checkpoint就會隻走一次流程,checkpoint的時候就會從剛cache到記憶體中取資料寫入hdfs中,如下:

rdd.cache()

rdd.checkpoint()

rdd.collect

在checkpoint的時候強烈建議先進行cache,并且當你checkpoint執行成功了,那麼前面所有的RDD依賴都會被銷毀,如下:

繼續閱讀