天天看點

zeppelin源碼分析(3)——主要的class分析(上)

zeppelin的module、package、class衆多,如何快速地理清頭緒,抓住重點?本文分析zeppelin主要module中重點的類以及它們之間的關系,理清這些類的職責,對于了解zeppelin的運作過程至關重要。

經過之前文章的分析,我們已經了解了zeppelin涉及到架構層面的幾個module為:zeppelin-server、zeppelin-zengine、zeppelin-interpreter,并且三者之間有如下的依賴關系:

zeppelin源碼分析(3)——主要的class分析(上)

本文要分析的主要的class,也都來自于這三個module。

zeppelin源碼分析(3)——主要的class分析(上)

以上類圖中省略了字段和方法,以避免過早引入太多細節,重點關注類與類之間的關系組成。由于篇幅的限制,再加上zeppelin提供的核心價值是與Interpreter相關的多語言repl解釋器,筆者就選擇從右上角黃色的區域開始,分多篇分析。

Interpreter

zeppelin源碼分析(3)——主要的class分析(上)

Interpreter是一個抽象類,該類是zeppelin核心類,Zeppelin提供的核心價值:解釋執行各種語言的代碼,都是通過該抽象類的每個具體的實作類完成的。Interpreter主要規定了各語言repl解釋器需要遵循的“規範(contract)”,包括:

1. repl解釋器的生命周期管理。如open(), close(), destroy(),規定了産生和銷毀repl解釋器。

2. 解釋執行代碼的接口——interpreter(),這些真正産生價值的地方。

3. 執行代碼過程中互動控制和易用性增強,如cancel(), getProgress(), completion(),分别是終止代碼的執行、擷取執行進度以及代碼自動完成。

4. 解釋器的配置接口,如setProperty()、setClassLoaderURL(URL[])等。

5. 性能優化接口,如getScheduler(),getIntepreterGroup()等。

6. 解釋器注冊接口(已經deprecated了),如一系列重載的register接口。

以上展現了zeppelin的repl解釋器程序需要受其主程序ZeppelinServer的控制,也是zeppelin設計決策在代碼中的展現。

注:現在的解釋器注冊通過如下2種方式進行:

  1. 将interpreter-setting.json打包到解釋器的jar檔案中
  2. 放置到如下位置:interpreter/{interpreter}/interpreter-setting.json

RemoteInterpreterService Thrift協定分析

Apache Thrift是跨語言RPC通信架構,提供了相應的DSL(Domain Specific Language)和支援多種語言的代碼生成工具,使得代碼開發人員可以隻關注具體的業務,而不用關注底層的通信細節。zeppelin使用Thrift定義了其主程序ZeppelinServer與需要采用獨立JVM程序運作的各repl解釋器之間的通信協定。

關于為什麼要采用單獨的JVM程序來啟動repl解釋器程序,本系列的第3篇也有提及,這裡再贅述一下:

1. zeppelin旨在提供一個開放的架構,支援多種語言和産品,由于每種語言和産品都是各自獨立演進的,各自的運作時依賴也各不相同,甚至是互相沖突的,如果放在同一JVM中,僅解決沖突,維護各個産品之間的相容性都是一項艱巨的任務,某些産品版本甚至是完全不能相容的。

2. 大資料分析,是否具有橫向擴充能力是production-ready一項重要的衡量名額,如果将repl程序與主程序合在一起,會驗證影響系統性能。

是以,在有必要的時候,zeppelin采用獨立JVM的方式來啟動repl程序,并且采用Thrift協定定義了主程序與RemoteInterpreterService程序之間的通信協定,具體如下:

service RemoteInterpreterService {
  void createInterpreter(: string intpGroupId, : string noteId, : string className, : map<string, string> properties);


  void open(: string noteId, : string className);
  void close(: string noteId, : string className);
  RemoteInterpreterResult interpret(: string noteId, : string className, : string st, : RemoteInterpreterContext interpreterContext);
  void cancel(: string noteId, : string className, : RemoteInterpreterContext interpreterContext);
  i32 getProgress(: string noteId, : string className, : RemoteInterpreterContext interpreterContext);
  string getFormType(: string noteId, : string className);
  list<InterpreterCompletion> completion(: string noteId, : string className, : string buf, : i32 cursor);
  void shutdown();


  string getStatus(: string noteId, :string jobId);


  RemoteInterpreterEvent getEvent();


  // as a response, ZeppelinServer send list of resources to Interpreter process
  void resourcePoolResponseGetAll(: list<string> resources);
  // as a response, ZeppelinServer send serialized value of resource
  void resourceResponseGet(: string resourceId, : binary object);
  // get all resources in the interpreter process
  list<string> resourcePoolGetAll();
  // get value of resource
  binary resourceGet(: string noteId, : string paragraphId, : string resourceName);
  // remove resource
  bool resourceRemove(: string noteId, : string paragraphId, :string resourceName);


  void angularObjectUpdate(: string name, : string noteId, : string paragraphId, : string
  object);
  void angularObjectAdd(: string name, : string noteId, : string paragraphId, : string object);
  void angularObjectRemove(: string name, : string noteId, : string paragraphId);
  void angularRegistryPush(: string registry);
}
           

