天天看點

Flink SQL 1.11 on Zeppelin 平台化實踐

作者:LittleMagic

大資料領域 SQL 化開發的風潮方興未艾(所謂"Everybody knows SQL"),Flink 自然也不能“免俗”。Flink SQL 是 Flink 系統内部最進階别的 API,也是流批一體思想的集大成者。使用者可以通過簡單明了的 SQL 語句像查表一樣執行流任務或批任務,屏蔽了底層 DataStream/DataSet API 的複雜細節,降低了使用門檻。

但是,Flink SQL 的預設開發方式是通過 Java/Scala API 編寫,與純 SQL 化、平台化的目标相去甚遠。目前官方提供的 Flink SQL Client 僅能在配備 Flink 用戶端的本地使用,局限性很大。而 Ververica 開源的 Flink SQL Gateway 元件是基于 REST API 的,仍然需要二次開發才能供給上層使用,并不是很友善。

鑒于有很多企業都無法配備專門的團隊來解決 Flink SQL 平台化的問題,那麼到底有沒有一個開源的、開箱即用的、功能相對完善的元件呢?答案就是本文的主角——Apache Zeppelin。

Flink SQL on Zeppelin!

Flink SQL 1.11 on Zeppelin 平台化實踐

Zeppelin 是基于 Web 的互動式資料分析筆記本,支援 SQL、Scala、Python 等語言。Zeppelin 通過插件化的 Interpreter(解釋器)來解析使用者送出的代碼,并将其轉化到對應的後端(計算架構、資料庫等)執行,靈活性很高。其架構簡圖如下所示。

Flink SQL 1.11 on Zeppelin 平台化實踐

Flink Interpreter 就是 Zeppelin 原生支援的衆多 Interpreters 之一。隻要配置好 Flink Interpreter 以及相關的執行環境,我們就可以将 Zeppelin 用作 Flink SQL 作業的開發平台了(當然,Scala 和 Python 也是沒問題的)。接下來本文就逐漸介紹 Flink on Zeppelin 的內建方法。

配置 Zeppelin

目前 Zeppelin 的最新版本是 0.9.0-preview2,可以在官網下載下傳包含所有 Interpreters 的 zeppelin-0.9.0-preview2-bin-all.tgz,并解壓到伺服器的合适位置。

接下來進入 conf 目錄。将環境配置檔案 zeppelin-env.sh.template 更名為 zeppelin-env.sh,并修改:

# JDK目錄
export JAVA_HOME=/opt/jdk1.8.0_172
# 友善之後配置Interpreter on YARN模式。注意必須安裝Hadoop,且hadoop必須配置在系統環境變量PATH中
export USE_HADOOP=true
# Hadoop配置檔案目錄
export HADOOP_CONF_DIR=/etc/hadoop/hadoop-conf           

将服務配置檔案 zeppelin-site.xml.template 更名為 zeppelin-site.xml,并修改:

<!-- 服務位址。預設為127.0.0.1,改為0.0.0.0使得可以在外部通路 -->
<property>
  <name>zeppelin.server.addr</name>
  <value>0.0.0.0</value>
  <description>Server binding address</description>
</property>

<!-- 服務端口。預設為8080,如果已占用,可以修改之 -->
<property>
  <name>zeppelin.server.port</name>
  <value>18080</value>
  <description>Server port.</description>
</property>           

最基礎的配置就完成了。運作 bin/zeppelin-daemon.sh start 指令,傳回 Zeppelin start [ OK ]的提示之後,通路<伺服器位址>:18080,出現下面的頁面,就表示 Zeppelin 服務啟動成功。

Flink SQL 1.11 on Zeppelin 平台化實踐

當然,為了一步到位适應生産環境,也可以适當修改 zeppelin-site.xml 中的以下參數:

<!-- 将Notebook repo更改為HDFS存儲 -->
<property>
  <name>zeppelin.notebook.storage</name>
  <value>org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo</value>
  <description>Hadoop compatible file system notebook persistence layer implementation, such as local file system, hdfs, azure wasb, s3 and etc.</description>
</property>

<!-- Notebook在HDFS上的存儲路徑 -->
<property>
  <name>zeppelin.notebook.dir</name>
  <value>/zeppelin/notebook</value>
  <description>path or URI for notebook persist</description>
</property>

<!-- 啟用Zeppelin的恢複功能。當Zeppelin服務挂掉并重新開機之後,能連接配接到原來運作的Interpreter -->
<property>
  <name>zeppelin.recovery.storage.class</name>
  <value>org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage</value>
  <description>ReoveryStorage implementation based on hadoop FileSystem</description>
</property>

<!-- Zeppelin恢複中繼資料在HDFS上的存儲路徑 -->
<property>
  <name>zeppelin.recovery.dir</name>
  <value>/zeppelin/recovery</value>
  <description>Location where recovery metadata is stored</description>
</property>

<!-- 禁止使用匿名使用者 -->
<property>
  <name>zeppelin.anonymous.allowed</name>
  <value>true</value>
  <description>Anonymous user allowed by default</description>
</property>           

Zeppelin 內建了 Shiro 實作權限管理。禁止使用匿名使用者之後,可以在 conf 目錄下的 shiro.ini 中配置使用者名、密碼、角色等,不再贅述。注意每次修改配置都需要運作 bin/zeppelin-daemon.sh restart 重新開機 Zeppelin 服務。

