天天看點

Flink如何連接配接hive

回顧

在上篇文章中,筆者使用的 CDH 版本為 5.16.2,其中 Hive 版本為 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可了解),Flink 源代碼本身對 Hive 1.1.0 版本相容性不好,存在不少問題。為了相容目前版本,筆者基于 CDH 5.16.2 環境,對 Flink 代碼進行了修改,重新打包并部署。

其實經過很多開源項目的實戰,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情況下,替換一些 Jar 包,是可以解決相容性的問題。對于筆者的環境來說,可以使用 Hive 1.2.1 版本的一些 Jar 包來代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的開始部分,筆者會解決這個問題,然後再補充上篇文章缺少的實戰内容。

剪不斷理還亂的問題

根據讀者的回報,筆者将所有的問題總結為三類:

  • Flink 如何連接配接 Hive 除了 API 外,有沒有類似 spark-sql 指令
  • 識别不到 Hadoop 環境或配置檔案找不到
  • 依賴包、類或方法找不到

1. Flink 如何連接配接 Hive

有的讀者不太清楚,如何配置 Flink 連接配接 Hive 的 Catalog,這裡補充一個完整的 conf/sql-client-hive.yaml 示例:

01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31

catalogs:

- name: staginghive

type

: hive

hive-conf-

dir

:

/etc/hive/conf

hive-version: 1.2.1

execution:

planner: blink

type

: batch

time

-characteristic: event-

time

periodic-watermarks-interval: 200

result-mode: table

max-table-result-rows: 1000000

parallelism: 1

max-parallelism: 128

min-idle-state-retention: 0

max-idle-state-retention: 0

current-catalog: staginghive

current-database: ssb

restart-strategy:

type

: fallback

deployment:

response-timeout: 5000

gateway-address:

""

gateway-port: 0

m: yarn-cluster

yn: 2

ys: 5

yjm: 1024

ytm: 2048

sql-client-hive.yaml 配置檔案裡面包含:

  • Hive 配置檔案 catalogs 中配置了 Hive 的配置檔案路徑。
  • Yarn 配置資訊 deployment 中配置了 Yarn 的配置資訊。
  • 執行引擎資訊 execution 配置了 blink planner,并且使用 batch 模式。batch 模式比較穩定,适合傳統的批處理作業,而且可以容錯,另外中間資料落盤,建議開啟壓縮功能。除了 batch,Flink 也支援 streaming 模式。
  • Flink SQL CLI 工具

類似 spark-sql 指令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 腳本。在 Flink 1.10 版本中,Flink SQL CLI 改進了很多功能,筆者後面講解。

sql-client.sh 使用方式如下:

1

$ bin

/sql-client

.sh embedded -d conf

/sql-client-hive

.yaml

2. 識别不到 Hadoop 環境或配置檔案找不到

筆者在上篇文章中提到過,在部署 Flink 的環境上部署 CDH gateway,包括 Hadoop、Hive 用戶端,另外還需要配置一些環境變量,如下:

1 2 3 4

export

HADOOP_CONF_DIR=

/etc/hadoop/conf

export

YARN_CONF_DIR=

/etc/hadoop/conf

export

HIVE_HOME=

/opt/cloudera/parcels/CDH/lib/hive

export

HIVE_CONF_DIR=

/etc/hive/conf

3. 依賴包、類或方法找不到

先檢視一下 Flink 家目錄下的 lib 目錄:

01 02 03 04 05 06 07 08 09 10 11 12 13

$ tree  lib

lib

├── flink-connector-hive_2.11-1.10.0.jar

├── flink-dist_2.11-1.10.0.jar

├── flink-hadoop-compatibility_2.11-1.10.0.jar

├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar

├── flink-table_2.11-1.10.0.jar

├── flink-table-blink_2.11-1.10.0.jar

├── hive-

exec

-1.1.0-cdh5.16.2.jar

├── hive-metastore-1.1.0-cdh5.16.2.jar

