回顧
在上篇文章中,筆者使用的 CDH 版本為 5.16.2,其中 Hive 版本為 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可了解),Flink 源代碼本身對 Hive 1.1.0 版本相容性不好,存在不少問題。為了相容目前版本,筆者基于 CDH 5.16.2 環境,對 Flink 代碼進行了修改,重新打包并部署。
其實經過很多開源項目的實戰,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情況下,替換一些 Jar 包,是可以解決相容性的問題。對于筆者的環境來說,可以使用 Hive 1.2.1 版本的一些 Jar 包來代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的開始部分,筆者會解決這個問題,然後再補充上篇文章缺少的實戰内容。
剪不斷理還亂的問題
根據讀者的回報,筆者将所有的問題總結為三類:
- Flink 如何連接配接 Hive 除了 API 外,有沒有類似 spark-sql 指令
- 識别不到 Hadoop 環境或配置檔案找不到
- 依賴包、類或方法找不到
1. Flink 如何連接配接 Hive
有的讀者不太清楚,如何配置 Flink 連接配接 Hive 的 Catalog,這裡補充一個完整的 conf/sql-client-hive.yaml 示例:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | |
sql-client-hive.yaml 配置檔案裡面包含:
- Hive 配置檔案 catalogs 中配置了 Hive 的配置檔案路徑。
- Yarn 配置資訊 deployment 中配置了 Yarn 的配置資訊。
- 執行引擎資訊 execution 配置了 blink planner,并且使用 batch 模式。batch 模式比較穩定,适合傳統的批處理作業,而且可以容錯,另外中間資料落盤,建議開啟壓縮功能。除了 batch,Flink 也支援 streaming 模式。
- Flink SQL CLI 工具
類似 spark-sql 指令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 腳本。在 Flink 1.10 版本中,Flink SQL CLI 改進了很多功能,筆者後面講解。
sql-client.sh 使用方式如下:
1 | |
2. 識别不到 Hadoop 環境或配置檔案找不到
筆者在上篇文章中提到過,在部署 Flink 的環境上部署 CDH gateway,包括 Hadoop、Hive 用戶端,另外還需要配置一些環境變量,如下:
1 2 3 4 | |
3. 依賴包、類或方法找不到
先檢視一下 Flink 家目錄下的 lib 目錄:
01 02 03 04 05 06 07 08 09 10 11 12 13 | |
如果上面前兩個問題都解決後,執行如下指令:
1 | |
報錯,報錯,還是報錯:
1 | |
其實在運作 sql-client.sh 腳本前,需要指定 Hadoop 環境的依賴包的路徑,建議不要報錯一個添加一個,除非有的讀者喜歡。這裡筆者提示一個友善的方式,即設定 HADOOPCLASSPATH(可以添加到 ~/.bashprofile 中)環境變量:
1 | |
再次執行:
1 | |
很抱歉,繼續報錯:
1 | |
這裡就是 Hive 1.1.0 版本的 Jar 包與 Flink 出現版本不相容性的問題了,解決方法是:
- 下載下傳 apache-hive-1.2.1 版本
- 替換 Flink lib 目錄下的 Hive Jar 包 删除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然後添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次檢視 lib 目錄:
01 02 03 04 05 06 07 08 09 10 11 12 13 | |
最後再執行:
1 | |
這時,讀者就可以看到手握栗子的可愛小松鼠了。
Flink SQL CLI 實踐
在 Flink 1.10 版本(目前為 RC1 階段) 中,Flink 社群對 SQL CLI 做了大量的改動,比如支援 View、支援更多的資料類型和 DDL 語句、支援分區讀寫、支援 INSERT OVERWRITE 等,實作了更多的 TableEnvironment API 的功能,更加友善使用者使用。
接下來,筆者詳細講解 Flink SQL CLI。
0. Help
執行下面指令,登入 Flink SQL 用戶端:
1 2 | |
執行 HELP,檢視 Flink SQL 支援的指令,如下為大部分常用的:
- CREATE TABLE
- DROP TABLE
- CREATE VIEW
- DESCRIBE
- DROP VIEW
- EXPLAIN
- INSERT INTO
- INSERT OVERWRITE
- SELECT
- SHOW FUNCTIONS
- USE CATALOG
- SHOW TABLES
- SHOW DATABASES
- SOURCE
- USE
- SHOW CATALOGS
1. Hive 操作
- 1.1 建立表和導入資料
為了友善讀者進行實驗,筆者使用 ssb-dbgen 生成測試資料,讀者也可以使用測試環境已有的資料來進行實驗。
具體如何在 Hive 中一鍵式建立表并插入資料,可以參考筆者早期的項目 https://github.com/MLikeWater/ssb-kylin
- 1.2 Hive 表
檢視上個步驟中建立的 Hive 表:
01 02 03 04 05 06 07 08 09 10 11 | |
讀者可以對 Hive 進行各種查詢,對比後面 Flink SQL 查詢的結果。
2. Flink 操作
- 2.1 通過 HiveCatalog 通路 Hive 資料庫
登入 Flink SQL CLI,并查詢 catalogs:
1 2 3 4 5 6 | |
通過 show catalogs 擷取配置的所有 catalog。由于筆者在 sql-client-hive.yaml 檔案中設定了預設的 catalog,即為 staginghive。如果需要切換到其他 catalog,可以使用 usecatalog xxx。
- 2.2 查詢 Hive 中繼資料
通過 Flink SQL 查詢 Hive 資料庫和表:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | |
這裡需要注意,Hive 的中繼資料在 Flink catalog 中都以小寫字母使用。
- 2.3 查詢
接下來,在 Flink SQL CLI 中查詢一些 SQL 語句,完整 SQL 參考 https://github.com/MLikeWater/ssb-kylin 的 README。
目前 Flink SQL 解析 Hive 視圖中繼資料時,會遇到一些 Bug,比如執行 Q1.1 SQL:
1 2 3 4 5 6 7 8 9 | |
Flink SQL 找不到視圖中的實體表。
p_lineorder 表是 Hive 中的一張視圖,建立表的語句如下:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | |
然後繼續在 Flink SQL CLI 中查詢 Q1.1 SQL:
1 2 3 4 5 6 7 8 9 | |
繼續查詢 Q2.1 SQL:
[Bash shell] 純文字檢視 複制代碼 ?
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 | |
最後再查詢一個 Q4.3 SQL:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 | |
如果讀者感興趣的話,可以查詢剩餘的 SQL,當然也可以和 Spark SQL 進行比較。另外 Flink SQL 也支援 EXPLAIN,查詢 SQL 的執行計劃。
- 2.4 建立視圖
同樣,可以在 Flink SQL CLI 中建立和删除視圖,如下:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 | |
這裡筆者需要特别強調的是,目前 Flink 無法删除 Hive 中的視圖:
[Bash shell] 純文字檢視 複制代碼 ?
1 2 3 | |
- 2.5 分區操作
Hive 資料庫中建立一張分區表:
1 2 3 4 5 | |
接着,通過 Flink SQL 插入和查詢資料:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | |
字段 day 在 Flink 屬于關鍵字,要特殊處理。
- 2.6 其他功能
- 2.6.1 函數
Flink SQL 支援内置的函數和自定義函數。對于内置的函數,可以執行 show functions 進行檢視,這一塊筆者以後會單獨介紹如何建立自定義函數。
- 2.6.2 設定參數
Flink SQL 支援設定環境參數,可以使用 set 指令檢視和設定參數:
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | |
總結
在本文中,筆者通過 Flink SQL 比較詳細地去操作 Hive 資料庫,以及 Flink SQL 提供的一些功能。
當然,目前 Flink SQL 操作 Hive 資料庫還是存在一些問題:
- 目前隻支援 TextFile 存儲格式,還無法指定其他存儲格式 ,隻支援 Hive 資料庫中 TextFile 存儲格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。雖然實作了 RCFile、ORC、Parquet、Sequence 等存儲格式,但是無法自動識别 Hive 表的存儲格式。如果要使用其他存儲格式,需要修改源碼,重新編譯。不過社群已經對這些存儲格式進行了測試,相信不久以後就可以在 Flink SQL 中使用。
- OpenCSVSerde 支援不完善:如果讀者使用 TextFile 的 row format serde 為 org.apache.hadoop.hive.serde2.OpenCSVSerde 時,無法正确識别字段類型,會把 Hive 表的字段全部映射為 String 類型。
- 暫時不支援 Bucket 表
- 暫時不支援 ACID 表
- Flink SQL 優化方面功能較少
- 權限控制方面:這方面和 Spark SQL 類似,目前基于 HDFS ACL 控制,暫時還沒有實作 Sentry 或 Ranger 控制權限,不過目前 Cloudera 正在開發基于 Ranger 設定 Spark SQL 和 Hive 共享通路權限的政策,實作行/列級控制以及審計資訊。
Flink 社群發展很快,所有這些問題隻是暫時的,随着新版本的釋出會被逐個解決。
如果 Flink SQL 目前不滿足的需求,建議使用 API 方式來解決問題。