天天看點

Spark的kryo性能測試以及RDD持久化級别

MEMORY_ONLY

代碼如下

package com.yxw.Test

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object KryoTest000 {
  def main(args: Array[String]): Unit = {

    //定義輸入輸出路徑
    val inputpath = new Path(args(0)) //file:///E:/BaiduNetdiskDownload/cleaned.log
    val outputpath = new Path(args(1)) //file:///E:/BaiduNetdiskDownload/outputpath

    //連接配接hdfs
    val fsConf = new Configuration()
    val fs = FileSystem.get(fsConf)

    //路徑存在就删除
    if (fs.exists(outputpath)){
      fs.delete(outputpath,true)
      val path = args(1).toString
      println(s"已經删除存在的路徑 $path")
    }

    //建立sparkcontext
    val conf = new SparkConf().setAppName("KryoTest000APP").setMaster("local[4]")
    val sc = new SparkContext(conf)
    //得到檔案 建立RDD
    val files = sc.textFile(args(0))

   // files.foreach(println)
    //調用utils 持久化
    val res = KryoUtils.logCache(files)
    //res.collect()


//    //序列化方式存到記憶體
    KryoUtils.saveLog(res,args(1))
//
    Thread.sleep(50000) //睡50s 以便觀察webUI


  }
}
           

KryoUtils.scala

package com.yxw.Test

import org.apache.spark.rdd.RDD

case class INFO(cdn: String, region: String, level: String, date: String, ip: String, domain: String, url: String, traffic: String)

object KryoUtils {
  //baidu	CN	E	2018050103	222.73.34.128	rw.uestc.edu.cn	http://rw.uestc.edu.cn/user_upload/15316339776271051.html	72071
  def logCache(logs: RDD[String]): RDD[INFO] = {
    logs.filter(_.split("\t").length == 8).map(log => {
      val info = log.split("\t")
      INFO(info(0), info(1), info(2), info(3), info(4), info(5), info(6), info(7))

    }).cache()


  }

  def saveLog(logsCache: RDD[INFO], outputpath: String) = {
    logsCache.map(logCache => {
      logCache.cdn + "\t" + logCache.region + "\t" + logCache.level + "\t" +
        logCache.date + "\t" + logCache.ip + "\t" + logCache.domain + "\t" + logCache.url + "\t" + logCache.traffic


    }).repartition(1).saveAsTextFile(outputpath)
  }
}
           

運作的結果如圖:

Spark的kryo性能測試以及RDD持久化級别

由于有個shuffle操作 共耗時10s

cache的大小為:

Spark的kryo性能測試以及RDD持久化級别

900多M,這簡直恐怖

MEMORY_ONLY_SER 未使用kryo序列化

修改代碼:

Spark的kryo性能測試以及RDD持久化級别

結果如下:

Spark的kryo性能測試以及RDD持久化級别
Spark的kryo性能測試以及RDD持久化級别

序列化後占用記憶體已經顯著減小,耗時也減小為7s

MEMORY_ONLY_SER 使用kryo序列化未注冊

代碼修改如下:

Spark的kryo性能測試以及RDD持久化級别

結果如下:

Spark的kryo性能測試以及RDD持久化級别
Spark的kryo性能測試以及RDD持久化級别

耗時差不多,都是7s,但是記憶體占用稍微減小

MEMORY_ONLY_SER 使用kryo序列化并注冊

修改代碼如下:

Spark的kryo性能測試以及RDD持久化級别

結果如圖所示:

Spark的kryo性能測試以及RDD持久化級别
Spark的kryo性能測試以及RDD持久化級别

可以看到速度,記憶體占用都是以上幾組測試用最優的

注冊kryo序列化并開啟RDD壓縮

這個測試是網上看來的

注意:RDD壓縮隻能存在于序列化的情況下

修改代碼如圖:

Spark的kryo性能測試以及RDD持久化級别

運作結果如圖:

Spark的kryo性能測試以及RDD持久化級别
Spark的kryo性能測試以及RDD持久化級别

持久化的記憶體占用大小僅為45M左右!!!

spark.rdd.compress

這個參數決定了RDD Cache的過程中,RDD資料在序列化之後是否進一步進行壓縮再儲存到記憶體或磁盤上。當然是為了進一步減小Cache資料的尺寸,對于Cache在磁盤上而言,絕對大小大概沒有太大關系,主要是考慮Disk的IO帶寬。而對于Cache在記憶體中,那主要就是考慮尺寸的影響,是否能夠Cache更多的資料,是否能減小Cache資料對GC造成的壓力等。

這兩者,前者通常不會是主要問題,尤其是在RDD Cache本身的目的就是追求速度,減少重算步驟,用IO換CPU的情況下。而後者,GC問題當然是需要考量的,資料量小,占用空間少,GC的問題大概會減輕,但是是否真的需要走到RDD Cache壓縮這一步,或許用其它方式來解決可能更加有效。

是以這個值預設是關閉的,但是如果在磁盤IO的确成為問題或者GC問題真的沒有其它更好的解決辦法的時候,可以考慮啟用RDD壓縮。

以上

繼續閱讀