天天看点

zeppelin源码分析(3)——主要的class分析(上)

zeppelin的module、package、class众多,如何快速地理清头绪,抓住重点?本文分析zeppelin主要module中重点的类以及它们之间的关系,理清这些类的职责,对于理解zeppelin的运行过程至关重要。

经过之前文章的分析,我们已经了解了zeppelin涉及到框架层面的几个module为:zeppelin-server、zeppelin-zengine、zeppelin-interpreter,并且三者之间有如下的依赖关系:

zeppelin源码分析(3)——主要的class分析(上)

本文要分析的主要的class,也都来自于这三个module。

zeppelin源码分析(3)——主要的class分析(上)

以上类图中省略了字段和方法,以避免过早引入太多细节,重点关注类与类之间的关系组成。由于篇幅的限制,再加上zeppelin提供的核心价值是与Interpreter相关的多语言repl解释器,笔者就选择从右上角黄色的区域开始,分多篇分析。

Interpreter

zeppelin源码分析(3)——主要的class分析(上)

Interpreter是一个抽象类,该类是zeppelin核心类,Zeppelin提供的核心价值:解释执行各种语言的代码,都是通过该抽象类的每个具体的实现类完成的。Interpreter主要规定了各语言repl解释器需要遵循的“规范(contract)”,包括:

1. repl解释器的生命周期管理。如open(), close(), destroy(),规定了产生和销毁repl解释器。

2. 解释执行代码的接口——interpreter(),这些真正产生价值的地方。

3. 执行代码过程中交互控制和易用性增强,如cancel(), getProgress(), completion(),分别是终止代码的执行、获取执行进度以及代码自动完成。

4. 解释器的配置接口,如setProperty()、setClassLoaderURL(URL[])等。

5. 性能优化接口,如getScheduler(),getIntepreterGroup()等。

6. 解释器注册接口(已经deprecated了),如一系列重载的register接口。

以上体现了zeppelin的repl解释器进程需要受其主进程ZeppelinServer的控制,也是zeppelin设计决策在代码中的体现。

注:现在的解释器注册通过如下2种方式进行:

  1. 将interpreter-setting.json打包到解释器的jar文件中
  2. 放置到如下位置:interpreter/{interpreter}/interpreter-setting.json

RemoteInterpreterService Thrift协议分析

Apache Thrift是跨语言RPC通信框架,提供了相应的DSL(Domain Specific Language)和支持多种语言的代码生成工具,使得代码开发人员可以只关注具体的业务,而不用关注底层的通信细节。zeppelin使用Thrift定义了其主进程ZeppelinServer与需要采用独立JVM进程运行的各repl解释器之间的通信协议。

关于为什么要采用单独的JVM进程来启动repl解释器进程,本系列的第3篇也有提及,这里再赘述一下:

1. zeppelin旨在提供一个开放的框架,支持多种语言和产品,由于每种语言和产品都是各自独立演进的,各自的运行时依赖也各不相同,甚至是相互冲突的,如果放在同一JVM中,仅解决冲突,维护各个产品之间的兼容性都是一项艰巨的任务,某些产品版本甚至是完全不能兼容的。

2. 大数据分析,是否具有横向扩展能力是production-ready一项重要的衡量指标,如果将repl进程与主进程合在一起,会验证影响系统性能。

因此,在有必要的时候,zeppelin采用独立JVM的方式来启动repl进程,并且采用Thrift协议定义了主进程与RemoteInterpreterService进程之间的通信协议,具体如下:

service RemoteInterpreterService {
  void createInterpreter(: string intpGroupId, : string noteId, : string className, : map<string, string> properties);


  void open(: string noteId, : string className);
  void close(: string noteId, : string className);
  RemoteInterpreterResult interpret(: string noteId, : string className, : string st, : RemoteInterpreterContext interpreterContext);
  void cancel(: string noteId, : string className, : RemoteInterpreterContext interpreterContext);
  i32 getProgress(: string noteId, : string className, : RemoteInterpreterContext interpreterContext);
  string getFormType(: string noteId, : string className);
  list<InterpreterCompletion> completion(: string noteId, : string className, : string buf, : i32 cursor);
  void shutdown();


  string getStatus(: string noteId, :string jobId);


  RemoteInterpreterEvent getEvent();


  // as a response, ZeppelinServer send list of resources to Interpreter process
  void resourcePoolResponseGetAll(: list<string> resources);
  // as a response, ZeppelinServer send serialized value of resource
  void resourceResponseGet(: string resourceId, : binary object);
  // get all resources in the interpreter process
  list<string> resourcePoolGetAll();
  // get value of resource
  binary resourceGet(: string noteId, : string paragraphId, : string resourceName);
  // remove resource
  bool resourceRemove(: string noteId, : string paragraphId, :string resourceName);


  void angularObjectUpdate(: string name, : string noteId, : string paragraphId, : string
  object);
  void angularObjectAdd(: string name, : string noteId, : string paragraphId, : string object);
  void angularObjectRemove(: string name, : string noteId, : string paragraphId);
  void angularRegistryPush(: string registry);
}
           

