天天看點

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

接上篇,本文接着分析zeppelin類圖中右上角剩餘的類,同樣,在分析的過程中,我們重點關注該class的職責劃分,以及與其他類配合,完成zeppelin的設計目标的過程。

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

InterpreterInfoSaving

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

InterpreterInfoSaving是一個convenient類(提供的功能可以由其他類組合完成,設計目的是為了友善調用),定義其目的就是為了将原來分散存儲在各個interpreter子檔案夾中的InterpreterSetting、intepreterBindings和interpreterRepositories集中到一處,友善進行查找和持久化。

原來Interpter的注冊方式是static注冊方式:即每個Interpreter的實作類都定義一段static初始化段,在該類被第一次加載的時候,将其注冊到zeppelin中。例如:下面是Python interpreter的注冊代碼。

static {
    Interpreter.register(
        "python",
        "python",
        PythonInterpreter.class.getName(),
        new InterpreterPropertyBuilder()
            .add(ZEPPELIN_PYTHON, DEFAULT_ZEPPELIN_PYTHON,
                "Python directory. Default : python (assume python is in your $PATH)")
            .build()
    );
  }
           

現在這種方式已經Deprecated了,新的Interpreter采用了json配置檔案的方式來儲存這些配置,并且由zeppelin加載時來解析這些配置,該檔案約定為interpreter-setting.json,該檔案可以放到如下2種位置:

1. 作為main resources打包到interpreter的jar包中

2. 将其放到${ZEPPELIN_HOME}/interpreter/{interpreter}/interpreter-setting.json,位置下

interpreter-settings.json就近與interpreter的jar包存儲在一起,符合封裝的原則,但是zeppelin需要知道全局已經注冊了哪些interpreter,以及這些interpreter的配置

InterpreterInfoSaving類的唯一執行個體會被持久化到${ZEPPELIN_HOME}/conf/interpreter.json檔案,如下圖:

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

可以看出,InterpreterInfoSaving類是${ZEPPELIN_HOME}/conf/interpreter.json持久化檔案相對應的記憶體對象。

InterpreterSetting

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

InterpreterSettings是一份Interpreter建立的”contract”,其職責如下:

1. 維護了interpreter相關的元資訊,這些資訊對于正确運作intepreter程序至關重要,如:與外界環境進行互動(如spark on Yarn叢集)、第三方依賴加載等

2. 維護了note和intepreterGroup之間的關系

上篇中我們提到InterpreterGroup是建立jvm、note bind的最小機關,是以,也就決定了在多個note時,建立jvm的個數以及在多個note之間進行變量共享的程度。每個interpreter程序可以有一個或者多個interpreterGroup,每個interpreter執行個體都從屬于其中某一個InterpreterGroup。三者之間的關系如下圖:

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

如下是spark interpreter的settings:

"2C6QR3FVF": {
      "id": "2C6QR3FVF",
      "name": "spark",
      "group": "spark",
      "properties": {
        "spark.executor.memory": "",
        "args": "",
        "zeppelin.spark.printREPLOutput": "true",
        "spark.cores.max": "",
        "zeppelin.dep.additionalRemoteRepository": "spark-packages,http://dl.bintray.com/spark-packages/maven,false;",
        "zeppelin.spark.importImplicit": "true",
        "zeppelin.spark.sql.stacktrace": "false",
        "zeppelin.spark.concurrentSQL": "false",
        "zeppelin.spark.useHiveContext": "true",
        "zeppelin.pyspark.python": "python",
        "zeppelin.dep.localrepo": "local-repo",
        "zeppelin.interpreter.localRepo": "..//local-repo/2C6QR3FVF",
        "zeppelin.R.knitr": "true",
        "zeppelin.spark.maxResult": "1000",
        "master": "local[*]",
        "spark.app.name": "Zeppelin",
        "zeppelin.R.image.width": "100%",
        "zeppelin.R.render.options": "out.format \u003d \u0027html\u0027, comment \u003d NA, echo \u003d FALSE, results \u003d \u0027asis\u0027, message \u003d F, warning \u003d F",
        "zeppelin.R.cmd": "R"
      },
      "interpreterGroup": [
        {
          "class": "org.apache.zeppelin.spark.SparkInterpreter",
          "name": "spark"
        },
        {
          "class": "org.apache.zeppelin.spark.SparkSqlInterpreter",
          "name": "sql"
        },
        {
          "class": "org.apache.zeppelin.spark.DepInterpreter",
          "name": "dep"
        },
        {
          "class": "org.apache.zeppelin.spark.PySparkInterpreter",
          "name": "pyspark"
        },
        {
          "class": "org.apache.zeppelin.spark.SparkRInterpreter",
          "name": "r"
        }
      ],
      "dependencies": [],
      "option": {
        "remote": true,
        "perNoteSession": false,
        "perNoteProcess": false,
        "isExistingProcess": false,
        "port": "-1"
      }
    }
           

