接上篇,本文接着分析zeppelin類圖中右上角剩餘的類,同樣,在分析的過程中,我們重點關注該class的職責劃分,以及與其他類配合,完成zeppelin的設計目标的過程。
InterpreterInfoSaving
InterpreterInfoSaving是一個convenient類(提供的功能可以由其他類組合完成,設計目的是為了友善調用),定義其目的就是為了将原來分散存儲在各個interpreter子檔案夾中的InterpreterSetting、intepreterBindings和interpreterRepositories集中到一處,友善進行查找和持久化。
原來Interpter的注冊方式是static注冊方式:即每個Interpreter的實作類都定義一段static初始化段,在該類被第一次加載的時候,将其注冊到zeppelin中。例如:下面是Python interpreter的注冊代碼。
static {
Interpreter.register(
"python",
"python",
PythonInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(ZEPPELIN_PYTHON, DEFAULT_ZEPPELIN_PYTHON,
"Python directory. Default : python (assume python is in your $PATH)")
.build()
);
}
現在這種方式已經Deprecated了,新的Interpreter采用了json配置檔案的方式來儲存這些配置,并且由zeppelin加載時來解析這些配置,該檔案約定為interpreter-setting.json,該檔案可以放到如下2種位置:
1. 作為main resources打包到interpreter的jar包中
2. 将其放到${ZEPPELIN_HOME}/interpreter/{interpreter}/interpreter-setting.json,位置下
interpreter-settings.json就近與interpreter的jar包存儲在一起,符合封裝的原則,但是zeppelin需要知道全局已經注冊了哪些interpreter,以及這些interpreter的配置
InterpreterInfoSaving類的唯一執行個體會被持久化到${ZEPPELIN_HOME}/conf/interpreter.json檔案,如下圖:
可以看出,InterpreterInfoSaving類是${ZEPPELIN_HOME}/conf/interpreter.json持久化檔案相對應的記憶體對象。
InterpreterSetting
InterpreterSettings是一份Interpreter建立的”contract”,其職責如下:
1. 維護了interpreter相關的元資訊,這些資訊對于正确運作intepreter程序至關重要,如:與外界環境進行互動(如spark on Yarn叢集)、第三方依賴加載等
2. 維護了note和intepreterGroup之間的關系
上篇中我們提到InterpreterGroup是建立jvm、note bind的最小機關,是以,也就決定了在多個note時,建立jvm的個數以及在多個note之間進行變量共享的程度。每個interpreter程序可以有一個或者多個interpreterGroup,每個interpreter執行個體都從屬于其中某一個InterpreterGroup。三者之間的關系如下圖:
如下是spark interpreter的settings:
"2C6QR3FVF": {
"id": "2C6QR3FVF",
"name": "spark",
"group": "spark",
"properties": {
"spark.executor.memory": "",
"args": "",
"zeppelin.spark.printREPLOutput": "true",
"spark.cores.max": "",
"zeppelin.dep.additionalRemoteRepository": "spark-packages,http://dl.bintray.com/spark-packages/maven,false;",
"zeppelin.spark.importImplicit": "true",
"zeppelin.spark.sql.stacktrace": "false",
"zeppelin.spark.concurrentSQL": "false",
"zeppelin.spark.useHiveContext": "true",
"zeppelin.pyspark.python": "python",
"zeppelin.dep.localrepo": "local-repo",
"zeppelin.interpreter.localRepo": "..//local-repo/2C6QR3FVF",
"zeppelin.R.knitr": "true",
"zeppelin.spark.maxResult": "1000",
"master": "local[*]",
"spark.app.name": "Zeppelin",
"zeppelin.R.image.width": "100%",
"zeppelin.R.render.options": "out.format \u003d \u0027html\u0027, comment \u003d NA, echo \u003d FALSE, results \u003d \u0027asis\u0027, message \u003d F, warning \u003d F",
"zeppelin.R.cmd": "R"
},
"interpreterGroup": [
{
"class": "org.apache.zeppelin.spark.SparkInterpreter",
"name": "spark"
},
{
"class": "org.apache.zeppelin.spark.SparkSqlInterpreter",
"name": "sql"
},
{
"class": "org.apache.zeppelin.spark.DepInterpreter",
"name": "dep"
},
{
"class": "org.apache.zeppelin.spark.PySparkInterpreter",
"name": "pyspark"
},
{
"class": "org.apache.zeppelin.spark.SparkRInterpreter",
"name": "r"
}
],
"dependencies": [],
"option": {
"remote": true,
"perNoteSession": false,
"perNoteProcess": false,
"isExistingProcess": false,
"port": "-1"
}
}
InterpreterSettings執行個體被InterpreterFactory對象初始化并填充。
多個note時,究竟是為每個note建立不同的interpreter程序還是共享同一個interpreter程序,zeppelin UI上提供了三種不同的模式可供選擇:
Shared模式
一種解釋器隻有一個Interpreter程序,并且該程序中隻有一個InterpreterGroup,所有的Interpreter執行個體都從屬于該InterpreterGroup,當然,也肯定在同一個程序内部。多個note之間,可以很容易的共享變量。
以下關于3種模式的解釋來自于Lee Moon Soo相關文章
Scoped模式
一種repl解釋器隻有一個Interpreter程序,但是與Shared模式不同,會建立過個InterpreterGroup,每個note關聯一個InterpreterGroup。這樣每個note相當于有了自己的session,session與session互相隔離,但是仍然由于這些InterpreterGroup仍然在同一個程序中,仍然可以在它們之間共享變量。
Isolated模式
獨占式,為每個note建立一個獨立的intepreter程序,該程序中建立一個InterpreterGroup執行個體,為該note的服務的Interpreter執行個體從屬于該InterpreterGroup。
SparkInterpreter的Shared/Scoped/Isolated模式
以SparkInterpreter為例,說明這三種模式對SparkContext和Scala repl共享方式上的差别:
SparkInterpreter Shared
所有的note共享同一個SparkContext和Scala REPL執行個體,是以,如果其中一個note定義了變量a,另外一個note可以通路并且修改該變量a。
SparkInterpreter Scoped
所有的note共享同一個SparkContext,所有的spark job都是通過同一個SparkContext送出的,但是不同的Scala repl解釋器,由于不同享Scala repl,故不存在一個note通路并修改了另一個note定義的變量的問題。
SparkInterpreter Isolated
獨占式,每個note都有自己的SparkContext和Scala repl,不共享。
其實這三種模式,底層都是通過InterpreterOption類來控制的。
InterpreterOption
雖然InterpreterOption是一個簡單的POJO,但是其字段取值,直接決定了zeppelin在建立interpreter程序時的處理方式,直接展現了”性能和資源占用之間進行trade-off”多種政策:
property | 含義 | 說明 |
---|---|---|
existingProcess | 是否連接配接已有intereter程序 | intereter程序可以獨立啟動,如在遠端(不同與zeppelin運作的其他host)節點上,可以采用該手段将repl程序進行分布式部署,并且先于zeppelin啟動,讓zeppelin連接配接到這些已有的線程,以解決intereter程序橫向擴充的問題。該模式需要顯式指定host和port,表示zeppelin主程序(ZeppelinServer所在程序)與該interpreter程序進行的IPC時,通信的socket。 |
perNoteProcess | 每個Note建立一個IntereterGroup程序 | 這是最細粒度的interpreter程序建立方式,同時也是最粗放的資源使用的方式,如果interpreter都在運作zeppelin的host上啟動的話, note數量很多的話,很可能造該host記憶體耗盡 |
perNoteSession | 每個Note的在同一個IntereterGroup程序中建立不同的Interpreter執行個體 | 建立interpreter程序時隻建立一個,但是在該程序内部,建立不同的interpreter執行個體。這種方式比perNoteProcess要節省記憶體資源。 |
InterpreterSettings是InterpreterOption的使用方,重要的地方見其getInterpreterGroup和getInterpreterProcessKey方法:
private String getInterpreterProcessKey(String noteId) {
if (getOption().isExistingProcess) {
return Constants.EXISTING_PROCESS;//existingProcess模式,共享現有的IntereterGroup程序
} else if (getOption().isPerNoteProcess()) {
return noteId;//perNoteProcess模式,每個note都會建立一個新的程序
} else {
return SHARED_PROCESS;//sharedProcess模式,所有note共享同一個interpreter程序
}
}
//為note建立新的InterpreterGroup或者是關聯現有的InterpreterGroup
public InterpreterGroup getInterpreterGroup(String noteId) {
String key = getInterpreterProcessKey(noteId);
synchronized (interpreterGroupRef) {
if (!interpreterGroupRef.containsKey(key)) {
String interpreterGroupId = id() + ":" + key;
InterpreterGroup intpGroup =
interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption());
interpreterGroupRef.put(key, intpGroup);
}
return interpreterGroupRef.get(key);
}
}
InterpreterFactory
InterpreterFactory是InterpreterGroupFactory的實作類,承擔如下職責:
1. interpreter執行個體的實際建立者
2. interpreter配置檔案的加載與持久化,如loadFromFile()和saveToFile()
3. interpreterSettings的管理
4. 第三方依賴加載器
5. 在zeppelin主程序中建立遠端AngularObjectRegistry的本地Proxy——RemoteAngularObjectRegistry,以保證遠端interpreter程序與前端angular對象雙向綁定。
InterpreterFactory實際執行建立interpreter執行個體的方式為createInterpretersForNote,具體實作如下:
public void createInterpretersForNote(
InterpreterSetting interpreterSetting,
String noteId,
String key) {
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(noteId);//調用interpreterSetting.getInterpreterGroup以确定建立InterpreterGroup的政策
String groupName = interpreterSetting.getGroup();
InterpreterOption option = interpreterSetting.getOption();
Properties properties = interpreterSetting.getProperties();
if (option.isExistingProcess) {
properties.put(Constants.ZEPPELIN_INTERPRETER_HOST, option.getHost());
properties.put(Constants.ZEPPELIN_INTERPRETER_PORT, option.getPort());
}
//省略了同步等到相同key interpreterGroup終止的代碼
logger.info("Create interpreter instance {} for note {}", interpreterSetting.getName(), noteId);
for (String className : interpreterClassList) {
Set<String> keys = Interpreter.registeredInterpreters.keySet();
for (String intName : keys) {
RegisteredInterpreter info = Interpreter.registeredInterpreters.get(intName);
if (info.getClassName().equals(className)
&& info.getGroup().equals(groupName)) {
Interpreter intp;
if (option.isRemote()) {//在單獨啟動的intepreter程序中建立Interpreter執行個體
intp = createRemoteRepl(info.getPath(),
key,
info.getClassName(),
properties,
interpreterSetting.id());
} else {//在zeppelin主程序中,動态加載并反射建立Interpreter執行個體
intp = createRepl(info.getPath(),
info.getClassName(),
properties);
}
synchronized (interpreterGroup) {
List<Interpreter> interpreters = interpreterGroup.get(key);
if (interpreters == null) {
interpreters = new LinkedList<Interpreter>();
interpreterGroup.put(key, interpreters);//完成noteId:interpreters執行個體或者interpreterGroup:interpreters執行個體之間的映射
}
interpreters.add(intp);
}
logger.info("Interpreter " + intp.getClassName() + " " + intp.hashCode() + " created");
intp.setInterpreterGroup(interpreterGroup);
break;
}
}
}
}
以下是createRemoteRepl方法實作,重點是用LazyOpenInterpreter Proxy了一個RemoteInterpreter執行個體:
private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className,
Properties property, String interpreterSettingId) {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId;
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout,
maxPoolSize, remoteInterpreterProcessListener));
return intp;
}
InterpreterFactory執行解除note和interpreter執行個體之間的關系方法見removeInterpretersForNote:
public void removeInterpretersForNote(InterpreterSetting interpreterSetting,
String noteId) {
if (interpreterSetting.getOption().isPerNoteProcess()) {//perNoteProcess,直接關閉process
interpreterSetting.closeAndRemoveInterpreterGroup(noteId);
} else if (interpreterSetting.getOption().isPerNoteSession()) {//perNoteSession,由于process是共享的,note關閉,隻關閉該note相關的interpreter執行個體,process不關閉
InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(noteId);
interpreterGroup.close(noteId);
interpreterGroup.destroy(noteId);
synchronized (interpreterGroup) {
interpreterGroup.remove(noteId);
interpreterGroup.notifyAll(); // notify createInterpreterForNote()
}
logger.info("Interpreter instance {} for note {} is removed",
interpreterSetting.getName(),
noteId);
}
}
參考文獻
[1.] Apache Zeppelin, Interpreter mode explained, Lee Moon Soo