與前面的Interpreter類的定義進行對比不難發現,RemoteInterpreterService Thrift接口與Interpreter抽象類定義的接口大部分相同,不同之處在于:

1. RemoteInterpreterService接口的實作類由于運作在不同的JVM中,需要在每個接口方法中額外傳遞環境資訊,如noteId和className等,如createInterpreter、open、close、cancel等。

2. RemoteInterpreterService接口中多出了兩種類型的接口,一種是為了完成ZeppelinServer程序和RemoteInterpreter程序之間的resource協商(neigotiation),如resourceXXX接口;另一種是為了完成2者之間angular object的前背景雙向綁定,如augularXXX接口。

具體檔案位置見:

${ZEPPELIN_HOME}/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift。在其同級目錄下,zeppelin還提供了代碼生成腳本genthrift.sh:

thrift --gen java RemoteInterpreterService.thrift
mv gen-java/org/apache/zeppelin/interpreter/thrift ../java/org/apache/zeppelin/interpreter/thrift
rm -rf gen-java
           

可以看出,

${ZEPPELIN_HOME}/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift目錄下所有檔案都是Thrift的代碼生成器根據該接口檔案自動生成的。如果我們修改過該接口檔案,則需要重新執行該腳本。

InterpreterGroup

zeppelin源碼分析(3)——主要的class分析(上)

InterpterGroup繼承了ConcurrentHashMap

RemoteInterpreterProcess

zeppelin源碼分析(3)——主要的class分析(上)

RemoteInterpreterProcess是采用獨立JVM啟動repl程序的具體執行類,它采用Apache Commons Exec架構來根據Zeppelin主程序的”訓示”啟動獨立程序,具體邏輯如下:

port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
cmdLine.addArgument("-d", false);
cmdLine.addArgument(interpreterDir, false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(Integer.toString(port), false);
cmdLine.addArgument("-l", false);
cmdLine.addArgument(localRepoDir, false);


executor = new DefaultExecutor();


watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);


running = true;
try {
   Map procEnv = EnvironmentUtils.getProcEnvironment();
   procEnv.putAll(env);


   logger.info("Run interpreter process {}", cmdLine);
   executor.execute(cmdLine, procEnv, this);
} catch (IOException e) {
   //省略...
}
           

這裡有幾點主要注意:

1. 該程序端口是zeppelin自動尋找作業系統中目前可用的端口

2. RemoteInterpreterProcess并非在在構造函數中,就啟動JVM,而是在被引用(reference方法被調用)之後,才啟動的

3. 具體的interpterRunner腳本為${ZEPPELIN_HOME}/bin/interpreter.sh,參見

ZeppelinConfiguration.getInterpreterRemoteRunnerPath()

interpreter.sh檔案重點部分如下:

##省略環境變量和classpath拼接等内容
ZEPPELIN_SERVER=org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer


SPARK_APP_JAR="$(ls ${ZEPPELIN_HOME}/interpreter/spark/zeppelin-spark*.jar)"


if [[ -n "${SPARK_SUBMIT}" ]]; then
    ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${CLASSPATH}" --driver-java-options "${JAVA_INTP_OPTS}" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT} &
else
    ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
fi
           

可以看出,在單機環境下,主要是在啟動單獨JVM程序,執行RemoteInterpreterServer,并向其main方法,傳遞必要的參數。在spark環境下,我們會單獨分析,此處暫時略過。

RemoteInterpreter

zeppelin源碼分析(3)——主要的class分析(上)

RemoteInterpreter可能是zeppelin中最誤導人的類命名了,筆者認為其命名為RemoteIntepreterProxy,或者是InterpterProxy、InterpterStub更合适一些,因為其本質是遠端Interpter的本地代理,是Proxy模式的典型應用,其運作在zeppelin主程序中,通過Thrift服務的Client來控制遠端Interpreter的執行。

通過其初始化代碼,可見一斑:

//省略掉了出錯處理等其他内容
  public synchronized void init() {
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    final InterpreterGroup interpreterGroup = getInterpreterGroup();
    interpreterProcess.reference(interpreterGroup);


    synchronized (interpreterProcess) {
      Client client = interpreterProcess.getClient();
      client.createInterpreter(groupId, noteId, getClassName(), (Map) property);
    }  
  }
           

繼續閱讀