├── libfb303-0.9.3.jar

├── log4j-1.2.17.jar

└── slf4j-log4j12-1.7.15.jar

如果上面前兩個問題都解決後,執行如下指令:

1

$ bin

/sql-client

.sh embedded -d conf

/sql-client-hive

.yaml

報錯,報錯,還是報錯:

1

Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory

其實在運作 sql-client.sh 腳本前,需要指定 Hadoop 環境的依賴包的路徑,建議不要報錯一個添加一個,除非有的讀者喜歡。這裡筆者提示一個友善的方式,即設定 HADOOPCLASSPATH(可以添加到 ~/.bashprofile 中)環境變量:

1

export

HADOOP_CLASSPATH=`hadoop classpath`

再次執行:

1

$ bin

/sql-client

.sh embedded -d conf

/sql-client-hive

.yaml

很抱歉,繼續報錯:

1

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.

local

.ExecutionContext$Builder.build(ExecutionContext.java:753) at org.apache.flink.table.client.gateway.

local

.LocalExecutor.openSession(LocalExecutor.java:228) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client

這裡就是 Hive 1.1.0 版本的 Jar 包與 Flink 出現版本不相容性的問題了,解決方法是:

  • 下載下傳 apache-hive-1.2.1 版本
  • 替換 Flink lib 目錄下的 Hive Jar 包 删除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然後添加 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次檢視 lib 目錄:
01 02 03 04 05 06 07 08 09 10 11 12 13

$ tree lib

lib

├── flink-connector-hive_2.11-1.10.0.jar

├── flink-dist_2.11-1.10.0.jar

├── flink-hadoop-compatibility_2.11-1.10.0.jar

├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar

├── flink-table_2.11-1.10.0.jar

├── flink-table-blink_2.11-1.10.0.jar

├── hive-

exec

-1.2.1.jar

├── hive-metastore-1.2.1.jar

├── libfb303-0.9.2.jar

├── log4j-1.2.17.jar

└── slf4j-log4j12-1.7.15.jar

最後再執行:

1

$ bin

/sql-client

.sh embedded -d conf

/sql-client-hive

.yaml

這時,讀者就可以看到手握栗子的可愛小松鼠了。

Flink如何連接配接hive

Flink SQL CLI 實踐

在 Flink 1.10 版本(目前為 RC1 階段) 中,Flink 社群對 SQL CLI 做了大量的改動,比如支援 View、支援更多的資料類型和 DDL 語句、支援分區讀寫、支援 INSERT OVERWRITE 等,實作了更多的 TableEnvironment API 的功能,更加友善使用者使用。

接下來,筆者詳細講解 Flink SQL CLI。

0. Help

執行下面指令,登入 Flink SQL 用戶端:

1 2

$ bin

/sql-client

.sh embedded -d conf

/sql-client-hive

.yaml

Flink SQL>

執行 HELP,檢視 Flink SQL 支援的指令,如下為大部分常用的:

  • CREATE TABLE
  • DROP TABLE
  • CREATE VIEW
  • DESCRIBE
  • DROP VIEW
  • EXPLAIN
  • INSERT INTO
  • INSERT OVERWRITE
  • SELECT
  • SHOW FUNCTIONS
  • USE CATALOG
  • SHOW TABLES
  • SHOW DATABASES
  • SOURCE
  • USE
  • SHOW CATALOGS

1. Hive 操作

  • 1.1 建立表和導入資料

為了友善讀者進行實驗,筆者使用 ssb-dbgen 生成測試資料,讀者也可以使用測試環境已有的資料來進行實驗。

具體如何在 Hive 中一鍵式建立表并插入資料,可以參考筆者早期的項目  https://github.com/MLikeWater/ssb-kylin

  • 1.2 Hive 表

檢視上個步驟中建立的 Hive 表:

01 02 03 04 05 06 07 08 09 10 11

0: jdbc:hive2:

//xx

