Spark 2.0.2, double[], 使用Kyro序列化加速,和手動注冊類名
Kyro通常比原生的Java預設實作快10倍,是以建議使用Kyro來加速。
如何開啟Kyro
開啟的方法很簡單,就是設參數spark.serializer。有三種方式:
- 程式内:
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val spark = SparkSession
.builder()
.config(conf)
.getOrCreate()
-
spark-submit參數:
spark-submit --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
-
全局預設配置:conf/spark-defaults.conf:
spark.serializer org.apache.spark.serializer.KryoSerializer
開啟完後要注冊類名
為什麼要注冊類名
Kyro預設序列化執行個體時在前面會寫上類名,比如java.lang.Double,類名越長,額外的存儲開銷越大。為了解決這個問題,Kyro允許将類名注冊進映射表裡,通過配置設定數字ID來替換冗長的類名,比如java.lang.Double使用數字0來代替。這種方式節省了儲存空間,但代價是我們必須手動将所有性能相關的類名注冊。
spark使用Twitter chill注冊了常用的Scala類,也對自己的常用類都進行了注冊,具體見KryoSerializer.scala。但很遺憾,在實際使用中,仍然有大量的類名未包含其中,必須手動注冊。
怎麼注冊類名
- 找到沒有注冊的類名
conf.set("spark.kryo.registrationRequired", "true")
開啟
spark.kryo.registrationRequired=true
,于是Kyro遇到沒有注冊的類名時就會抛異常告警。于是,一遍遍反複排查直到完全跑通,純體力活。
-
如何注冊私有類?
程式内部時,spark指南中使用classOf方法來找類名,
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
它的問題是,你沒法導入其他包的私有類。解決方法是使用
Class.forName
,如:
conf.registerKryoClasses(Array(Class.forName("org.apache.spark.SomePrivateClass")))
因為KyroSerializer也是使用Class.forName來解析spark.kryo.classesToRegister字段,是以直接指定類名即可。
-
如何注冊Java原生類型和數組?
使用Class.forName來擷取,具體的書寫規則見Class.getName()。
舉點例子:
Element Type | Encoding |
double[] | "[B" |
double[][] | "[[B" |
Object() | |
-
範例
最後,貼份對spark-ml中邏輯回歸和XGBoost的手動注冊類名,己經囊括了所有用法,
spark.kryo.classesToRegister org.apache.spark.mllib.stat.MultivariateOnlineSummarizer,[D,[I,[F,org.apache.spark.ml.classification.MultiClassSummarizer,org.apache.spark.ml.classification.LogisticAggregator,ml.dmlc.xgboost4j.scala.Booster,ml.dmlc.xgboost4j.java.Booster,[Lml.dmlc.xgboost4j.scala.Booster;,org.apache.spark.ml.feature.LabeledPoint,org.apache.spark.ml.linalg.SparseVector,org.apache.spark.mllib.evaluation.binary.BinaryLabelCounter,scala.reflect.ClassTag$$anon$1,java.lang.Class,[Lorg.apache.spark.mllib.evaluation.binary.BinaryLabelCounter;,scala.collection.mutable.WrappedArray$ofRef,[Ljava.lang.String;,[Lorg.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeFileStatus;,org.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeFileStatus,[Lorg.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeBlockLocation;,org.apache.spark.sql.execution.datasources.HadoopFsRelation$FakeBlockLocation,org.apache.spark.sql.execution.columnar.CachedBatch,org.apache.spark.ml.feature.Instance,[[B,org.apache.spark.sql.catalyst.expressions.GenericInternalRow,[Ljava.lang.Object;