Writing to Hudi with Flink SQL
I've recently been working on a data lake project: a Hudi-based lakehouse, with Flink and Spark as the compute engines.

I had previously taken a quick look at Iceberg, one of the three main data lake table formats, thinking we might end up using it. While reading up on it, I also found that Iceberg tends to be friendlier towards Flink, while Hudi is friendlier towards Spark.

I had always assumed we would pick Iceberg, but it turns out Iceberg is still missing quite a few features; by comparison, Hudi is in much better shape.
The environment:

* Hudi Flink bundle: 0.9.0-SNAPSHOT
* Hive 2.3
* Hadoop 3.1

Since the chosen Hudi version is an unreleased SNAPSHOT, it has to be built from source:
```sh
mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Hudi ............................................... SUCCESS [ 1.948 s]
[INFO] hudi-common ........................................ SUCCESS [ 6.494 s]
[INFO] hudi-timeline-service .............................. SUCCESS [ 2.287 s]
[INFO] hudi-client ........................................ SUCCESS [ 0.054 s]
[INFO] hudi-client-common ................................. SUCCESS [ 2.930 s]
[INFO] hudi-hadoop-mr ..................................... SUCCESS [ 2.892 s]
[INFO] hudi-spark-client .................................. SUCCESS [ 6.267 s]
[INFO] hudi-sync-common ................................... SUCCESS [ 0.502 s]
[INFO] hudi-hive-sync ..................................... SUCCESS [ 2.651 s]
[INFO] hudi-spark-datasource .............................. SUCCESS [ 0.089 s]
[INFO] hudi-spark-common_2.11 ............................. SUCCESS [ 2.346 s]
[INFO] hudi-spark2_2.11 ................................... SUCCESS [ 1.436 s]
[INFO] hudi-spark_2.11 .................................... SUCCESS [ 9.377 s]
[INFO] hudi-utilities_2.11 ................................ SUCCESS [ 4.049 s]
[INFO] hudi-utilities-bundle_2.11 ......................... SUCCESS [ 12.717 s]
[INFO] hudi-cli ........................................... SUCCESS [ 4.430 s]
[INFO] hudi-java-client ................................... SUCCESS [ 0.902 s]
[INFO] hudi-flink-client .................................. SUCCESS [ 1.406 s]
[INFO] hudi-spark3_2.12 ................................... SUCCESS [ 2.199 s]
[INFO] hudi-dla-sync ...................................... SUCCESS [ 1.347 s]
[INFO] hudi-sync .......................................... SUCCESS [ 0.042 s]
[INFO] hudi-hadoop-mr-bundle .............................. SUCCESS [ 4.292 s]
[INFO] hudi-hive-sync-bundle .............................. SUCCESS [ 1.297 s]
[INFO] hudi-spark-bundle_2.11 ............................. SUCCESS [ 9.176 s]
[INFO] hudi-presto-bundle ................................. SUCCESS [ 4.972 s]
[INFO] hudi-timeline-server-bundle ........................ SUCCESS [ 4.643 s]
[INFO] hudi-hadoop-docker ................................. SUCCESS [ 0.445 s]
[INFO] hudi-hadoop-base-docker ............................ SUCCESS [ 0.204 s]
[INFO] hudi-hadoop-namenode-docker ........................ SUCCESS [ 0.053 s]
[INFO] hudi-hadoop-datanode-docker ........................ SUCCESS [ 0.045 s]
[INFO] hudi-hadoop-history-docker ......................... SUCCESS [ 0.096 s]
[INFO] hudi-hadoop-hive-docker ............................ SUCCESS [ 0.278 s]
[INFO] hudi-hadoop-sparkbase-docker ....................... SUCCESS [ 0.064 s]
[INFO] hudi-hadoop-sparkmaster-docker ..................... SUCCESS [ 0.048 s]
[INFO] hudi-hadoop-sparkworker-docker ..................... SUCCESS [ 0.048 s]
[INFO] hudi-hadoop-sparkadhoc-docker ...................... SUCCESS [ 0.047 s]
[INFO] hudi-hadoop-presto-docker .......................... SUCCESS [ 0.087 s]
[INFO] hudi-integ-test .................................... SUCCESS [ 4.581 s]
[INFO] hudi-integ-test-bundle ............................. SUCCESS [ 32.789 s]
[INFO] hudi-examples ...................................... SUCCESS [ 1.140 s]
[INFO] hudi-flink_2.11 .................................... SUCCESS [ 1.734 s]
[INFO] hudi-flink-bundle_2.11 ............................. SUCCESS [ 21.285 s]
[INFO] BUILD SUCCESS
[INFO] Total time: 02:34 min
[INFO] Finished at: 2021-07-18T18:17:51+08:00
[INFO] Final Memory: 232M/1644M
[WARNING] The requested profile "include-flink-sql-connector-hive" could not be activated because it does not exist.
```
Writing to Hudi from Flink is straightforward: put hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar into Flink's lib directory, treat Hudi as just another Flink connector, and you can write to Hudi directly from the SQL client. That alone does not sync Hudi's metadata into Hive, though; if you specify the Hive sync options when creating the Hudi table in Flink, the metadata of the table created in Flink is synced straight into Hive.
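For reference, the copy step might look like the sketch below, assuming HUDI_HOME points at the source tree built above and FLINK_HOME at the Flink installation (both paths are assumptions, not part of the original setup):

```sh
# Assumed paths; adjust HUDI_HOME and FLINK_HOME to your environment.
cp $HUDI_HOME/packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar \
   $FLINK_HOME/lib/
```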
Next, configure Hive:

* Put hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar into Hive's lib directory, so that Hive can read data in Hudi's format (a copy sketch follows this list)
* Start the Hive metastore
* Start HiveServer2
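Similarly to the Flink bundle, the MR bundle copy could look like this (HUDI_HOME and HIVE_HOME are assumed paths):

```sh
# Assumed paths; adjust HUDI_HOME and HIVE_HOME to your environment.
cp $HUDI_HOME/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar \
   $HIVE_HOME/lib/
```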
Then start the two services:

```sh
nohup hive --service metastore &
nohup hive --service hiveserver2 &
```
Start a YARN session:

```sh
./bin/yarn-session.sh -d -nm sql
```
Start the SQL client:

```sh
./bin/sql-client.sh embedded -s application_1626588183454_0001
```
The Flink SQL:
```sql
create table kafka_ods_user_info (
id int
,name string
,sex string
,age int
,birthday string
) with (
'connector' = 'kafka',
'topic' = 'test_topic_1',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testgroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv'
);
create table ods_user_info_3(
dl_uuid string
,id int
,name string
,sex string
,age int
,birthday string
,`etl_create_time` timestamp(3) comment 'ETL create time'
,`etl_update_time` timestamp(3) comment 'ETL update time'
,`partition` string
) with (
'connector' = 'hudi'
,'is_generic' = 'true'
,'path' = 'hdfs:///user/hive/warehouse/ods.db/ods_user_info_3'
,'hoodie.datasource.write.recordkey.field' = 'dl_uuid'
,'hoodie.datasource.write.partitionpath.field' = 'partition'
,'write.precombine.field' = 'etl_update_time'
,'write.tasks' = '1'
,'table.type' = 'MERGE_ON_READ'
,'compaction.tasks' = '1'
,'compaction.trigger.strategy' = 'num_or_time'
,'compaction.delta_commits' = '30'
,'compaction.delta_seconds' = '3600'
,'hive_sync.enable' = 'true'
,'hive_sync.db' = 'ods'
,'hive_sync.table' = 'ods_user_info'
,'hive_sync.file_format' = 'parquet'
,'hive_sync.support_timestamp' = 'true'
,'hive_sync.use_jdbc' = 'true'
,'hive_sync.jdbc_url' = 'jdbc:hive2://localhost:10000'
,'hive_sync.metastore.uris' = 'thrift://thinkpad:9083'
,'hoodie.datasource.hive_style_partition' = 'true'
,'hive_sync.partition_fields' = 'partition'
,'read.tasks' = '1'
,'read.streaming.enabled' = 'true'
,'hoodie.datasource.query.type' = 'snapshot'
,'read.streaming.start-commit' = '20210101000000'
,'read.streaming.check-interval' = '30'
,'hoodie.datasource.merge.type' = 'payload_combine'
,'read.utc-timezone' = 'false'
);
insert into ods_user_info_3
select cast(id as string) dl_uuid
,id
,name
,sex
,age
,birthday
,now() etl_create_time
,now() etl_update_time
,date_format(now(), 'yyyy/MM/dd') -- must match the partition path format
from kafka_ods_user_info;
```

Send some test data to the Kafka topic:

```sh
echo $message | /opt/kafka2.2/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic_1
```
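The $message variable is kept as in the original; one hypothetical way to fill it is a small loop that produces CSV rows matching the kafka_ods_user_info schema (id,name,sex,age,birthday) and the sample values seen in the query results further down:

```sh
# Hypothetical test-data generator; field order must match the CSV columns of
# kafka_ods_user_info: id, name, sex, age, birthday.
for i in $(seq 1 100); do
  message="${i},zhangsan_${i},male_${i},18,2020-01-01"
  echo "$message" | /opt/kafka2.2/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 --topic test_topic_1
done
```

Starting one producer process per message is slow, but it keeps the sketch close to the original one-liner.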
## Checking the data on HDFS

```sh
hadoop fs -ls /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/
found 4 items
-rw-r--r-- 1 wuxu supergroup 115647 2021-07-18 18:09 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/.baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_20210718175258.log.1_0-1-0
-rw-r--r-- 1 wuxu supergroup 93 2021-07-18 17:05 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/.hoodie_partition_metadata
-rw-r--r-- 1 wuxu supergroup 436892 2021-07-18 17:20 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/9a29cbb2-b78c-4f32-a71e-36f975617ed0_0-1-0_20210718171958.parquet
-rw-r--r-- 1 wuxu supergroup 461463 2021-07-18 17:53 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718175258.parquet
```

Query the synced table through Hive:

```sh
hive> select * from ods_user_info_rt limit 10;
OK
_hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name dl_uuid id name sex age birthday etl_create_time etl_update_time partition
20210718171958 20210718171958_0_1 4970 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4970 4970 zhangsan_4970 male_4970 18 2020-01-01 1626599814075 1626599814075 2021-07-18
20210718171958 20210718171958_0_2 4850 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4850 4850 zhangsan_4850 male_4850 18 2020-01-01 1626599551180 1626599551180 2021-07-18
20210718171958 20210718171958_0_3 4971 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4971 4971 zhangsan_4971 male_4971 18 2020-01-01 1626599816273 1626599816273 2021-07-18
20210718171958 20210718171958_0_4 4727 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4727 4727 zhangsan_4727 male_4727 18 2020-01-01 1626599281780 1626599281780 2021-07-18
20210718171958 20210718171958_0_5 4848 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4848 4848 zhangsan_4848 male_4848 18 2020-01-01 1626599546853 1626599546853 2021-07-18
20210718171958 20210718171958_0_6 4969 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4969 4969 zhangsan_4969 male_4969 18 2020-01-01 1626599811859 1626599811859 2021-07-18
20210718171958 20210718171958_0_7 4728 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4728 4728 zhangsan_4728 male_4728 18 2020-01-01 1626599284072 1626599284072 2021-07-18
20210718171958 20210718171958_0_8 4849 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4849 4849 zhangsan_4849 male_4849 18 2020-01-01 1626599548969 1626599548969 2021-07-18
20210718171958 20210718171958_0_9 4729 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4729 4729 zhangsan_4729 male_4729 18 2020-01-01 1626599286234 1626599286234 2021-07-18
20210718171958 20210718171958_0_10 4840 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4840 4840 zhangsan_4840 male_4840 18 2020-01-01 1626599529532 1626599529532 2021-07-18
Time taken: 1.46 seconds, Fetched: 10 row(s)
```
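Since the table type is MERGE_ON_READ and Hive sync is enabled, Hudi registers two tables in Hive: ods_user_info_rt (the real-time/snapshot view queried above, which merges log files with base files) and ods_user_info_ro (the read-optimized view, which reads only the compacted Parquet base files). A quick check of the read-optimized view might look like this (a hypothetical query, not from the original run):

```sh
# Read-optimized view: only the compacted Parquet base files are read.
hive -e "select * from ods.ods_user_info_ro limit 10;"
```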