.xxx.xxx.xxx:10000> show tables;

+--------------+--+

|   tab_name   |

+--------------+--+

| customer     |

| dates        |

| lineorder    |

| p_lineorder  |

| part         |

| supplier     |

+--------------+--+

讀者可以對 Hive 進行各種查詢,對比後面 Flink SQL 查詢的結果。

2. Flink 操作

  • 2.1 通過 HiveCatalog 通路 Hive 資料庫

登入 Flink SQL CLI,并查詢 catalogs:

1 2 3 4 5 6

$ bin

/sql-client

.sh embedded -d conf

/sql-client-hive

.yaml

Flink SQL> show catalogs;

default_catalog

staginghive

Flink SQL> use catalog staginghive;

通過 show catalogs 擷取配置的所有 catalog。由于筆者在 sql-client-hive.yaml 檔案中設定了預設的 catalog,即為 staginghive。如果需要切換到其他 catalog,可以使用 usecatalog xxx。

  • 2.2 查詢 Hive 中繼資料

通過 Flink SQL 查詢 Hive 資料庫和表:

01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28

# 查詢資料庫

Flink SQL> show databases;

...

ssb

tmp

...

Flink SQL> use ssb;

# 查詢表

Flink SQL> show tables;

customer

dates

lineorder

p_lineorder

part

supplier

# 查詢表結構

Flink SQL> DESCRIBE customer;

root

|-- c_custkey: INT

|-- c_name: STRING

|-- c_address: STRING

|-- c_city: STRING

|-- c_nation: STRING

|-- c_region: STRING

|-- c_phone: STRING

|-- c_mktsegment: STRING

這裡需要注意,Hive 的中繼資料在 Flink catalog 中都以小寫字母使用。

  • 2.3 查詢

接下來,在 Flink SQL CLI 中查詢一些 SQL 語句,完整 SQL 參考  https://github.com/MLikeWater/ssb-kylin  的 README。

目前 Flink SQL 解析 Hive 視圖中繼資料時,會遇到一些 Bug,比如執行 Q1.1 SQL:

1 2 3 4 5 6 7 8 9

Flink SQL>

select

sum

(v_revenue) as revenue

> from p_lineorder

> left

join

dates on lo_orderdate = d_datekey

> where d_year = 1993

> and lo_discount between 1 and 3

> and lo_quantity < 25;

[ERROR] Could not execute SQL statement. Reason:

org.apache.calcite.sql.validate.SqlValidatorException: Tabeorder

' not found; did you mean '

LINEORDER'?

Flink SQL 找不到視圖中的實體表。

p_lineorder 表是 Hive 中的一張視圖,建立表的語句如下:

01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43

CREATE VIEW P_LINEORDER AS

SELECT LO_ORDERKEY,

LO_LINENUMBER,

LO_CUSTKEY,

LO_PARTKEY,

LO_SUPPKEY,

LO_ORDERDATE,

LO_ORDERPRIOTITY,

LO_SHIPPRIOTITY,

LO_QUANTITY,

LO_EXTENDEDPRICE,

LO_ORDTOTALPRICE,

LO_DISCOUNT,

LO_REVENUE,

LO_SUPPLYCOST,

LO_TAX,

LO_COMMITDATE,

LO_SHIPMODE,

LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE

FROM ssb.LINEORDER;

但是對于 Hive 中視圖的定義,Flink SQL 并沒有很好地進行中繼資料。為了後面 SQL 的順利執行,這裡筆者在 Hive 中删除并重建該視圖:

0: jdbc:hive2:

//xx

.xxx.xxx.xxx:10000> create view p_lineorder as

select

lo_orderkey,

lo_linenumber,

lo_custkey,

lo_partkey,

lo_suppkey,

lo_orderdate,

lo_orderpriotity,

lo_shippriotity,

lo_quantity,

lo_extendedprice,

lo_ordtotalprice,

lo_discount,

lo_revenue,

