天天看點

Hive 終于等來了 Flink

作者:Jason

Apache Spark 什麼時候開始支援內建 Hive 功能?筆者相信隻要使用過 Spark 的讀者,應該都會說這是很久以前的事情了。

那 Apache Flink 什麼時候支援與 Hive 的內建呢?讀者可能有些疑惑,還沒有支援吧,沒用過?或者說最近版本才支援,但是功能還比較弱。

其實比較也沒啥意義,不同社群發展的目标總是會有差異,而且 Flink 在真正的實時流計算方面投入的精力很多。不過筆者想表達的是,Apache Hive 已經成為資料倉庫生态系統的焦點,它不僅是一個用于大資料分析和 ETL 的 SQL 引擎,也是一個資料管理平台,是以無論是 Spark,還是 Flink,或是 Impala、Presto 等,都會積極地支援內建 Hive 的功能。

的确,對真正需要使用 Flink 通路 Hive 進行資料讀寫的讀者會發現,Apache Flink 1.9.0 版本才開始提供與 Hive 內建的功能。不過,值得欣慰的是,Flink 社群在內建 Hive 功能方面付出很多,目前進展也比較順利,最近 Flink 1.10.0 RC1 版本已經釋出,感興趣的讀者可以進行調研和驗證功能。

架構設計

首先,筆者基于社群公開的資料以及部落格,概括性地講解 Flink 內建 Hive 的架構設計。

Apache Flink 與 Hive 內建的目的,主要包含了中繼資料和實際表資料的通路。

中繼資料

為了通路外部系統的中繼資料,Flink 剛開始提供了 ExternalCatalog 的概念。但是 ExternalCatalog 的定義非常不完整,基本處于不可用的狀态。Flink 1.10 版本正式删除了 ExternalCatalog API (FLINK-13697),這包括:

  • ExternalCatalog(以及所有依賴的類,比如 ExternalTable)
  • SchematicDescriptor、MetadataDescriptor 和 StatisticsDescriptor

針對 ExternalCatalog 的問題,Flink 社群提出了一套全新的 Catalog 接口(new Catalog API)來取代現有的 ExternalCatalog。新的 Catalog 實作的功能包括:

  • 能夠支援資料庫、表、分區等多種中繼資料對象
  • 允許在一個使用者 Session 中維護多個 Catalog 執行個體,進而支援同時通路多個外部系統
  • Catalog 以可插拔的方式接入 Flink,允許使用者提供自定義的實作

下圖展示了新的 Catalog API 的總體架構:

建立 TableEnvironment 的時候會同時建立一個 CatalogManager,負責管理不同的 Catalog 執行個體。TableEnvironment 通過 Catalog 來為 Table API 和 SQL Client 使用者提供中繼資料服務。

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()

val tableEnv = TableEnvironment.create(settings)
val name            = "myhive"
val defaultDatabase = "mydatabase"
val hiveConfDir     = "/opt/hive-conf"// a local path
val version         = "2.3.4"

val hive = newHiveCatalog(name, defaultDatabase, hiveConfDir, version)
tableEnv.registerCatalog("myhive", hive)

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive")           

目前 Catalog 有兩個實作,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 中繼資料管理機制,将所有中繼資料儲存在記憶體中。而 HiveCatalog 會與一個 Hive Metastore 的執行個體連接配接,提供中繼資料持久化的能力。要使用 Flink 與 Hive 進行互動,使用者需要配置一個 HiveCatalog,并通過 HiveCatalog 通路 Hive 中的中繼資料。

另一方面,HiveCatalog 也可以用來處理 Flink 自身的中繼資料,在這種場景下,HiveCatalog 僅将 Hive Metastore 作為持久化存儲使用,寫入 Hive Metastore 中的中繼資料并不一定是 Hive 所支援的格式。一個 HiveCatalog 執行個體可以同時支援這兩種模式,使用者無需為管理 Hive 和 Flink 的中繼資料建立不同的執行個體。

另外,通過設計 HiveShim 來支援不同版本的 Hive Metastore,具體支援的 Hive 版本清單,請參考官方文檔。

表資料

Flink 提供了 Hive Data Connector 來讀寫 Hive 的表資料。Hive Data Connector 盡可能的複用了 Hive 本身的 Input/Output Format 和 SerDe 等類,這樣做的好處一方面是減少了代碼重複,更重要的是可以最大程度的保持與 Hive 的相容,即 Flink 寫入的資料 Hive 可以正常讀取,并且反之亦然。

內建 Hive 功能

