
All Aboard: Hands-On SQL for a Real-Time Kafka-Flink-Hudi-Hive Data Warehouse (Latest Versions)

Driven by the requirements of a real project, this article shows how a single Flink SQL statement can consume data from Kafka and write it to both a Hudi data-lake table and an offline Hive data-warehouse table (the focus here is on writing to Hive). The necessary environment preparation is covered in earlier articles and is not repeated here. The official documentation for Hive metastore sync is at:

https://hudi.apache.org/docs/syncing_metastore/           

Modifying and Building Hudi 0.10.0

Download the Hudi 0.10.0 source code and change hive.version in packaging/hudi-flink-bundle/pom.xml to your cluster's Hive version (e.g. 2.1.1-cdh6.2.0). Then build and package with the following command:

[root@felixzh hudi-cdh]# mvn clean install -DskipTests -Dflink.version=1.13.6 -Dhadoop.version=3.0.0-cdh6.2.0 -Pflink-bundle-shade-hive2

The jar files we need:

hudi-hadoop-mr-bundle-0.10.0.jar、hudi-flink-bundle_2.11-0.10.0.jar           

Copy hudi-hadoop-mr-bundle-0.10.0.jar onto Hive's classpath (e.g. lib/ or auxlib/), then restart the Hive services. Copy hudi-flink-bundle_2.11-0.10.0.jar into the Flink installation directory, e.g. /opt/flink-1.13.6.


Start the SQL client

./bin/sql-client.sh embedded -j hudi-flink-bundle_2.11-0.10.0.jar  shell           

Create the Kafka table

CREATE TABLE t_kafka (
   uuid STRING
  ,name STRING
  ,age INT
  ,ts TIMESTAMP(3)
  ,partition1 STRING
  ,table_name STRING
) WITH (
    'connector' = 'kafka', 
    'topic' = 'hudi2',  
    'scan.startup.mode' = 'latest-offset',  
    'properties.bootstrap.servers' = 'felixzh1:9092', 
    'properties.group.id' = 'group', 
    'format' = 'json',
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);           
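Because the table sets 'json.fail-on-missing-field' = 'true' and 'json.ignore-parse-errors' = 'false', every message must be valid JSON carrying all six declared columns, or the job fails. A minimal sketch (plain Python, standard library only; the helper name is my own) of the check this implies:

```python
import json

# Columns declared in the t_kafka DDL above; with
# 'json.fail-on-missing-field' = 'true', a missing field fails the job.
EXPECTED_FIELDS = {"uuid", "name", "age", "ts", "partition1", "table_name"}

def validate_record(line: str) -> bool:
    """Return True if the JSON line carries every column of t_kafka."""
    record = json.loads(line)
    return EXPECTED_FIELDS <= record.keys()

sample = '{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01 00:00:01", "partition1": "par1", "table_name": "table1"}'
print(validate_record(sample))  # True
```

Validating records like this on the producer side avoids failing the whole streaming job on one malformed message.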

Create the Hudi table (jdbc sync mode)

CREATE TABLE t_hudi_jdbc(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition1` VARCHAR(20),
  `table_name` VARCHAR(20)
) PARTITIONED BY (`partition1`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///flink/hudi_kafka_jdbc',
  'table.type' = 'COPY_ON_WRITE',  -- If MERGE_ON_READ, hive query will not have output until the parquet file is generated
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'jdbc',
  'hive_sync.metastore.uris' = 'thrift://management0.hde.com:9083,thrift://management1.hde.com:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://management1.hde.com:2181,management2.hde.com:2181,management0.hde.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;principal=hive/[email protected]',
  'hive_sync.table' = 'kafka_hudi_jdbc',
  'hive_sync.db' = 'default'
);

Create the Hudi table (hms sync mode)

CREATE TABLE t_hudi_hms(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition1` VARCHAR(20),
  `table_name` VARCHAR(20)
) PARTITIONED BY (`partition1`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///flink/hudi_kafka_hms',
  'table.type' = 'COPY_ON_WRITE',  -- If MERGE_ON_READ, hive query will not have output until the parquet file is generated
  'hive_sync.enable' = 'true',     
  'hive_sync.mode' = 'hms',        
  'hive_sync.metastore.uris' = 'thrift://management0.hde.com:9083,thrift://management1.hde.com:9083', 
  'hive_sync.table'='kafka_hudi_hms',                 
  'hive_sync.db'='default'                  
);           
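Apart from hive_sync.jdbc_url, which only the jdbc mode needs, the two Hudi tables differ in just three options. A quick sketch (plain Python, values copied from the two DDL statements above, jdbc_url omitted for brevity) that diffs them:

```python
# WITH-clause options of the two Hudi tables above, as plain dicts.
jdbc_opts = {
    "connector": "hudi",
    "path": "hdfs:///flink/hudi_kafka_jdbc",
    "table.type": "COPY_ON_WRITE",
    "hive_sync.enable": "true",
    "hive_sync.mode": "jdbc",
    "hive_sync.table": "kafka_hudi_jdbc",
}
hms_opts = {
    "connector": "hudi",
    "path": "hdfs:///flink/hudi_kafka_hms",
    "table.type": "COPY_ON_WRITE",
    "hive_sync.enable": "true",
    "hive_sync.mode": "hms",
    "hive_sync.table": "kafka_hudi_hms",
}

# Keys whose values differ between the two sync modes
diff = {k for k in jdbc_opts if jdbc_opts[k] != hms_opts[k]}
print(sorted(diff))  # ['hive_sync.mode', 'hive_sync.table', 'path']
```

In other words, switching from jdbc sync to hms sync is mostly a matter of changing hive_sync.mode and dropping the jdbc URL; hms mode talks to the metastore thrift URIs directly.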

Submit the job SQL

insert into t_hudi_jdbc select uuid, name, age, ts, partition1, table_name from t_kafka;
insert into t_hudi_hms select uuid, name, age, ts, partition1, table_name from t_kafka;

Simulate Kafka data

[root@felixzh1 bin]# cat hudi_default_data
{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01 00:00:01", "partition1": "par1", "table_name":"table1"}
{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01 00:00:02", "partition1": "par1", "table_name":"table1"}
{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01 00:00:03", "partition1": "par2", "table_name":"table1"}
{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01 00:00:04", "partition1": "par2", "table_name":"table1"}
{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01 00:00:05", "partition1": "par3", "table_name":"table1"}
{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01 00:00:06", "partition1": "par3", "table_name":"table1"}
{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01 00:00:07", "partition1": "par4", "table_name":"table1"}
{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01 00:00:08", "partition1": "par4", "table_name":"table1"}
{"uuid": "id1", "name": "Danny", "age": 23, "ts": "1970-01-01 00:00:01", "partition1": "par1", "table_name":"table2"}
{"uuid": "id2", "name": "Stephen", "age": 33, "ts": "1970-01-01 00:00:02", "partition1": "par1", "table_name":"table2"}
{"uuid": "id3", "name": "Julian", "age": 53, "ts": "1970-01-01 00:00:03", "partition1": "par2", "table_name":"table2"}
{"uuid": "id4", "name": "Fabian", "age": 31, "ts": "1970-01-01 00:00:04", "partition1": "par2", "table_name":"table2"}
{"uuid": "id5", "name": "Sophia", "age": 18, "ts": "1970-01-01 00:00:05", "partition1": "par3", "table_name":"table2"}
{"uuid": "id6", "name": "Emma", "age": 20, "ts": "1970-01-01 00:00:06", "partition1": "par3", "table_name":"table2"}
{"uuid": "id7", "name": "Bob", "age": 44, "ts": "1970-01-01 00:00:07", "partition1": "par4", "table_name":"table2"}
{"uuid": "id8", "name": "Han", "age": 56, "ts": "1970-01-01 00:00:08", "partition1": "par4", "table_name":"table2"}           
[root@felixzh1 bin]# ./kafka-console-producer.sh --broker-list felixzh1:9092 --topic hudi2 < hudi_default_data           
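Instead of hand-writing the file, the 16 sample records can be generated with a short script. This is just a convenience sketch using only the Python standard library; the names, ages, timestamps, and partition layout mirror the sample data above:

```python
import json

# (name, age) pairs for id1..id8, as in the sample records above
names_ages = [("Danny", 23), ("Stephen", 33), ("Julian", 53), ("Fabian", 31),
              ("Sophia", 18), ("Emma", 20), ("Bob", 44), ("Han", 56)]

lines = []
for table in ("table1", "table2"):
    for i, (name, age) in enumerate(names_ages, start=1):
        lines.append(json.dumps({
            "uuid": f"id{i}",
            "name": name,
            "age": age,
            "ts": f"1970-01-01 00:00:0{i}",          # seconds since epoch
            "partition1": f"par{(i + 1) // 2}",      # two uuids per partition
            "table_name": table,
        }))

with open("hudi_default_data", "w") as f:
    f.write("\n".join(lines) + "\n")
print(len(lines))  # 16
```

The resulting file can be piped to kafka-console-producer.sh exactly as shown above.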

Check the results in Hive

If the database and tables do not exist, they are created automatically.

show tables;
select * from kafka_hudi_jdbc;
select * from kafka_hudi_hms;           
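Note that uuid is the primary key of both Hudi tables, so the second batch of records (table_name = "table2") upserts the first. Assuming standard last-write-wins upsert semantics, each Hive table should end up with eight rows, all carrying table_name = "table2". A plain-Python simulation of that behavior:

```python
# Simulate a Hudi upsert: 'uuid' is the primary key declared in the DDL,
# so a later record with the same key replaces the earlier one.
records = [
    {"uuid": f"id{i}", "table_name": t}
    for t in ("table1", "table2")
    for i in range(1, 9)
]

state = {}
for r in records:              # last write wins, as with an upsert
    state[r["uuid"]] = r

print(len(state))                                 # 8
print({r["table_name"] for r in state.values()})  # {'table2'}
```

If you instead see 16 rows in Hive, the writer is operating in append rather than upsert mode and the table options are worth re-checking.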