配置 Flink Interpreter on YARN

在使用 Flink Interpreter 之前,我們有必要對它進行配置,使 Flink 作業和 Interpreter 本身在 YARN 環境中運作。

點選首頁使用者名區域菜單中的 Interpreter 項(上一節圖中已經示出),搜尋 Flink,就可以看到參數清單。

Flink SQL 1.11 on Zeppelin 平台化實踐

Interpreter Binding

首先,将 Interpreter Binding 模式修改為 Isolated per Note,如下圖所示。

Flink SQL 1.11 on Zeppelin 平台化實踐

在這種模式下,每個 Note 在執行時會分别啟動 Interpreter 程序,類似于 Flink on YARN 的 Per-job 模式,最符合生産環境的需要。

Flink on YARN 參數

以下是需要修改的部分基礎參數。注意這些參數也可以在 Note 中指定,每個作業自己的配置會覆寫掉這裡的預設配置。

  • FLINK_HOME:Flink 1.11所在的目錄;
  • HADOOP_CONF_DIR:Hadoop 配置檔案所在的目錄;
  • flink.execution.mode:Flink 作業的執行模式,指定為 YARN 以啟用 Flink on YARN;
  • flink.jm.memory:JobManager 的記憶體量(MB);
  • flink.tm.memory:TaskManager 的記憶體量(MB);
  • flink.tm.slot:TaskManager 的 Slot 數;
  • flink.yarn.appName:YARN Application 的預設名稱;
  • flink.yarn.queue:送出作業的預設 YARN 隊列。

Hive Integration 參數

如果我們想通路 Hive 資料,以及用 HiveCatalog 管理 Flink SQL 的中繼資料,還需要配置與 Hive 的內建。

  • HIVE_CONF_DIR:Hive 配置檔案(hive-site.xml)所在的目錄;
  • zeppelin.flink.enableHive:設為 true 以啟用 Hive Integration;
  • zeppelin.flink.hive.version:Hive 版本号。
  • 複制與 Hive Integration 相關的依賴到 $FLINK_HOME/lib 目錄下,包括:
  • flink-connector-hive_2.11-1.11.0.jar
  • flink-hadoop-compatibility_2.11-1.11.0.jar
  • hive-exec-..jar
  • 如果 Hive 版本是1.x,還需要額外加入 hive-metastore-1.*.jar、libfb303-0.9.2.jar 和 libthrift-0.9.2.jar
  • 保證 Hive 中繼資料服務(Metastore)啟動。注意不能是 Embedded 模式,即必須以外部資料庫(MySQL、Postgres等)作為中繼資料存儲。

Interpreter on YARN 參數

在預設情況下,Interpreter 程序是在部署 Zeppelin 服務的節點上啟動的。随着送出的任務越來越多,就會出現單點問題。是以我們需要讓 Interpreter 也在 YARN 上運作,如下圖所示。

Flink SQL 1.11 on Zeppelin 平台化實踐
  • zeppelin.interpreter.yarn.resource.cores:Interpreter Container 占用的vCore 數量;
  • zeppelin.interpreter.yarn.resource.memory:Interpreter Container 占用的記憶體量(MB);
  • zeppelin.interpreter.yarn.queue:Interpreter 所處的 YARN 隊列名稱。

配置完成之後,Flink on Zeppelin 內建完畢,可以測試一下了。

測試 Flink SQL on Zeppelin

建立一個 Note,Interpreter 指定為 Flink。然後寫入第一個 Paragraph:

Flink SQL 1.11 on Zeppelin 平台化實踐

以 %flink.conf 标記的 Paragraph 用于指定這個 Note 中的作業配置,支援 Flink 的所有配置參數(參見 Flink 官網)。另外,flink.execution.packages 參數支援以 Maven GAV 坐标的方式引入外部依賴項。

接下來建立第二個 Paragraph,建立 Kafka 流表:

Flink SQL 1.11 on Zeppelin 平台化實踐

%flink.ssql 表示利用 StreamTableEnvironment 執行流處理 SQL,相對地,%flink.bsql 表示利用 BatchTableEnvironment 執行批處理 SQL。注意表參數中的 properties.bootstrap.servers 利用了 Zeppelin Credentials 來填寫,友善不同作業之間複用。

執行上述 SQL 之後會輸出資訊:

Flink SQL 1.11 on Zeppelin 平台化實踐

同時在 Hive 中可以看到該表的中繼資料。

最後寫第三個 Paragraph,從流表中查詢,并實時展現出來:

Flink SQL 1.11 on Zeppelin 平台化實踐

點選右上角的 FLINK JOB 标記,可以打開作業的 Web UI。上述作業的 JobGraph 如下。

Flink SQL 1.11 on Zeppelin 平台化實踐

除 SELECT 查詢外,通過 Zeppelin 也可以執行 INSERT 查詢,實作更加豐富的功能。關于 Flink SQL on Zeppelin 的更多應用,筆者在今後的文章中會繼續講解。

更多 Flink 技術交流可加入 Apache Flink 社群釘釘交流群:

Flink SQL 1.11 on Zeppelin 平台化實踐