Flink 與 Hive 內建的功能在 1.9.0 版本中作為試用功能釋出,存在不少使用的局限性,但是不久将釋出的 Flink 1.10 穩定版本會更加完善內建 Hive 的功能并應用到企業場景中。

為了讓讀者提前體驗 Flink 1.10 內建 Hive 的功能,筆者會基于 Cloudera CDH 編譯 Flink 1.10.0 RC1 版本并進行較為完整的測試。

環境資訊

CDH 版本:cdh5.16.2

Flink 版本:release-1.10.0-rc1

Flink 使用了 RC 版本,僅供測試,不建議用于生産環境。

目前 Cloudera Data Platform 正式內建了 Flink 作為其流計算産品,非常友善使用者使用。

CDH 環境開啟了 Sentry 和 Kerberos。

下載下傳并編譯 Flink

$ wget https://github.com/apache/flink/archive/release-1.10.0-rc1.tar.gz
$ tar zxvf release-1.10.0-rc1.tar.gz
$ cd flink-release-1.10.0-rc1/
$ mvn clean install -DskipTests-Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2           

不出意外的話,編譯到 flink-hadoop-fs 子產品時,會報如下錯誤:

[ERROR] Failed to execute goal on project flink-hadoop-fs: Could not resolve dependencies for project org.apache.flink:flink-hadoop-fs:jar:1.10.0: Failed to collect dependencies at org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Failed to read artifact descriptor for org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Could not transfer artifact org.apache.flink:flink-shaded-hadoop-2:pom:2.6.0-cdh5.16.2-9.0 from/to HDPReleases ( https://repo.hortonworks.com/content/repositories/releases/): Remote host closed connection during handshake: SSL peer shut down incorrectly

編譯中遇到 flink-shaded-hadoop-2 找不到的問題,其實檢視 Maven 倉庫會發現,根本原因是 CDH 的 flink-shaded-hadoop-2 的 jar 包在 Maven 中央倉庫是沒有對應的編譯版本,是以需要先對 Flink 依賴的 flink-shaded-hadoop-2 進行打包,再進行編譯。

■ 解決 flink-shaded-hadoop-2 問題

  • 擷取 flink-shaded 源碼
git clone https://github.com/apache/flink-shaded.git           
  • 切換依賴的版本分支

根據上面報錯時提示缺少的版本切換對應的代碼分支,即缺少的是 9.0 版本的 flink-shaded-hadoop-2:

git checkout release-9.0           
  • 配置 CDH Repo 倉庫

修改 flink-shaded 項目中的 pom.xml,添加 CDH maven 倉庫,否則編譯時找不到 CDH 相關的包。

在 ... 中添加如下内容:

<profile>
<id>vendor-repos</id>
<activation>
<property>
<name>vendor-repos</name>
</property>
</activation>
<!-- Add vendor maven repositories -->
<repositories>
<!-- Cloudera -->
<repository>
<id>cloudera-releases</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</profile>           
  • 編譯 flink-shaded

開始執行編譯:

mvn clean install -DskipTests-Drat.skip=true-Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2           
建議通過科學上網方式編譯,如果讀者遇到一些網絡連接配接的問題,可以試着重試或者更換依賴元件的倉庫位址。

編譯成功後,就會把 flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar 安裝在本地 maven 倉庫,如下為編譯的最後日志:

Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar to /Users/.../.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar

Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/dependency-reduced-pom.xml to /Users/.../.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.pom

重新編譯 Flink

mvn clean install -DskipTests-Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2           

漫長的等待過程,讀者可以并行做其他事情。

編譯過程中,如果不出意外的話,會看到類似下面的錯誤資訊:

[INFO] Running 'npm ci --cache-max=0 --no-save' in /Users/xxx/Downloads/Flink/flink-release-1.10.0-rc1/flink-release-1.10.0-rc1/flink-runtime-web/web-dashboard [WARNING] npm WARN prepare removing existing node_modules/ before installation [ERROR] WARN registry Unexpected warning for https://registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to https://registry.npmjs.org/mime/-/mime-2.4.0.tgz failed, reason: read ECONNRESET [ERROR] WARN registry Using stale package data from https://registry.npmjs.org/ due to a request error during revalidation. [ERROR] WARN registry Unexpected warning for https://registry.npmjs.org/optimist/-/optimist-0.6.1.tgz failed, reason: read ECONNRESET

可以看到, flink-runtime-web 子產品引入了對 frontend-maven-plugin 的依賴,需要安裝 node、npm 和依賴元件。

如果沒有通過科學上網,可以修改 flink-runtime-web/pom.xml 檔案,添加 nodeDownloadRoot 和 npmDownloadRoot 的資訊:

<plugin>
<groupId>com.github.eirslett</groupId>
<artifactId>frontend-maven-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>install node and npm</id>
<goals>
<goal>install-node-and-npm</goal>
</goals>
<configuration>
<nodeDownloadRoot>https://registry.npm.taobao.org/dist/</nodeDownloadRoot>
<npmDownloadRoot>https://registry.npmjs.org/npm/-/</npmDownloadRoot>
<nodeVersion>v10.9.0</nodeVersion>
</configuration>
</execution>
<execution>
<id>npm install</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>ci --cache-max=0 --no-save</arguments>
<environmentVariables>
<HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL>
</environmentVariables>
</configuration>
</execution>
<execution>
<id>npm run build</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>run build</arguments>
</configuration>
</execution>
</executions>
<configuration>
<workingDirectory>web-dashboard</workingDirectory>
</configuration>
</plugin>           

編譯成功後,Flink 安裝檔案位于 flink-release-1.10.0-rc1/flink-dist/target/flink-1.10.0-bin 目錄下,打包并上傳到部署到節點:

$ cd flink-dist/target/flink-1.10.0-bin
$ tar zcvf flink-1.10.0.tar.gz flink-1.10.0           

部署和配置

Flink 部署比較簡單,解壓縮包即可。另外可以設定軟連結、環境變量等,筆者不再介紹。

Flink 的核心配置檔案是 flink-conf.yaml,一個典型的配置如下:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 2048m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 1
high-availability: zookeeper
high-availability.storageDir:hdfs:///user/flink110/recovery
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
state.backend: filesystem
state.checkpoints.dir: hdfs:///user/flink110/checkpoints
state.savepoints.dir:hdfs:///user/flink110/savepoints
jobmanager.execution.failover-strategy: region
rest.port: 8081
taskmanager.memory.preallocate: false
classloader.resolve-order: parent-first
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab:/home/flink_user/flink_user.keytab
security.kerberos.login.principal: flink_user
jobmanager.archive.fs.dir:hdfs:///user/flink110/completed-jobs
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.dir:hdfs:///user/flink110/completed-jobs
historyserver.archive.fs.refresh-interval: 10000

筆者隻羅列了一些常見的配置參數,讀者根據實際情況修改。配置參數其實還是比較容易了解的,以後結合實戰的文章再進行詳細講解。

**■  內建 Hive 配置的依賴**


如果要使用 Flink 與 Hive 內建的功能,除了上面的配置外,使用者還需要添加相應的依賴:

- 如果需要使用 SQL Client,則需要将依賴的 jar 拷貝到 Flink 的 lib 目錄中
- 如果需要使用 Table API,則需要将相應的依賴添加到項目中(如 pom.xml)
           

org.apache.flink

flink-connector-hive_2.11

1.11-SNAPSHOT

provided

flink-table-api-java-bridge_2.11

org.apache.hive

hive-exec

${hive.version}

筆者主要介紹使用 SQL Client 的方式,由于使用的 CDH 版本為 5.16.2,其中 Hadoop 版本為 2.6.0,Hive 版本為 1.1.0,是以需要将如下 jar 包拷貝到 flink 部署家目錄中的 lib 目錄下:


- Flink 的 Hive connector

flink-connector-hive2.11-1.10.0.jar
flink-hadoop-compatibility2.11-1.10.0.jar
flink-orc_2.11-1.10.0.jar           

flink-release-1.10.0-rc1/flink-connectors/flink-hadoop-compatibility/target/flink-hadoop-compatibility_2.11-1.10.0.jar

flink-release-1.10.0-rc1/flink-connectors/flink-connector-hive/target/flink-connector-hive_2.11-1.10.0.jar

flink-release-1.10.0-rc1/flink-formats/flink-orc/target/flink-orc_2.11-1.10.0.jar

- Hadoop 依賴


flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
           

flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar

  • Hive 依賴

hive-exec-1.1.0-cdh5.16.2.jar

hive-metastore-1.1.0-cdh5.16.2.jar

libfb303-0.9.3.jar

/opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec-1.1.0-cdh5.16.2.jar
/opt/cloudera/parcels/CDH/lib/hive/lib/hive-metastore-1.1.0-cdh5.16.2.jar
/opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-
0.9.3.jar           

其中 flink-shaded-hadoop-2-uber 包含了 Hive 對于 Hadoop 的依賴。如果不用 Flink 提供的包,使用者也可以将叢集中使用的 Hadoop 包添加進來,不過需要保證添加的 Hadoop 版本與 Hive 所依賴的版本是相容的。

依賴的 Hive 包(即 hive-exec 和 hive-metastore)也可以使用使用者叢集中 Hive 所提供的 jar 包,詳情請見支援不同的 Hive 版本。

Flink 部署的節點要添加 Hadoop、Yarn 以及 Hive 的用戶端。

■ 配置 HiveCatalog

多年來,Hive Metastore 在 Hadoop 生态系統中已發展成為事實上的中繼資料中心。許多公司在其生産中有一個單獨的 Hive Metastore 服務執行個體,以管理其所有中繼資料(Hive 中繼資料或非 Hive 中繼資料)。

如果同時部署了 Hive 和 Flink,那麼通過 HiveCatalog 能夠使用 Hive Metastore 來管理 Flink 的中繼資料。

如果僅部署 Flink,HiveCatalog 就是 Flink 開箱即用提供的唯一持久化的 Catalog。如果沒有持久化的 Catalog,那麼使用 Flink SQL CREATE DDL 時必須在每個會話中重複建立像 Kafka 表這樣的元對象,這會浪費大量時間。HiveCatalog 通過授權使用者隻需要建立一次表和其他元對象,并在以後的跨會話中非常友善地進行引用和管理。

如果要使用 SQL Client 時,使用者需要在 sql-client-defaults.yaml 中指定自己所需的 Catalog,在 sql-client-defaults.yaml 的 catalogs 清單中可以指定一個或多個 Catalog 執行個體。

以下的示例展示了如何指定一個 HiveCatalog:

execution:

    planner: blink
    type: streaming
    ...
    current-catalog: myhive  # set the HiveCatalog as the current catalog of the session
    current-database: mydatabase

catalogs:  
  - name: myhive
     type: hive
     hive-conf-dir: /opt/hive-conf  # contains hive-site.xml
     hive-version:2.3.4           

其中:

  • name 是使用者給每個 Catalog 執行個體指定的名字,Catalog 名字和 DB 名字構成了 FlinkSQL 中中繼資料的命名空間,是以需要保證每個 Catalog 的名字是唯一的。
  • type 表示 Catalog 的類型,對于 HiveCatalog 而言,type 應該指定為 hive。
  • hive-conf-dir 用于讀取 Hive 的配置檔案,使用者可以将其設定為叢集中 Hive 的配置檔案目錄。
  • hive-version 用于指定所使用的 Hive 版本。

指定了 HiveCatalog 以後,使用者就可以啟動 sql-client,并通過以下指令驗證 HiveCatalog 已經正确加載。

Flink SQL> show catalogs;
default_catalog
myhive

Flink SQL> use catalog myhive;           

其中 show catalogs 會列出加載的所有 Catalog 執行個體。需要注意的是,除了使用者在 sql-client-defaults.yaml 檔案中配置的 Catalog 以外,FlinkSQL 還會自動加載一個 GenericInMemoryCatalog 執行個體作為内置的 Catalog,該内置 Catalog 預設名字為 default_catalog。

讀寫 Hive 表

設定好 HiveCatalog 以後就可以通過 SQL Client 或者 Table API 來讀寫 Hive 中的表了。

假設 Hive 中已經有一張名為 mytable 的表,我們可以用以下的 SQL 語句來讀寫這張表。

■ 讀資料

Flink SQL> show catalogs;
myhive
default_catalog

Flink SQL> use catalog myhive;

Flink SQL> show databases;
default

Flink SQL> show tables;
mytable

Flink SQL> describe mytable;
root

|-- name: name 
|-- type: STRING 
|-- name: value 
|-- type: DOUBLE

Flink SQL> SELECT * FROM mytable;

   name      value
__________ __________
   Tom        4.72
   John       8.0    
   Tom        24.2
   Bob.       3.14    
   Bob        4.72    
   Tom        34.9    
   Mary       4.79    
   Tiff          2.72    
   Bill          4.33    
   Mary       77.7           

■ 寫資料

Flink SQL> INSERT INTO mytable SELECT 'Tom',
 25;

Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;

# 靜态分區
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;

# 動态分區

Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';

# 靜态分區和動态分區

Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';           

總結

在本文中,筆者首先介紹了 Flink 與 Hive 內建功能的架構設計,然後從源碼開始編譯,解決遇到的一些問題,接着部署和配置 Flink 環境以及內建 Hive 的具體操作過程,最後參考官方的案例,對 Hive 表進行讀寫操作。

後續,筆者會結合生産環境的實際使用情況,講解通過 Flink SQL 來操作 Hive。

參考: