MEMORY_ONLY
代碼如下
package com.yxw.Test
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}
object KryoTest000 {
def main(args: Array[String]): Unit = {
//定義輸入輸出路徑
val inputpath = new Path(args(0)) //file:///E:/BaiduNetdiskDownload/cleaned.log
val outputpath = new Path(args(1)) //file:///E:/BaiduNetdiskDownload/outputpath
//連接配接hdfs
val fsConf = new Configuration()
val fs = FileSystem.get(fsConf)
//路徑存在就删除
if (fs.exists(outputpath)){
fs.delete(outputpath,true)
val path = args(1).toString
println(s"已經删除存在的路徑 $path")
}
//建立sparkcontext
val conf = new SparkConf().setAppName("KryoTest000APP").setMaster("local[4]")
val sc = new SparkContext(conf)
//得到檔案 建立RDD
val files = sc.textFile(args(0))
// files.foreach(println)
//調用utils 持久化
val res = KryoUtils.logCache(files)
//res.collect()
// //序列化方式存到記憶體
KryoUtils.saveLog(res,args(1))
//
Thread.sleep(50000) //睡50s 以便觀察webUI
}
}
KryoUtils.scala
package com.yxw.Test
import org.apache.spark.rdd.RDD
case class INFO(cdn: String, region: String, level: String, date: String, ip: String, domain: String, url: String, traffic: String)
object KryoUtils {
//baidu CN E 2018050103 222.73.34.128 rw.uestc.edu.cn http://rw.uestc.edu.cn/user_upload/15316339776271051.html 72071
def logCache(logs: RDD[String]): RDD[INFO] = {
logs.filter(_.split("\t").length == 8).map(log => {
val info = log.split("\t")
INFO(info(0), info(1), info(2), info(3), info(4), info(5), info(6), info(7))
}).cache()
}
def saveLog(logsCache: RDD[INFO], outputpath: String) = {
logsCache.map(logCache => {
logCache.cdn + "\t" + logCache.region + "\t" + logCache.level + "\t" +
logCache.date + "\t" + logCache.ip + "\t" + logCache.domain + "\t" + logCache.url + "\t" + logCache.traffic
}).repartition(1).saveAsTextFile(outputpath)
}
}
運作的結果如圖:

由于有個shuffle操作 共耗時10s
cache的大小為:
900多M,這簡直恐怖
MEMORY_ONLY_SER 未使用kryo序列化
修改代碼:
結果如下:
序列化後占用記憶體已經顯著減小,耗時也減小為7s
MEMORY_ONLY_SER 使用kryo序列化未注冊
代碼修改如下:
結果如下:
耗時差不多,都是7s,但是記憶體占用稍微減小
MEMORY_ONLY_SER 使用kryo序列化并注冊
修改代碼如下:
結果如圖所示:
可以看到速度,記憶體占用都是以上幾組測試用最優的
注冊kryo序列化并開啟RDD壓縮
這個測試是網上看來的
注意:RDD壓縮隻能存在于序列化的情況下
修改代碼如圖:
運作結果如圖:
持久化的記憶體占用大小僅為45M左右!!!
spark.rdd.compress
這個參數決定了RDD Cache的過程中,RDD資料在序列化之後是否進一步進行壓縮再儲存到記憶體或磁盤上。當然是為了進一步減小Cache資料的尺寸,對于Cache在磁盤上而言,絕對大小大概沒有太大關系,主要是考慮Disk的IO帶寬。而對于Cache在記憶體中,那主要就是考慮尺寸的影響,是否能夠Cache更多的資料,是否能減小Cache資料對GC造成的壓力等。
這兩者,前者通常不會是主要問題,尤其是在RDD Cache本身的目的就是追求速度,減少重算步驟,用IO換CPU的情況下。而後者,GC問題當然是需要考量的,資料量小,占用空間少,GC的問題大概會減輕,但是是否真的需要走到RDD Cache壓縮這一步,或許用其它方式來解決可能更加有效。
是以這個值預設是關閉的,但是如果在磁盤IO的确成為問題或者GC問題真的沒有其它更好的解決辦法的時候,可以考慮啟用RDD壓縮。
以上