作者:章劍鋒(簡鋒),阿裡巴巴進階技術專家
用過 Zeppelin 的人應該比較熟悉 Zeppelin 的 UI,因為 Zeppelin 的主要使用場景都是互動式,使用者需要手動來操作。那除了這種手動的方式,還有其他的方式嗎?如果你不想用 Zeppelin UI,但又想用 Zeppelin 送出和管理大資料作業 (比如 Flink Job)的能力該怎麼辦?或者是你在 Zeppelin 裡寫好了代碼,想定時排程起來,或者內建到其他系統裡,該怎麼辦?
如果你有這樣的訴求,那麼 Zeppelin Client API (SDK)就是你所需要的東西。
Zeppelin 簡介
對于不熟悉 Zeppelin 的人,可以用一句話來解釋 Zeppelin:大資料引擎的入口,互動式大資料分析平台底座。Zeppelin 最大的特點是連接配接多種引擎,具有可插拔式,下面這張圖例舉了一些常用的引擎,當然 Zeppelin 還支援其他很多引擎,這裡就不一一例舉。

雖然 Zeppelin 有 Rest API,但是 Zeppelin 的 Rest API 太多,對于很多不熟悉 Zeppelin 的人來說使用 Rest API 門檻太高,是以 Zeppelin 專門開發了一個 Client API (SDK),友善大家做內建。Zeppelin Client API (SDK)分為 2 個層面的的東西(接下來會逐個詳細介紹):
- Zeppelin Client API (Low Level API)
- Session API (High Level API)
Zeppelin Client API (Low Level API)
Zeppelin Client API 可以在 Note 和 Paragraph 的粒度進行操作。你可以先在 notebook 裡寫好代碼 (比如開發階段在 notebook 裡寫代碼,做測試),然後用 Low Level API 用程式設計的方式把 Job 跑起來(比如生産階段把作業定時排程起來)。Zeppelin Client API 最重要的 class 是 ZeppelinClient,也是 Zeppelin Client API 的入口。下面例舉幾個重要的接口(這些 API 都比較直覺,我就不多做解釋了)。
public String createNote(String notePath) throws Exception
public void deleteNote(String noteId) throws Exception
public NoteResult executeNote(String noteId) throws Exception
public NoteResult executeNote(String noteId,
Map<String, String> parameters) throws Exception
public NoteResult queryNoteResult(String noteId) throws Exception
public NoteResult submitNote(String noteId) throws Exception
public NoteResult submitNote(String noteId,
Map<String, String> parameters) throws Exception
public NoteResult waitUntilNoteFinished(String noteId) throws Exception
public String addParagraph(String noteId,
String title,
String text) throws Exception
public void updateParagraph(String noteId,
String paragraphId,
String title,
String text) throws Exception
public ParagraphResult executeParagraph(String noteId,
String paragraphId,
String sessionId,
Map<String, String> parameters) throws Exception
public ParagraphResult submitParagraph(String noteId,
String paragraphId,
String sessionId,
Map<String, String> parameters) throws Exception
public void cancelParagraph(String noteId, String paragraphId)
public ParagraphResult queryParagraphResult(String noteId, String paragraphId)
public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId)
那這些 API 能用來做什麼呢?
一個典型的用途是我們在 Zeppelin 裡寫好代碼,做好測試,然後在第三方系統裡內建進來。比如下面的代碼就是把 Zeppelin 自帶的 Spark Basic Features 用程式設計的方式跑起來,你不僅可以跑 Zeppelin Note,還可以拿到運作結果 (ParagraphResult)。怎麼處理運作結果,就留給你發揮想象的空間吧(可以在你的系統裡展示出來,或者可視化出來,或者傳給其他系統做消費等等)。
此外,對于 Dynamic forms(動态控件,比如文本框,下拉框等等),你還可以動态的提供參數,如下面例子裡的 maxAge 和 marital。
ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
ZeppelinClient zClient = new ZeppelinClient(clientConfig);
String zeppelinVersion = zClient.getVersion();
System.out.println("Zeppelin version: " + zeppelinVersion);
ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);
Map<String, String> parameters = new HashMap<>();
parameters.put("maxAge", "40");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);
parameters = new HashMap<>();
parameters.put("marital", "married");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);
這下面這張圖就是上面我們要 Zeppelin Client API 跑的 Zeppelin 自帶的 Spark Basic Features。
Session API (High Level API)
Session API 是 Zeppelin 的high level api,Session API 裡沒有 Note,Paragraph 的概念,粒度是你送出的代碼。Session API裡最重要的class就是 ZSession,這也是Session API的入口,一個 ZSession 代表一個獨立的Zeppelin Interpreter 程序,對于 Flink 來說就是一個獨立的 Flink Session Cluster。下面例舉一些典型的接口(這些 API 都比較直覺,我就不多做解釋了)。
public void start() throws Exception
public void start(MessageHandler messageHandler) throws Exception
public void stop() throws Exception
public ExecuteResult execute(String code) throws Exception
public ExecuteResult execute(String subInterpreter,
Map<< span="">String, String> localProperties,
String code,
StatementMessageHandler messageHandler) throws Exception
public ExecuteResult submit(String code) throws Exception
public ExecuteResult submit(String subInterpreter,
Map<< span="">String, String> localProperties,
String code,
StatementMessageHandler messageHandler) throws Exception
public void cancel(String statementId) throws Exception
public ExecuteResult queryStatement(String statementId) throws Exception
public ExecuteResult waitUntilFinished(String statementId) throws Exception
那這個 API 能用來做什麼呢? 一個典型的用途是就是我們動态建立 Session (Zeppelin Interpreter 程序),動态的送出運作代碼,并拿到運作結果。比如你不想用 Zeppelin 的 UI,要自己做一個 Flink 的開發管理平台,那麼你就可以自己做 UI,讓使用者在 UI 上配置 Flink Job,輸入 SQL,然後把所有的這些資訊發送到後端,後端調用 ZSession 來運作 Flink Job。
下面的 Java 代碼就是用程式設計的方式調用了 2 條 Flink SQL 語句,并且在 MyStatementMessageHandler1 和 MyStatementMessageHandler2 中讀取源源不斷發送過來更新的 SQL 運作結果 (怎麼來使用這個結果就靠你的想象力了)。
需要說明的是像 Flink Interpreter 這種流式結果資料更新是通過 WebSocket 實作的,是以下面的代碼裡有會有 CompositeMessageHandler,MyStatementMessageHandler1 以及 MyStatementMessageHandler2,這些 MessageHandler 就是用來處理通過 WebSocket 發送過來的流式資料結果。下面是 2 條我們在 Zeppelin 裡運作的 Flink SQL。
接下來我們會用 Zeppelin Session API 來跑着這 2 條 Flink SQL,然後我們會在MyStatementMessageHandler1,MyStatementMessageHandler2 裡拿到結果展示出來。
ZSession session = null;
try {
ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
Map<< span="">String, String> intpProperties = new HashMap<>();
session = ZSession.builder()
.setClientConfig(clientConfig)
.setInterpreter("flink")
.setIntpProperties(intpProperties)
.build();
// CompositeMessageHandler allow you to add StatementMessageHandler for each statement.
// otherwise you have to use a global MessageHandler.
session.start(new CompositeMessageHandler());
System.out.println("Flink Web UI: " + session.getWeburl());
System.out.println("-----------------------------------------------------------------------------");
String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
ExecuteResult result = session.execute(initCode);
System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());
// run flink ssql
Map<< span="">String, String> localProperties = new HashMap<>();
localProperties.put("type", "update");
result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
new MyStatementMessageHandler1());
session.waitUntilFinished(result.getStatementId());
result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
new MyStatementMessageHandler2());
session.waitUntilFinished(result.getStatementId());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (session != null) {
try {
session.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static class MyStatementMessageHandler1 implements StatementMessageHandler {
@Override
public void onStatementAppendOutput(String statementId, int index, String output) {
System.out.println("MyStatementMessageHandler1, append output: " + output);
}
@Override
public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
System.out.println("MyStatementMessageHandler1, update output: " + output);
}
}
public static class MyStatementMessageHandler2 implements StatementMessageHandler {
@Override
public void onStatementAppendOutput(String statementId, int index, String output) {
System.out.println("MyStatementMessageHandler2, append output: " + output);
}
@Override
public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
System.out.println("MyStatementMessageHandler2, update output: " + output);
}
}
除了程式設計方式跑 Flink Job,這個 Session API 還能給我們帶來什麼呢?
在 Zeppelin 裡如果你可以通過 %flink.conf 來對你的 Flink Cluster 進行非常豐富的配置,但是 %flink.conf 是純文字的配置,不熟悉 Flink 的人很容易配錯(如下圖)。如果你是自己做 Flink 開發平台的話就可以做一個更完整的 UI,用一些下拉框等等把一些配置選項固定下來,使用者隻要選擇就行了,不需要自己輸入文本來配置。
還有下面這類 paragraph 的 local properties 配置,比如 type,template, resumeFromLatestCheckpoint 也是比較容易寫錯的,同理你可以在自己 UI 裡用一些控件把這些選項提前固定下來,而不是讓使用者輸入文本的方式。
我相信 Zeppelin Client API 還有很多可以發揮和想象的空間,大家腦洞起來吧。
▼ 視訊示範 ▼
視訊示範連結 https://v.qq.com/x/page/m3146grr5e1.html
更多 Flink 技術幹貨及使用交流可加入 Flink 社群釘釘大群。