lo_supplycost,

lo_tax,

lo_commitdate,

lo_shipmode,

lo_extendedprice*lo_discount as v_revenue

from ssb.lineorder;

然後繼續在 Flink SQL CLI 中查詢 Q1.1 SQL:

1 2 3 4 5 6 7 8 9

Flink SQL>

select

sum

(v_revenue) as revenue

> from p_lineorder

> left

join

dates on lo_orderdate = d_datekey

> where d_year = 1993

> and lo_discount between 1 and 3

> and lo_quantity < 25;

revenue

894280292647

繼續查詢 Q2.1 SQL:

[Bash shell] 純文字檢視 複制代碼 ?

01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18

Flink SQL>

select

sum

(lo_revenue) as lo_revenue, d_year, p_brand

> from p_lineorder

> left

join

dates on lo_orderdate = d_datekey

> left

join

part on lo_partkey = p_partkey

> left

join

supplier on lo_suppkey = s_suppkey

> where p_category =

'MFGR#12'

and s_region =

'AMERICA'

> group by d_year, p_brand

> order by d_year, p_brand;

lo_revenue  d_year p_brand

819634128   1998   MFGR

#1206

877651232   1998   MFGR

#1207

754489428   1998   MFGR

#1208

816369488   1998   MFGR

#1209

668482306   1998   MFGR

#1210

660366608   1998   MFGR

#1211

862902570   1998   MFGR

#1212

...

最後再查詢一個 Q4.3 SQL:

01 02 03 04 05 06 07 08 09 10 11 12 13 14

Flink SQL>

select

d_year, s_city, p_brand,

sum

(lo_revenue) -

sum

(lo_supplycost) as profit

> from p_lineorder

> left

join

dates on lo_orderdate = d_datekey

> left

join

customer on lo_custkey = c_custkey

> left

join

supplier on lo_suppkey = s_suppkey

> left

join

part on lo_partkey = p_partkey

> where c_region =

'AMERICA'

and s_nation =

'UNITED STATES'

> and (d_year = 1997 or d_year = 1998)

> and p_category =

'MFGR#14'

> group by d_year, s_city, p_brand

> order by d_year, s_city, p_brand;

d_year  s_city       p_brand       profit

1998    UNITED ST9   MFGR

#1440     6665681

如果讀者感興趣的話,可以查詢剩餘的 SQL,當然也可以和 Spark SQL 進行比較。另外 Flink SQL 也支援 EXPLAIN,查詢 SQL 的執行計劃。

  • 2.4 建立視圖

同樣,可以在 Flink SQL CLI 中建立和删除視圖,如下:

01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21

Flink SQL> create view p_lineorder2 as

>

select

lo_orderkey,

> lo_linenumber,

> lo_custkey,

> lo_partkey,

> lo_suppkey,

> lo_orderdate,

> lo_orderpriotity,

> lo_shippriotity,

> lo_quantity,

> lo_extendedprice,

> lo_ordtotalprice,

> lo_discount,

> lo_revenue,

> lo_supplycost,

> lo_tax,

> lo_commitdate,

> lo_shipmode,

> lo_extendedprice * lo_discount as v_revenue

> from ssb.lineorder;

[INFO] View has been created.

這裡筆者需要特别強調的是,目前 Flink 無法删除 Hive 中的視圖:

[Bash shell] 純文字檢視 複制代碼 ?

1 2 3

Flink SQL> drop view p_lineorder;

[ERROR] Could not execute SQL statement. Reason:

The given view does not exist

in

the current CLI session. Only views created with a CREATE VIEW statement can be accessed.

  • 2.5 分區操作

Hive 資料庫中建立一張分區表:

1 2 3 4 5

CREATE TABLE IF NOT EXISTS flink_partition_test (

id

int,

name string

) PARTITIONED BY (day string,

type

string)

stored as textfile;

接着,通過 Flink SQL 插入和查詢資料:

01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27