InterpreterSettings執行個體被InterpreterFactory對象初始化并填充。

多個note時,究竟是為每個note建立不同的interpreter程序還是共享同一個interpreter程序,zeppelin UI上提供了三種不同的模式可供選擇:

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

Shared模式

一種解釋器隻有一個Interpreter程序,并且該程序中隻有一個InterpreterGroup,所有的Interpreter執行個體都從屬于該InterpreterGroup,當然,也肯定在同一個程序内部。多個note之間,可以很容易的共享變量。

以下關于3種模式的解釋來自于Lee Moon Soo相關文章

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

Scoped模式

一種repl解釋器隻有一個Interpreter程序,但是與Shared模式不同,會建立過個InterpreterGroup,每個note關聯一個InterpreterGroup。這樣每個note相當于有了自己的session,session與session互相隔離,但是仍然由于這些InterpreterGroup仍然在同一個程序中,仍然可以在它們之間共享變量。

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

Isolated模式

獨占式,為每個note建立一個獨立的intepreter程序,該程序中建立一個InterpreterGroup執行個體,為該note的服務的Interpreter執行個體從屬于該InterpreterGroup。

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

SparkInterpreter的Shared/Scoped/Isolated模式

以SparkInterpreter為例,說明這三種模式對SparkContext和Scala repl共享方式上的差别:

SparkInterpreter Shared

所有的note共享同一個SparkContext和Scala REPL執行個體,是以,如果其中一個note定義了變量a,另外一個note可以通路并且修改該變量a。

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

SparkInterpreter Scoped

所有的note共享同一個SparkContext,所有的spark job都是通過同一個SparkContext送出的,但是不同的Scala repl解釋器,由于不同享Scala repl,故不存在一個note通路并修改了另一個note定義的變量的問題。

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

SparkInterpreter Isolated

獨占式,每個note都有自己的SparkContext和Scala repl,不共享。

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

其實這三種模式,底層都是通過InterpreterOption類來控制的。

InterpreterOption

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

雖然InterpreterOption是一個簡單的POJO,但是其字段取值,直接決定了zeppelin在建立interpreter程序時的處理方式,直接展現了”性能和資源占用之間進行trade-off”多種政策:

property 含義 說明
existingProcess 是否連接配接已有intereter程序 intereter程序可以獨立啟動,如在遠端(不同與zeppelin運作的其他host)節點上,可以采用該手段将repl程序進行分布式部署,并且先于zeppelin啟動,讓zeppelin連接配接到這些已有的線程,以解決intereter程序橫向擴充的問題。該模式需要顯式指定host和port,表示zeppelin主程序(ZeppelinServer所在程序)與該interpreter程序進行的IPC時,通信的socket。
perNoteProcess 每個Note建立一個IntereterGroup程序 這是最細粒度的interpreter程序建立方式,同時也是最粗放的資源使用的方式,如果interpreter都在運作zeppelin的host上啟動的話, note數量很多的話,很可能造該host記憶體耗盡
perNoteSession 每個Note的在同一個IntereterGroup程序中建立不同的Interpreter執行個體 建立interpreter程序時隻建立一個,但是在該程序内部,建立不同的interpreter執行個體。這種方式比perNoteProcess要節省記憶體資源。

InterpreterSettings是InterpreterOption的使用方,重要的地方見其getInterpreterGroup和getInterpreterProcessKey方法:

private String getInterpreterProcessKey(String noteId) {
    if (getOption().isExistingProcess) {
      return Constants.EXISTING_PROCESS;//existingProcess模式,共享現有的IntereterGroup程序
    } else if (getOption().isPerNoteProcess()) {
      return noteId;//perNoteProcess模式,每個note都會建立一個新的程序
    } else {
      return SHARED_PROCESS;//sharedProcess模式,所有note共享同一個interpreter程序
    }
  }

  //為note建立新的InterpreterGroup或者是關聯現有的InterpreterGroup
  public InterpreterGroup getInterpreterGroup(String noteId) {
    String key = getInterpreterProcessKey(noteId);
    synchronized (interpreterGroupRef) {
      if (!interpreterGroupRef.containsKey(key)) {
        String interpreterGroupId = id() + ":" + key;
        InterpreterGroup intpGroup =
            interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption());
        interpreterGroupRef.put(key, intpGroup);
      }
      return interpreterGroupRef.get(key);
    }
  }
           

