
Writing to Hudi with Flink SQL

I've recently been working on a data lake project: a lakehouse built on Hudi, with Flink and Spark as the compute engines.

I had previously taken a quick look at Iceberg, one of the three main data lake table formats, thinking we might end up using it. Reading around online, I also found that Iceberg tends to be friendlier to Flink, while Hudi is friendlier to Spark.

I had always assumed we would pick Iceberg, but it turned out Iceberg was still missing quite a few features; by comparison, Hudi is in noticeably better shape.

Versions used:

* Hudi Flink bundle: 0.9.0-SNAPSHOT
* Hive: 2.3
* Hadoop: 3.1

The Hudi version chosen here is an unreleased SNAPSHOT, so it has to be built from source:

```sh
mvn install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] hudi ............................................... SUCCESS [ 1.948 s]
[INFO] hudi-common ........................................ SUCCESS [ 6.494 s]
[INFO] hudi-timeline-service .............................. SUCCESS [ 2.287 s]
[INFO] hudi-client ........................................ SUCCESS [ 0.054 s]
[INFO] hudi-client-common ................................. SUCCESS [ 2.930 s]
[INFO] hudi-hadoop-mr ..................................... SUCCESS [ 2.892 s]
[INFO] hudi-spark-client .................................. SUCCESS [ 6.267 s]
[INFO] hudi-sync-common ................................... SUCCESS [ 0.502 s]
[INFO] hudi-hive-sync ..................................... SUCCESS [ 2.651 s]
[INFO] hudi-spark-datasource .............................. SUCCESS [ 0.089 s]
[INFO] hudi-spark-common_2.11 ............................. SUCCESS [ 2.346 s]
[INFO] hudi-spark2_2.11 ................................... SUCCESS [ 1.436 s]
[INFO] hudi-spark_2.11 .................................... SUCCESS [ 9.377 s]
[INFO] hudi-utilities_2.11 ................................ SUCCESS [ 4.049 s]
[INFO] hudi-utilities-bundle_2.11 ......................... SUCCESS [ 12.717 s]
[INFO] hudi-cli ........................................... SUCCESS [ 4.430 s]
[INFO] hudi-java-client ................................... SUCCESS [ 0.902 s]
[INFO] hudi-flink-client .................................. SUCCESS [ 1.406 s]
[INFO] hudi-spark3_2.12 ................................... SUCCESS [ 2.199 s]
[INFO] hudi-dla-sync ...................................... SUCCESS [ 1.347 s]
[INFO] hudi-sync .......................................... SUCCESS [ 0.042 s]
[INFO] hudi-hadoop-mr-bundle .............................. SUCCESS [ 4.292 s]
[INFO] hudi-hive-sync-bundle .............................. SUCCESS [ 1.297 s]
[INFO] hudi-spark-bundle_2.11 ............................. SUCCESS [ 9.176 s]
[INFO] hudi-presto-bundle ................................. SUCCESS [ 4.972 s]
[INFO] hudi-timeline-server-bundle ........................ SUCCESS [ 4.643 s]
[INFO] hudi-hadoop-docker ................................. SUCCESS [ 0.445 s]
[INFO] hudi-hadoop-base-docker ............................ SUCCESS [ 0.204 s]
[INFO] hudi-hadoop-namenode-docker ........................ SUCCESS [ 0.053 s]
[INFO] hudi-hadoop-datanode-docker ........................ SUCCESS [ 0.045 s]
[INFO] hudi-hadoop-history-docker ......................... SUCCESS [ 0.096 s]
[INFO] hudi-hadoop-hive-docker ............................ SUCCESS [ 0.278 s]
[INFO] hudi-hadoop-sparkbase-docker ....................... SUCCESS [ 0.064 s]
[INFO] hudi-hadoop-sparkmaster-docker ..................... SUCCESS [ 0.048 s]
[INFO] hudi-hadoop-sparkworker-docker ..................... SUCCESS [ 0.048 s]
[INFO] hudi-hadoop-sparkadhoc-docker ...................... SUCCESS [ 0.047 s]
[INFO] hudi-hadoop-presto-docker .......................... SUCCESS [ 0.087 s]
[INFO] hudi-integ-test .................................... SUCCESS [ 4.581 s]
[INFO] hudi-integ-test-bundle ............................. SUCCESS [ 32.789 s]
[INFO] hudi-examples ...................................... SUCCESS [ 1.140 s]
[INFO] hudi-flink_2.11 .................................... SUCCESS [ 1.734 s]
[INFO] hudi-flink-bundle_2.11 ............................. SUCCESS [ 21.285 s]
[INFO] BUILD SUCCESS
[INFO] Total time: 02:34 min
[INFO] Finished at: 2021-07-18T18:17:51+08:00
[INFO] Final Memory: 232M/1644M
[WARNING] The requested profile "include-flink-sql-connector-hive" could not be activated because it does not exist.

```

Writing to Hudi from Flink is straightforward: drop `hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar` into Flink's `lib` directory, and Hudi becomes just another Flink connector that you can write to directly from the SQL client. By itself, however, this does not sync Hudi's metadata to Hive. If you specify the Hive sync options when creating the Hudi table in Flink, the metadata of the table Flink creates is synced straight into Hive.

* Configure Hive
* Put `hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar` into Hive's `lib` directory (so Hive can read data in the Hudi format)
* Start the Hive metastore
* Start HiveServer2

```sh
nohup hive --service metastore &
nohup hive --service hiveserver2 &
```

Start a YARN session:

```sh
./bin/yarn-session.sh -d -ynm sql
```

Start the SQL client:

```sh
./bin/sql-client.sh embedded -s application_1626588183454_0001
```

The Flink SQL:

```sql
create table kafka_ods_user_info (
    id int
    ,name string
    ,sex string
    ,age int
    ,birthday string
) with (
    'connector' = 'kafka',
    'topic' = 'test_topic_1',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testgroup',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'csv'
);

create table ods_user_info_3 (
    dl_uuid string
    ,id int
    ,name string
    ,sex string
    ,age int
    ,birthday string
    ,`etl_create_time` timestamp(3) comment 'ETL create time'
    ,`etl_update_time` timestamp(3) comment 'ETL update time'
    ,`partition` string
) with (
    'connector' = 'hudi'
    ,'is_generic' = 'true'
    ,'path' = 'hdfs:///user/hive/warehouse/ods.db/ods_user_info_3'
    ,'hoodie.datasource.write.recordkey.field' = 'dl_uuid'
    ,'hoodie.datasource.write.partitionpath.field' = 'partition'
    ,'write.precombine.field' = 'etl_update_time'
    ,'write.tasks' = '1'
    ,'table.type' = 'MERGE_ON_READ'
    ,'compaction.tasks' = '1'
    ,'compaction.trigger.strategy' = 'num_or_time'
    ,'compaction.delta_commits' = '30'
    ,'compaction.delta_seconds' = '3600'
    ,'hive_sync.enable' = 'true'
    ,'hive_sync.db' = 'ods'
    ,'hive_sync.table' = 'ods_user_info'
    ,'hive_sync.file_format' = 'parquet'
    ,'hive_sync.support_timestamp' = 'true'
    ,'hive_sync.use_jdbc' = 'true'
    ,'hive_sync.jdbc_url' = 'jdbc:hive2://localhost:10000'
    ,'hive_sync.metastore.uris' = 'thrift://thinkpad:9083'
    ,'hoodie.datasource.hive_style_partition' = 'true'
    ,'hive_sync.partition_fields' = 'partition'
    ,'read.tasks' = '1'
    ,'read.streaming.enabled' = 'true'
    ,'hoodie.datasource.query.type' = 'snapshot'
    ,'read.streaming.start-commit' = '20210101000000'
    ,'read.streaming.check-interval' = '30'
    ,'hoodie.datasource.merge.type' = 'payload_combine'
    ,'read.utc-timezone' = 'false'
);

insert into ods_user_info_3
select cast(id as string) dl_uuid
    ,id
    ,name
    ,sex
    ,age
    ,birthday
    ,now() etl_create_time
    ,now() etl_update_time
    ,date_format(now(), 'yyyy/MM/dd') -- only a partition-path style format is supported
from kafka_ods_user_info;
```

Send a test message to the Kafka topic (`$message` holds one CSV record):

```sh
echo $message | /opt/kafka2.2/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic_1
```
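For quick testing, records can be generated to match the `kafka_ods_user_info` CSV schema (`id,name,sex,age,birthday`). A small sketch; the `gen_records` helper and its sample values are illustrative, not from the original post:

```shell
# Illustrative helper (not from the original post): emit CSV rows matching
# the kafka_ods_user_info schema: id,name,sex,age,birthday
gen_records() {
  for id in 1 2 3; do
    echo "${id},zhangsan_${id},male_${id},18,2020-01-01"
  done
}

gen_records   # preview the rows before producing them
# then pipe them into the topic:
# gen_records | /opt/kafka2.2/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic_1
```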

## Check the data in HDFS

```sh
hadoop fs -ls /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/
Found 4 items
-rw-r--r-- 1 wuxu supergroup 115647 2021-07-18 18:09 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/.baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_20210718175258.log.1_0-1-0
-rw-r--r-- 1 wuxu supergroup 93 2021-07-18 17:05 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/.hoodie_partition_metadata
-rw-r--r-- 1 wuxu supergroup 436892 2021-07-18 17:20 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/9a29cbb2-b78c-4f32-a71e-36f975617ed0_0-1-0_20210718171958.parquet
-rw-r--r-- 1 wuxu supergroup 461463 2021-07-18 17:53 /user/hive/warehouse/ods.db/ods_user_info_3/2021/07/18/baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718175258.parquet
```
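The MERGE_ON_READ layout is visible in the listing: delta writes land in hidden `.log` files (`.<fileId>_<baseInstant>.log.<version>_<writeToken>`), while compaction produces base files named `<fileId>_<writeToken>_<instantTime>.parquet`. As a small illustration (the `instant_of` helper is invented here, not a Hudi tool), the commit instant can be pulled out of a base-file name:

```shell
# Illustrative helper: extract the commit instant from a Hudi base-file name,
# which follows the pattern <fileId>_<writeToken>_<instantTime>.parquet.
instant_of() {
  local name="${1##*/}"    # strip any leading directory
  name="${name%.parquet}"  # strip the extension
  echo "${name##*_}"       # the last _-separated field is the instant
}

instant_of "baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718175258.parquet"
# prints 20210718175258, matching the 17:52:58 compaction commit above
```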

Since the table is MERGE_ON_READ, Hive sync registers two views of it: `ods_user_info_ro` (read-optimized) and `ods_user_info_rt` (real-time). Query the real-time view in Hive:

```sql
hive> select * from ods_user_info_rt limit 10;
OK
_hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name dl_uuid id name sex age birthday etl_create_time etl_update_time partition
20210718171958 20210718171958_0_1 4970 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4970 4970 zhangsan_4970 male_4970 18 2020-01-01 1626599814075 1626599814075 2021-07-18
20210718171958 20210718171958_0_2 4850 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4850 4850 zhangsan_4850 male_4850 18 2020-01-01 1626599551180 1626599551180 2021-07-18
20210718171958 20210718171958_0_3 4971 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4971 4971 zhangsan_4971 male_4971 18 2020-01-01 1626599816273 1626599816273 2021-07-18
20210718171958 20210718171958_0_4 4727 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4727 4727 zhangsan_4727 male_4727 18 2020-01-01 1626599281780 1626599281780 2021-07-18
20210718171958 20210718171958_0_5 4848 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4848 4848 zhangsan_4848 male_4848 18 2020-01-01 1626599546853 1626599546853 2021-07-18
20210718171958 20210718171958_0_6 4969 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4969 4969 zhangsan_4969 male_4969 18 2020-01-01 1626599811859 1626599811859 2021-07-18
20210718171958 20210718171958_0_7 4728 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4728 4728 zhangsan_4728 male_4728 18 2020-01-01 1626599284072 1626599284072 2021-07-18
20210718171958 20210718171958_0_8 4849 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4849 4849 zhangsan_4849 male_4849 18 2020-01-01 1626599548969 1626599548969 2021-07-18
20210718171958 20210718171958_0_9 4729 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4729 4729 zhangsan_4729 male_4729 18 2020-01-01 1626599286234 1626599286234 2021-07-18
20210718171958 20210718171958_0_10 4840 2021/07/18 baf45a5e-03f5-47f9-a75c-e8a3c1bcb595_0-1-0_20210718171958.parquet 4840 4840 zhangsan_4840 male_4840 18 2020-01-01 1626599529532 1626599529532 2021-07-18
Time taken: 1.46 seconds, Fetched: 10 row(s)
```
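One thing to note in the output above: even with 'hive_sync.support_timestamp' = 'true', the `etl_create_time`/`etl_update_time` columns print as epoch milliseconds. Converting the first row's value by hand (GNU `date` assumed) shows it lines up with the commit instant `20210718171958`, which is local time at +08:00:

```shell
# 1626599814075 is the first row's etl_create_time, in epoch milliseconds.
# Drop the milliseconds and let GNU date render it in UTC.
millis=1626599814075
date -u -d "@$((millis / 1000))" +%Y-%m-%dT%H:%M:%SZ
# prints 2021-07-18T09:16:54Z, i.e. 17:16:54 at +08:00,
# a few minutes before the 17:19:58 commit
```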