# 插入靜态分區的資料

Flink SQL> INSERT INTO flink_partition_test PARTITION (

type

=

'Flink'

, `day`=

'2020-02-01'

) SELECT 100001,

'Flink001'

;

# 查詢

Flink SQL>

select

* from flink_partition_test;

id

name       day           

type

100001   Flink001   2020-02-01     Flink

# 插入動态分區

Flink SQL> INSERT INTO flink_partition_test SELECT 100002,

'Spark'

,

'2020-02-02'

,

'SparkSQL'

;

# 查詢

Flink SQL>

select

* from flink_partition_test;

id

name          day          

type

100002    Spark         2020-02-02    SparkSQL

100001    FlinkSQL      2020-02-01    Flink

# 動态和靜态分區結合使用類似,不再示範

# 覆寫插入資料

Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (

type

=

'Flink'

) SELECT 100002,

'Spark'

,

'2020-02-08'

,

'SparkSQL-2.4'

;

id

name        day            

type

100002  Spark       2020-02-02      SparkSQL

100001  FlinkSQL    2020-02-01      Flink

字段 day 在 Flink 屬于關鍵字,要特殊處理。

  • 2.6 其他功能
  • 2.6.1 函數

Flink SQL 支援内置的函數和自定義函數。對于内置的函數,可以執行 show functions 進行檢視,這一塊筆者以後會單獨介紹如何建立自定義函數。

  • 2.6.2 設定參數

Flink SQL 支援設定環境參數,可以使用 set 指令檢視和設定參數:

01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24

Flink SQL>

set

;

deployment.gateway-address=

deployment.gateway-port=0

deployment.m=yarn-cluster

deployment.response-timeout=5000

deployment.yjm=1024

deployment.yn=2

deployment.ys=5

deployment.ytm=2048

execution.current-catalog=staginghive

execution.current-database=ssb

execution.max-idle-state-retention=0

execution.max-parallelism=128

execution.max-table-result-rows=1000000

execution.min-idle-state-retention=0

execution.parallelism=1

execution.periodic-watermarks-interval=200

execution.planner=blink

execution.restart-strategy.

type

=fallback

execution.result-mode=table

execution.

time

-characteristic=event-

time

execution.

type

=batch

Flink SQL>

set

deployment.yjm = 2048;

總結

在本文中,筆者通過 Flink SQL 比較詳細地去操作 Hive 資料庫,以及 Flink SQL 提供的一些功能。

當然,目前 Flink SQL 操作 Hive 資料庫還是存在一些問題:

  • 目前隻支援 TextFile 存儲格式,還無法指定其他存儲格式 ,隻支援 Hive 資料庫中 TextFile 存儲格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。雖然實作了 RCFile、ORC、Parquet、Sequence 等存儲格式,但是無法自動識别 Hive 表的存儲格式。如果要使用其他存儲格式,需要修改源碼,重新編譯。不過社群已經對這些存儲格式進行了測試,相信不久以後就可以在 Flink SQL 中使用。
  • OpenCSVSerde 支援不完善:如果讀者使用 TextFile 的 row format serde 為 org.apache.hadoop.hive.serde2.OpenCSVSerde 時,無法正确識别字段類型,會把 Hive 表的字段全部映射為 String 類型。
  • 暫時不支援 Bucket 表
  • 暫時不支援 ACID 表
  • Flink SQL 優化方面功能較少
  • 權限控制方面:這方面和 Spark SQL 類似,目前基于 HDFS ACL 控制,暫時還沒有實作 Sentry 或 Ranger 控制權限,不過目前 Cloudera 正在開發基于 Ranger 設定 Spark SQL 和 Hive 共享通路權限的政策,實作行/列級控制以及審計資訊。

Flink 社群發展很快,所有這些問題隻是暫時的,随着新版本的釋出會被逐個解決。

如果 Flink SQL 目前不滿足的需求,建議使用 API 方式來解決問題。

繼續閱讀