InterpreterFactory

zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻
zeppelin源碼分析(3)——主要的class分析(中)InterpreterInfoSavingInterpreterSettingInterpreterOptionInterpreterFactory參考文獻

InterpreterFactory是InterpreterGroupFactory的實作類,承擔如下職責:

1. interpreter執行個體的實際建立者

2. interpreter配置檔案的加載與持久化,如loadFromFile()和saveToFile()

3. interpreterSettings的管理

4. 第三方依賴加載器

5. 在zeppelin主程序中建立遠端AngularObjectRegistry的本地Proxy——RemoteAngularObjectRegistry,以保證遠端interpreter程序與前端angular對象雙向綁定。

InterpreterFactory實際執行建立interpreter執行個體的方式為createInterpretersForNote,具體實作如下:

public void createInterpretersForNote(
      InterpreterSetting interpreterSetting,
      String noteId,
      String key) {
    InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(noteId);//調用interpreterSetting.getInterpreterGroup以确定建立InterpreterGroup的政策
    String groupName = interpreterSetting.getGroup();
    InterpreterOption option = interpreterSetting.getOption();
    Properties properties = interpreterSetting.getProperties();
    if (option.isExistingProcess) {
      properties.put(Constants.ZEPPELIN_INTERPRETER_HOST, option.getHost());
      properties.put(Constants.ZEPPELIN_INTERPRETER_PORT, option.getPort());
    }

    //省略了同步等到相同key interpreterGroup終止的代碼
    logger.info("Create interpreter instance {} for note {}", interpreterSetting.getName(), noteId);

    for (String className : interpreterClassList) {
      Set<String> keys = Interpreter.registeredInterpreters.keySet();
      for (String intName : keys) {
        RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
        if (info.getClassName().equals(className)
            && info.getGroup().equals(groupName)) {
          Interpreter intp;

          if (option.isRemote()) {//在單獨啟動的intepreter程序中建立Interpreter執行個體
            intp = createRemoteRepl(info.getPath(),
                key,
                info.getClassName(),
                properties,
                interpreterSetting.id());
          } else {//在zeppelin主程序中,動态加載并反射建立Interpreter執行個體
            intp = createRepl(info.getPath(),
                info.getClassName(),
                properties);
          }

          synchronized (interpreterGroup) {
            List<Interpreter> interpreters = interpreterGroup.get(key);
            if (interpreters == null) {
              interpreters = new LinkedList<Interpreter>();
              interpreterGroup.put(key, interpreters);//完成noteId:interpreters執行個體或者interpreterGroup:interpreters執行個體之間的映射
            }
            interpreters.add(intp);
          }
          logger.info("Interpreter " + intp.getClassName() + " " + intp.hashCode() + " created");
          intp.setInterpreterGroup(interpreterGroup);
          break;
        }
      }
    }
  }
           

以下是createRemoteRepl方法實作,重點是用LazyOpenInterpreter Proxy了一個RemoteInterpreter執行個體:

private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
      Properties property, String interpreterSettingId) {
    int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
    String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
    int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);

    LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
        property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
        interpreterPath, localRepoPath, connectTimeout,
        maxPoolSize, remoteInterpreterProcessListener));
    return intp;
  }
           

InterpreterFactory執行解除note和interpreter執行個體之間的關系方法見removeInterpretersForNote:

public void removeInterpretersForNote(InterpreterSetting interpreterSetting,
                                        String noteId) {
    if (interpreterSetting.getOption().isPerNoteProcess()) {//perNoteProcess,直接關閉process
      interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
    } else if (interpreterSetting.getOption().isPerNoteSession()) {//perNoteSession,由于process是共享的,note關閉,隻關閉該note相關的interpreter執行個體,process不關閉
      InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(noteId);

      interpreterGroup.close(noteId);
      interpreterGroup.destroy(noteId);
      synchronized (interpreterGroup) {
        interpreterGroup.remove(noteId);
        interpreterGroup.notifyAll(); // notify createInterpreterForNote()
      }
      logger.info("Interpreter instance {} for note {} is removed",
          interpreterSetting.getName(),
          noteId);
    }
  }
           

參考文獻

[1.] Apache Zeppelin, Interpreter mode explained, Lee Moon Soo

繼續閱讀