天天看點

Spark優化之Kyro序列化

Spark 2.0.2, double[], 使用Kyro序列化加速,和手動注冊類名

Kyro通常比原生的Java預設實作快10倍,是以建議使用Kyro來加速。

如何開啟Kyro

開啟的方法很簡單,就是設參數spark.serializer。有三種方式:

  • 程式内:
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val spark = SparkSession
  .builder()
  .config(conf)
  .getOrCreate()      
  • spark-submit參數:

    ​​

    ​spark-submit --conf spark.serializer=org.apache.spark.serializer.KryoSerializer​

  • 全局預設配置:conf/spark-defaults.conf:

    ​​

    ​spark.serializer org.apache.spark.serializer.KryoSerializer​

開啟完後要注冊類名

為什麼要注冊類名

Kyro預設序列化執行個體時在前面會寫上類名,比如java.lang.Double,類名越長,額外的存儲開銷越大。為了解決這個問題,Kyro允許将類名注冊進映射表裡,通過配置設定數字ID來替換冗長的類名,比如java.lang.Double使用數字0來代替。這種方式節省了儲存空間,但代價是我們必須手動将所有性能相關的類名注冊。

spark使用Twitter chill注冊了常用的Scala類,也對自己的常用類都進行了注冊,具體見KryoSerializer.scala。但很遺憾,在實際使用中,仍然有大量的類名未包含其中,必須手動注冊。

怎麼注冊類名

  1. 找到沒有注冊的類名
conf.set("spark.kryo.registrationRequired", "true")      

開啟​

​spark.kryo.registrationRequired=true​

​,于是Kyro遇到沒有注冊的類名時就會抛異常告警。于是,一遍遍反複排查直到完全跑通,純體力活。

  1. 如何注冊私有類?

    程式内部時,spark指南中使用classOf方法來找類名,

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))      

它的問題是,你沒法導入其他包的私有類。解決方法是使用​

​Class.forName​

​,如:

conf.registerKryoClasses(Array(Class.forName("org.apache.spark.SomePrivateClass")))      

因為KyroSerializer也是使用Class.forName來解析spark.kryo.classesToRegister字段,是以直接指定類名即可。

  1. 如何注冊Java原生類型和數組?

    使用Class.forName來擷取,具體的書寫規則見Class.getName()。

舉點例子:

Element Type Encoding
double[] "[B"
double[][] "[[B"
Object()
  1. 範例

    最後,貼份對spark-ml中邏輯回歸和XGBoost的手動注冊類名,己經囊括了所有用法,

spark.kryo.classesToRegister      org.apache.spark.mllib.stat.MultivariateOnlineSummarizer,[D,[I,[F,org.apache.spark.ml.classification.MultiClassSummarizer,org.apache.spark.ml.classification.LogisticAggregator,ml.dmlc.xgboost4j.scala.Booster,ml.dmlc.xgboost4j.java.Booster,[Lml.dmlc.xgboost4j.scala.Booster;,org.apache.spark.ml.feature.LabeledPoint,org.apache.spark.ml.linalg.SparseVector,org.apache.spark.mllib.evaluation.binary.BinaryLabelCounter,scala.reflect.ClassTag$$anon$1,java.lang.Class,[Lorg.apache.spark.mllib.evaluation.binary.BinaryLabelCounter;,scala.collection.mutable.WrappedArray$ofRef,[Ljava.lang.String;,[Lorg.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeFileStatus;,org.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeFileStatus,[Lorg.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeBlockLocation;,org.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeBlockLocation,org.apache.spark.sql.execution.columnar.CachedBatch,org.apache.spark.ml.feature.Instance,[[B,org.apache.spark.sql.catalyst.expressions.GenericInternalRow,[Ljava.lang.Object;      

繼續閱讀