
Integrating Apache Hudi + Flink + CDH

I. Environment Preparation

1. Build Hudi: see my separate article on compiling Hudi.

2. Component versions:

Flink 1.13.1 + Hudi 0.10 + Hive 2.1.1 + CDH 6.3.0 + Kafka 2.2.1

3. Configure Flink on YARN mode

The configuration goes in flink-conf.yaml.
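A minimal sketch of the settings that matter for Hudi on CDH YARN; all values below are assumptions to tune for your cluster:

    # flink-conf.yaml -- minimal sketch, values are assumptions
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 4096m
    taskmanager.numberOfTaskSlots: 4
    # Hudi streaming writes only commit on checkpoints
    execution.checkpointing.interval: 60000
    state.backend: rocksdb
    state.checkpoints.dir: hdfs:///flink/checkpoints
    # avoid classloader noise when jobs finish on YARN
    classloader.check-leaked-classloader: false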

4. Configure Flink environment variables
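Typical variables on a CDH node might look like this; the paths are assumptions for a parcel install:

    # append to /etc/profile -- paths are assumptions for CDH 6.3.0
    export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export HADOOP_CLASSPATH=`hadoop classpath`
    export FLINK_HOME=/opt/flink-1.13.1
    export PATH=$FLINK_HOME/bin:$PATH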

5. Verify that Flink works correctly

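For example, submitting the bundled WordCount example to YARN is a quick smoke test (the jar path assumes a standard Flink distribution):

    # submit the example job to YARN; success confirms the flink-on-yarn setup
    flink run -m yarn-cluster -p 1 $FLINK_HOME/examples/batch/WordCount.jar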

6. Copy the Hudi bundle jar built in step 1 and the Kafka connector jar into Flink's lib directory


The following three jars must also be placed under Flink's lib directory, otherwise syncing data to Hive will fail; the copy sketch below includes them.

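A sketch of the copy commands; file names and versions are assumptions based on the build in step 1, and the three hadoop-mapreduce-client jars are an assumption consistent with the FAQ at the end of this article:

    # Hudi Flink bundle built in step 1, plus the Kafka SQL connector
    cp hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar $FLINK_HOME/lib/
    cp flink-sql-connector-kafka_2.11-1.13.1.jar $FLINK_HOME/lib/
    # three MapReduce client jars from the CDH parcel, needed for Hive sync
    cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-core-3.0.0-cdh6.3.0.jar $FLINK_HOME/lib/
    cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-common-3.0.0-cdh6.3.0.jar $FLINK_HOME/lib/
    cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-jobclient-3.0.0-cdh6.3.0.jar $FLINK_HOME/lib/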

7. Set up the Hive sync environment

Place hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar in the following path:

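On CDH the usual target is Hive's auxiliary jar directory on every HiveServer2 and Metastore node; the path below is an assumption:

    # make the Hudi input format visible to Hive -- path is an assumption for CDH
    cp hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/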

8. In the Cloudera Manager console, run the "Install YARN MapReduce Framework JARs" action


9. Hive auxiliary jars


Because Hudi data will later be stored on OSS, these jars need to be added as well (see the OSS configuration document for details).

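The specific jars are commonly the Hadoop OSS client and its dependencies; the names below are assumptions, so confirm them against the OSS configuration document:

    # jars commonly required for OSS access -- names are assumptions
    cp hadoop-aliyun-3.0.0-cdh6.3.0.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/
    cp aliyun-sdk-oss-*.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/
    cp jdom-*.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/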

10. Restart Hive for the configuration to take effect


II. Test Demo

1. Create Kafka test data
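A sketch of producing a test record; the topic name, broker address, and JSON fields are assumptions that match the demo sketch later in this section:

    # send one JSON record -- topic/broker/fields are assumptions
    echo '{"tinyint0":1,"smallint1":2,"int2":3,"bigint3":4,"decimal4":4.04,"float5":5.5,"double6":6.6,"boolean7":true,"char8":"key-0001"}' | \
      kafka-console-producer --broker-list cdh-node1:9092 --topic hudi_test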

2. Start the Flink SQL client

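A sketch of starting the client with the Hudi bundle on the classpath; the jar name is an assumption from step 6:

    # launch the SQL client in embedded mode with the Hudi bundle loaded
    $FLINK_HOME/bin/sql-client.sh embedded -j $FLINK_HOME/lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar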

3. Run the Hudi demo statements (a SQL sketch follows the type table below)

The demo table uses the following column types:

    Type                       Notes
    -------------------------  ------------------------------------------------
    tinyint                    1-byte integer
    smallint                   2-byte integer
    int                        4-byte integer
    bigint                     8-byte integer
    decimal(precision, scale)  exact numeric: precision total digits, scale
                               digits after the decimal point
      precision                ranges 1-38; defaults to 9 if omitted
      scale                    must not exceed precision; defaults to 0 if omitted
    float                      4-byte floating point
    double                     8-byte floating point
    boolean                    true/false
    char(length)               fixed-length string; length is required (1-255)

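The statements themselves appeared only in screenshots; the sketch below is consistent with the type table above and the Kafka producer in step 1, with table names, topic, servers, paths, and Hive sync values all being assumptions:

    -- Kafka source table; connection details are assumptions
    CREATE TABLE kafka_source (
      tinyint0 TINYINT,
      smallint1 SMALLINT,
      int2 INT,
      bigint3 BIGINT,
      decimal4 DECIMAL(10, 2),
      float5 FLOAT,
      double6 DOUBLE,
      boolean7 BOOLEAN,
      char8 CHAR(8)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'hudi_test',
      'properties.bootstrap.servers' = 'cdh-node1:9092',
      'properties.group.id' = 'hudi-demo',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );

    -- Hudi MOR sink with Hive sync enabled; path and metastore URI are assumptions
    CREATE TABLE hudi_sink (
      tinyint0 TINYINT,
      smallint1 SMALLINT,
      int2 INT,
      bigint3 BIGINT,
      decimal4 DECIMAL(10, 2),
      float5 FLOAT,
      double6 DOUBLE,
      boolean7 BOOLEAN,
      char8 CHAR(8),
      PRIMARY KEY (char8) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs:///hudi/hudi_sink',
      'table.type' = 'MERGE_ON_READ',
      'hive_sync.enable' = 'true',
      'hive_sync.mode' = 'hms',
      'hive_sync.metastore.uris' = 'thrift://cdh-node1:9083',
      'hive_sync.db' = 'default',
      'hive_sync.table' = 'hudi_sink'
    );

    -- continuous pipeline: upsert Kafka records into the Hudi table
    INSERT INTO hudi_sink SELECT * FROM kafka_source;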

The above shows the job ran successfully; now check it on YARN.


Kafka is being consumed normally.

Produce data into Kafka a few more times.


Note: updates must reuse the same char8 value, because char8 is the record key.
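For example, resending the same char8 key with changed values exercises the upsert (broker and topic as in step 1):

    # same key, new field values -- Hudi overwrites the existing record
    echo '{"tinyint0":9,"smallint1":2,"int2":3,"bigint3":4,"decimal4":4.04,"float5":5.5,"double6":6.6,"boolean7":false,"char8":"key-0001"}' | \
      kafka-console-producer --broker-list cdh-node1:9092 --topic hudi_test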

Check whether parquet files have been generated under the Hudi table path.

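For example, listing the table directory; the path is the assumption used in the demo sketch:

    # inspect the Hudi table layout on HDFS
    hdfs dfs -ls -R hdfs:///hudi/hudi_sink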

In Hue, check whether the data has been synced into Hive.

The data has been synced from Hudi to Hive.
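A query sketch: for a MOR table, Hive sync registers read-optimized and real-time views with _ro and _rt suffixes, and the table name below is an assumption from the demo sketch:

    -- read-optimized view of the synced MOR table
    SELECT * FROM hudi_sink_ro LIMIT 10;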


FAQ:

2021-11-04 16:17:29,687 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
  at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600) [flink-dist_2.11-1.13.1.jar:1.13.1]
  at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99) [flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
  at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
  at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_181]
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
  at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  ... 2 more

Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 40631
  at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
  at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_181]
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
  at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
  ... 2 more

The error above is caused by the missing hadoop-mapreduce-client-core jar.

Solution:

Put the three jar packages into Flink's lib directory (see the copy sketch in step 6).


Notes on compaction: online compaction holds memory and compute resources on the writing job even before the compaction strategy fires, so offline compaction is recommended; offline compaction, however, must be triggered manually according to the compaction strategy (a sketch follows below).

COW tables suit write-light, read-heavy workloads; MOR is the opposite.

For a MOR table, online compaction runs according to its configuration; if a compaction attempt fails, it is retried after a one-hour delay.
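A sketch of triggering offline compaction with the HoodieFlinkCompactor job shipped in the Hudi Flink bundle; the jar name and table path are assumptions:

    # run one round of offline compaction for the MOR table
    flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor \
      $FLINK_HOME/lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar \
      --path hdfs:///hudi/hudi_sink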
