天天看點

Zeppelin對Spark進行互動式資料查詢和分析

Zeppelin的資料可視化能力很贊,下面是Zeppelin自帶的Tutorial的顯示效果:

Zeppelin對Spark進行互動式資料查詢和分析

目前Zeppelin還在起步階段,master上代碼的版本号是0.5.0,而官網上的文檔内容基本對應的還是0.4.0的代碼,至于youtube上的那個視訊demo,估計是更早期的版本了。不過沒關系,Zeppelin的UI比較好用,上手沒有什麼難度。

在安裝試玩Zeppelin之前,需要注意的是

Zeppelin支援Spark的local mode,是以可以直接編譯運作,不用事先搭好一個Spark叢集。

Zeppelin目前還不支援Spark 1.2,是以如果想測試叢集模式,要選擇1.1.x系列的Spark。

Zeppelin預設會使用8080作為web端口,并且會占用8081端口作為web socket通信的端口。如果端口沖突,可以設定環境變量ZEPPELIN_PORT來修改端口。

Zeppelin對Spark進行互動式資料查詢和分析

下面是一個用scala從hdfs上讀取資料,然後用sql進行分析的例子

Zeppelin對Spark進行互動式資料查詢和分析

Zeppelin的工作方式和Spark的Thrift Server很像,都是向Spark送出一個應用(Application),然後每一個查詢對應一個stage。

Zeppelin對Spark進行互動式資料查詢和分析

是以,在啟動Zeppelin前,可以通過配置環境變量ZEPPELIN_JAVA_OPTS來對即将啟動的Spark driver進行配置,例如“-Dspark.executor.memory=8g -Dspark.cores.max=64”。

一. Spark的版本不比對。Zeppelin在編譯的時候是需要指定Spark的版本的,是以當Zeppelin的Spark版本,和你連接配接的Spark叢集的版本不一緻時,就會出現下面的異常。

ERROR [2015-03-13 09:40:22,508] ({sparkDriver-akka.actor.default-dispatcher-22} Slf4jLogger.scala[apply$mcV$sp]:66) – org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 2439208141545036836, local class serialVersionUID = 1677335532749418220 java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 2439208141545036836, local class serialVersionUID = 1677335532749418220 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:604) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) at scala.util.Try$.apply(Try.scala:161)

解決方法有兩個:

編譯時候指定對應的版本;如果是自己編譯的Spark,就用mvn install把Spark的jar都安裝到本地的maven庫,然後在編譯zeppelin時指定自己的Spark版本即可S,例如“-Dspark.version=1.1.0-SNAPSHOT”。

直接替換{Zeppelin Home}\interpreter\spark下面有關spark的jar包,把”datanucleus-*.jar”和“spark-*.jar”都替換掉。

二. 通路HDFS時,出現異常“class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$AppendRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;”

解決方法是,删掉{Zeppelin Home}\interpreter\spark下面的protobuf*.jar,換成HDFS使用的Protobuf的版本。