与前面的Interpreter类的定义进行对比不难发现,RemoteInterpreterService Thrift接口与Interpreter抽象类定义的接口大部分相同,不同之处在于:

1. RemoteInterpreterService接口的实现类由于运行在不同的JVM中,需要在每个接口方法中额外传递环境信息,如noteId和className等,如createInterpreter、open、close、cancel等。

2. RemoteInterpreterService接口中多出了两种类型的接口,一种是为了完成ZeppelinServer进程和RemoteInterpreter进程之间的resource协商(neigotiation),如resourceXXX接口;另一种是为了完成2者之间angular object的前后台双向绑定,如augularXXX接口。

具体文件位置见:

${ZEPPELIN_HOME}/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift。在其同级目录下,zeppelin还提供了代码生成脚本genthrift.sh:

thrift --gen java RemoteInterpreterService.thrift
mv gen-java/org/apache/zeppelin/interpreter/thrift ../java/org/apache/zeppelin/interpreter/thrift
rm -rf gen-java
           

可以看出,

${ZEPPELIN_HOME}/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift目录下所有文件都是Thrift的代码生成器根据该接口文件自动生成的。如果我们修改过该接口文件,则需要重新执行该脚本。

InterpreterGroup

zeppelin源码分析(3)——主要的class分析(上)

InterpterGroup继承了ConcurrentHashMap

RemoteInterpreterProcess

zeppelin源码分析(3)——主要的class分析(上)

RemoteInterpreterProcess是采用独立JVM启动repl进程的具体执行类,它采用Apache Commons Exec框架来根据Zeppelin主进程的”指示”启动独立进程,具体逻辑如下:

port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
cmdLine.addArgument("-d", false);
cmdLine.addArgument(interpreterDir, false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(Integer.toString(port), false);
cmdLine.addArgument("-l", false);
cmdLine.addArgument(localRepoDir, false);


executor = new DefaultExecutor();


watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
executor.setWatchdog(watchdog);


running = true;
try {
   Map procEnv = EnvironmentUtils.getProcEnvironment();
   procEnv.putAll(env);


   logger.info("Run interpreter process {}", cmdLine);
   executor.execute(cmdLine, procEnv, this);
} catch (IOException e) {
   //省略...
}
           

这里有几点主要注意:

1. 该进程端口是zeppelin自动寻找操作系统中当前可用的端口

2. RemoteInterpreterProcess并非在在构造函数中,就启动JVM,而是在被引用(reference方法被调用)之后,才启动的

3. 具体的interpterRunner脚本为${ZEPPELIN_HOME}/bin/interpreter.sh,参见

ZeppelinConfiguration.getInterpreterRemoteRunnerPath()

interpreter.sh文件重点部分如下:

##省略环境变量和classpath拼接等内容
ZEPPELIN_SERVER=org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer


SPARK_APP_JAR="$(ls ${ZEPPELIN_HOME}/interpreter/spark/zeppelin-spark*.jar)"


if [[ -n "${SPARK_SUBMIT}" ]]; then
    ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${CLASSPATH}" --driver-java-options "${JAVA_INTP_OPTS}" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT} &
else
    ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
fi
           

可以看出,在单机环境下,主要是在启动单独JVM进程,执行RemoteInterpreterServer,并向其main方法,传递必要的参数。在spark环境下,我们会单独分析,此处暂时略过。

RemoteInterpreter

zeppelin源码分析(3)——主要的class分析(上)

RemoteInterpreter可能是zeppelin中最误导人的类命名了,笔者认为其命名为RemoteIntepreterProxy,或者是InterpterProxy、InterpterStub更合适一些,因为其本质是远程Interpter的本地代理,是Proxy模式的典型应用,其运行在zeppelin主进程中,通过Thrift服务的Client来控制远程Interpreter的执行。

通过其初始化代码,可见一斑:

//省略掉了出错处理等其他内容
  public synchronized void init() {
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    final InterpreterGroup interpreterGroup = getInterpreterGroup();
    interpreterProcess.reference(interpreterGroup);


    synchronized (interpreterProcess) {
      Client client = interpreterProcess.getClient();
      client.createInterpreter(groupId, noteId, getClassName(), (Map) property);
    }  
  }
           

继续阅读