Spark中的checkpoint作用與用法
2017年07月27日 23:19:11
閱讀數:7234
checkpoint的意思就是建立檢查點,類似于快照,例如在spark計算裡面 計算流程DAG特别長,伺服器需要将整個DAG計算完成得出結果,但是如果在這很長的計算流程中突然中間算出的資料丢失了,spark又會根據RDD的依賴關系從頭到尾計算一遍,這樣子就很費性能,當然我們可以将中間的計算結果通過cache或者persist放到記憶體或者磁盤中,但是這樣也不能保證資料完全不會丢失,存儲的這個記憶體出問題了或者磁盤壞了,也會導緻spark從頭再根據RDD計算一遍,是以就有了checkpoint,其中checkpoint的作用就是将DAG中比較重要的中間資料做一個檢查點将結果存儲到一個高可用的地方(通常這個地方就是HDFS裡面)
· 說道checkpoint就得說說RDD的依賴
比如我們計算wordcount的時候:
sc.textFile("hdfspath").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfspath")
1.在textFile讀取hdfs的時候就會先建立一個HadoopRDD,其中這個RDD是去讀取hdfs的資料key為偏移量value為一行資料,因為通常來講偏移量沒有太大的作用是以然後會将HadoopRDD轉化為MapPartitionsRDD,這個RDD隻保留了hdfs的資料
2.flatMap 産生一個RDD MapPartitionsRDD
3.map 産生一個RDD MapPartitionsRDD
4.reduceByKey 産生一個RDD ShuffledRDD
5.saveAsTextFile 産生一個RDD MapPartitionsRDD
可以根據檢視RDD的依賴:
scala> val rdd = sc.textFile("hdfs://lijie:9000/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[29] at reduceByKey at <console>:27
scala> rdd.toDebugString
res3: String =
(2) ShuffledRDD[29] at reduceByKey at <console>:27 []
+-(2) MapPartitionsRDD[28] at map at <console>:27 []
| MapPartitionsRDD[27] at flatMap at <console>:27 []
| hdfs://lijie:9000/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000 MapPartitionsRDD[26] at textFile at <console>:27 []
| hdfs://lijie:9000/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000 HadoopRDD[25] at textFile at <console>:27 []
· 怎麼建立checkpoint
首先需要用sparkContext設定hdfs的checkpoint的目錄(如果不設定使用checkpoint會抛出異常:throw new SparkException(“Checkpoint directory has not been set in the SparkContext”):
scala> sc.setCheckpointDir("hdfs://lijie:9000/checkpoint0727")
執行了上面的代碼,hdfs裡面會建立一個目錄:
/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20
然後執行checkpoint
scala> val rdd1 = sc.parallelize(1 to 10000)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> rdd1.checkpoint
發現hdfs中還是沒有資料,通過collect然後hdfs就有資料了,說明checkpoint也是個transformation的算子
scala> rdd1.sum
res2: Double = 5.0005E7
#其中hdfs
[[email protected] hadoop]# hadoop dfs -ls /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
Found 2 items
-rw-r--r-- 3 root supergroup 53404 2017-07-24 14:26 /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000
-rw-r--r-- 3 root supergroup 53404 2017-07-24 14:26 /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00001
但是執行的時候相當于走了兩次流程,sum的時候前面計算了一遍,然後checkpoint又會計算一次,是以一般我們先進行cache然後做checkpoint就會隻走一次流程,checkpoint的時候就會從剛cache到記憶體中取資料寫入hdfs中,如下:
rdd.cache()
rdd.checkpoint()
rdd.collect
其中,在checkpoint的時候強烈建議先進行cache,并且當你checkpoint執行成功了,那麼前面所有的RDD依賴都會被銷毀,如下: