基于真實案例項目需求,本文介紹一條FlinkSQL語句如何消費Kafka資料同時寫入資料湖Hudi表、離線數倉Hive表(本文重點側重于寫Hive表)。必要環境準備工作的相關介紹,詳見之前的文章,本文不再重複贅述。寫Hive表的官方文檔位址:
https://hudi.apache.org/docs/syncing_metastore/

Hudi0.10.0修改與編譯
下載hudi0.10.0源碼,修改packaging/hudi-flink-bundle/pom.xml檔案hive.version為自己叢集的Hive版本(如:2.1.1-cdh6.2.0)。使用如下指令進行編譯打包:
[root@felixzh hudi-cdh]# mvn clean install -DskipTests -Dflink.version=1.13.6 -Dhadoop.version=3.0.0-cdh6.2.0 -Pflink-bundle-shade-hive2
需要用到的jar包:
hudi-hadoop-mr-bundle-0.10.0.jar、hudi-flink-bundle_2.11-0.10.0.jar
將hudi-hadoop-mr-bundle-0.10.0.jar拷貝到hive類路徑,如:lib/或者auxlib,然後重新開機hive服務。將hudi-flink-bundle_2.11-0.10.0.jar拷貝到flink安裝路徑,如:/opt/flink-1.13.6。
啟動sql-client
./bin/sql-client.sh embedded -j hudi-flink-bundle_2.11-0.10.0.jar shell
建立kafka表
-- Source table: consumes JSON records from Kafka topic 'hudi2'.
CREATE TABLE t_kafka (
  uuid STRING,
  name STRING,
  age INT,
  ts TIMESTAMP(3),
  partition1 STRING,
  table_name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'hudi2',
  'scan.startup.mode' = 'latest-offset',  -- read only records produced after the job starts
  'properties.bootstrap.servers' = 'felixzh1:9092',
  'properties.group.id' = 'group',
  'format' = 'json',
  'json.fail-on-missing-field' = 'true',  -- a record lacking any declared field is an error
  'json.ignore-parse-errors' = 'false'    -- malformed JSON fails the job instead of being dropped
);
建立jdbc模式hudi表
-- Hudi sink table whose metadata is synced to Hive in 'jdbc' mode (via HiveServer2).
-- Fixes: missing space in "CREATE TABLEt_hudi_jdbc" (statement would not parse),
-- "haveoutput" in the table.type comment, and the hostname typo "managerment2"
-- (all other ZooKeeper hosts follow the managementN.hde.com pattern —
-- NOTE(review): confirm the real hostname against the cluster).
CREATE TABLE t_hudi_jdbc (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition1` VARCHAR(20),
  `table_name` VARCHAR(20)
) PARTITIONED BY (`partition1`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///flink/hudi_kafka_jdbc',
  'table.type' = 'COPY_ON_WRITE', -- If MERGE_ON_READ, hive query will not have output until the parquet file is generated
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'jdbc',
  'hive_sync.metastore.uris' = 'thrift://management0.hde.com:9083,thrift://management1.hde.com:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://management1.hde.com:2181,management2.hde.com:2181,management0.hde.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;principal=hive/[email protected]',
  'hive_sync.table' = 'kafka_hudi_jdbc',
  'hive_sync.db' = 'default'
);
建立hms模式hudi表
-- Hudi sink table whose metadata is synced to Hive in 'hms' mode
-- (talks to the Hive Metastore thrift service directly).
CREATE TABLE t_hudi_hms (
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition1` VARCHAR(20),
  `table_name` VARCHAR(20)
) PARTITIONED BY (`partition1`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///flink/hudi_kafka_hms',
  'table.type' = 'COPY_ON_WRITE', -- If MERGE_ON_READ, hive query will not have output until the parquet file is generated
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://management0.hde.com:9083,thrift://management1.hde.com:9083',
  'hive_sync.table' = 'kafka_hudi_hms',
  'hive_sync.db' = 'default'
);
提交任務SQL
-- Two independent streaming jobs: fan the Kafka source out to both Hudi sinks.
INSERT INTO t_hudi_jdbc SELECT uuid, name, age, ts, partition1, table_name FROM t_kafka;
INSERT INTO t_hudi_hms SELECT uuid, name, age, ts, partition1, table_name FROM t_kafka;
模擬Kafka資料
[root@felixzh1 bin]# cat hudi_default_data
{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01 00:00:01", "partition1": "par1", "table_name":"table1"}
{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01 00:00:02", "partition1": "par1", "table_name":"table1"}
{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01 00:00:03", "partition1": "par2", "table_name":"table1"}
{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01 00:00:04", "partition1": "par2", "table_name":"table1"}
{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01 00:00:05", "partition1": "par3", "table_name":"table1"}
{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01 00:00:06", "partition1": "par3", "table_name":"table1"}
{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01 00:00:07", "partition1": "par4", "table_name":"table1"}
{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01 00:00:08", "partition1": "par4", "table_name":"table1"}
{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01 00:00:01", "partition1": "par1", "table_name":"table2"}
{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01 00:00:02", "partition1": "par1", "table_name":"table2"}
{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01 00:00:03", "partition1": "par2", "table_name":"table2"}
{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01 00:00:04", "partition1": "par2", "table_name":"table2"}
{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01 00:00:05", "partition1": "par3", "table_name":"table2"}
{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01 00:00:06", "partition1": "par3", "table_name":"table2"}
{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01 00:00:07", "partition1": "par4", "table_name":"table2"}
{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01 00:00:08", "partition1": "par4", "table_name":"table2"}
[root@felixzh1 bin]# ./kafka-console-producer.sh --broker-list felixzh1:9092 --topic hudi2 < hudi_default_data
檢視Hive結果
如果資料庫和表不存在,會自動建立。
-- Run in the Hive CLI/beeline: both tables were auto-created by Hudi hive-sync.
SHOW TABLES;
SELECT * FROM kafka_hudi_jdbc;
SELECT * FROM kafka_hudi_hms;