需求:将gzip压缩文件的数据同步到mysql数据库
gzip格式文件作为parquet格式的压缩文件存在,我们在获取的时候,通过hive创建表来获取
参考hive建表语句
create table table_name
(id string, name string, score double)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='gzip');
给hive表插入数据,最后到hdfs上面将生成的gzip格式的文件数据下载下来
一、添加依赖包
gzip数据文件作为 parquet格式文件的压缩格式,只需要在dinky的plugins目录和flink的lib目录下添加parquet依赖包就可以,然后重启dinky和flink
依赖包下载地址参考:https://www.bookstack.cn/read/ApacheFlink-1.13-zh/d8d654b36a91abf4.md
下载的时候注意自己的flink版本
二、创建作业
三、编写flinksql代码
CREATE TABLE fs_table (
`id` STRING,
`name` STRING,
`sex` STRING,
`age` INT,
`work` STRING
) WITH (
'connector'='filesystem',
'path'='/home/data/gzip',
'format'='parquet'
);
CREATE TABLE gzip_out(
`id` STRING,
`name` STRING,
`sex` STRING,
`age` INT,
`work` STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.119.50:3306/test?createDatabaseIfNotExist=true&useSSL=false',
'username' = 'root',
'password' = 'Tj@20220710',
'table-name' = 'gzip_out'
);
INSERT INTO gzip_out select id,name,sex,age,work from
四、运行作业
检查sql语句没有语法问题,选定提前部署好的flink集群,点击运行
通过flink的页面可以看到作业执行结束