I. Environment Preparation
1. Build Hudi: see my separate document on compiling Hudi.
2. Environment versions:
Flink 1.13.1 + Hudi 0.10 + Hive 2.1.1 + CDH 6.3.0 + Kafka 2.2.1
3. Configure Flink on YARN mode
The configuration lives in flink-conf.yaml; the settings used are shown below.
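The exact file from the original is not reproduced here; the following is a minimal sketch of the kind of settings typically used for Flink 1.13 on YARN with CDH. All memory sizes, the parallelism, and the HDFS paths are illustrative assumptions, not values taken from the original.

# flink-conf.yaml (illustrative values)
jobmanager.memory.process.size: 2048m
taskmanager.memory.process.size: 4096m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4
# avoids classloader warnings on job shutdown with Hadoop 3 / CDH 6
classloader.check-leaked-classloader: false
# Hudi commits happen on checkpoints, so enable periodic checkpointing (interval is an assumption)
execution.checkpointing.interval: 60s
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints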
4. Configure the Flink environment variables
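In essence this means pointing Flink at the Hadoop configuration and putting the Flink binaries on the PATH; the install path below is an assumption, not taken from the original.

# e.g. /etc/profile.d/flink.sh (paths are illustrative)
export FLINK_HOME=/opt/flink-1.13.1
export PATH=$FLINK_HOME/bin:$PATH
# let Flink pick up the CDH Hadoop configuration and classpath
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=$(hadoop classpath)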
5. Verify that Flink works
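A quick way to verify the setup is to submit one of the examples bundled with the Flink distribution to YARN:

# submit the bundled WordCount example as a per-job YARN application
$FLINK_HOME/bin/flink run -t yarn-per-job $FLINK_HOME/examples/streaming/WordCount.jar
# it should finish successfully and appear as an application in the YARN ResourceManager UI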

6. Put the compiled Hudi jar and the Kafka connector jar into Flink's lib directory
The following three jars also have to go into Flink's lib directory, otherwise syncing data to Hive will fail.
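As a sketch, the copy step looks like the following. The bundle file names depend on your Scala version and build output, so treat them as assumptions; the three Hive-sync jars referred to above are the ones shown in the original screenshot and are not named here.

# Hudi Flink bundle produced by the build, plus the Kafka SQL connector (names are illustrative)
cp packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar $FLINK_HOME/lib/
cp flink-sql-connector-kafka_2.11-1.13.1.jar $FLINK_HOME/lib/
# plus the three jars required for Hive sync mentioned above, also into $FLINK_HOME/lib/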
7. Deploy the sync-to-Hive environment
Put the hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar into the following path.
The path is as follows:
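The original shows the path in a screenshot. On CDH the jar has to be visible to HiveServer2 and the Hive Metastore, typically via an auxiliary jar directory; the directory used below is an assumption for illustration only.

# make the Hudi input format available to Hive (directory is an assumption; adjust to your install)
mkdir -p /opt/cloudera/parcels/CDH/lib/hive/auxlib
cp hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/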
8. In the management platform, run the "Install YARN MapReduce Framework JARs" action
9. Hive auxiliary jars
Because the Hudi data will later be stored on OSS, these packages also need to be added (see the OSS configuration document for the detailed OSS setup).
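The exact jars are shown in the original screenshot rather than listed in the text. As an illustration only, Aliyun OSS support on Hadoop typically involves the hadoop-aliyun connector plus its OSS SDK dependencies, copied into the Hive auxiliary jar directory; names and the directory below are assumptions.

# illustrative only -- use the jars from your own OSS setup
cp hadoop-aliyun-*.jar aliyun-sdk-oss-*.jar jdom-*.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/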
10. Restart Hive so the configuration takes effect
II. Test Demo
1. Create Kafka data
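For example, test records can be produced with the console producer that ships with Kafka. The broker address, topic name, and JSON fields below are illustrative; char8 is the field the article later uses as the record key.

kafka-console-producer --broker-list kafka-host:9092 --topic test_hudi_topic
>{"tinyint0":1,"smallint1":2,"int2":100,"bigint3":1000,"double6":3.14,"boolean7":true,"char8":"a"}
>{"tinyint0":2,"smallint1":3,"int2":200,"bigint3":2000,"double6":6.28,"boolean7":false,"char8":"b"}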
2. Start the Flink SQL client
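A common way to do this is to start a detached YARN session first and then open the SQL client against it; the memory sizes are illustrative.

# start a detached YARN session for the SQL jobs (sizes are illustrative)
$FLINK_HOME/bin/yarn-session.sh -d -jm 2048 -tm 4096
# open the Flink SQL client
$FLINK_HOME/bin/sql-client.sh embedded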
3. Execute the Hudi demo statements (a sketch of the statements follows the type table below)
Column type reference:

Type                        Notes
tinyint                     1-byte integer
smallint                    2-byte integer
int                         4-byte integer
bigint                      8-byte integer
decimal(precision, scale)   exact numeric, with precision "precision" and "scale" digits after the decimal point
precision                   ranges from 1 to 38; defaults to 9 if omitted
scale                       must not be greater than precision; defaults to 0 if omitted
float                       4-byte floating point
double                      8-byte floating point
boolean                     true / false
char(length)                fixed-length character string; length is required (1 to 255)
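The demo statements themselves appear as a screenshot in the original. The following is a minimal sketch of what they typically look like for this setup: a Kafka source table, a Hudi MERGE_ON_READ table with Hive sync enabled, and an INSERT that streams from one to the other. The topic, paths, metastore URI, database, and most column names are assumptions; char8 is the record key, as the article notes below.

-- Kafka source (topic, broker, and columns other than char8 are illustrative)
CREATE TABLE kafka_source (
  tinyint0  TINYINT,
  smallint1 SMALLINT,
  int2      INT,
  bigint3   BIGINT,
  double6   DOUBLE,
  boolean7  BOOLEAN,
  char8     CHAR(10)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_hudi_topic',
  'properties.bootstrap.servers' = 'kafka-host:9092',
  'properties.group.id' = 'hudi_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Hudi MOR table, synced to Hive through the metastore (path and URIs are illustrative)
CREATE TABLE hudi_demo (
  tinyint0  TINYINT,
  smallint1 SMALLINT,
  int2      INT,
  bigint3   BIGINT,
  double6   DOUBLE,
  boolean7  BOOLEAN,
  char8     CHAR(10),
  PRIMARY KEY (char8) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi/hudi_demo',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://hive-metastore-host:9083',
  'hive_sync.db' = 'default',
  'hive_sync.table' = 'hudi_demo'
);

-- continuously upsert the Kafka stream into the Hudi table
INSERT INTO hudi_demo SELECT * FROM kafka_source;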
The above shows the job ran successfully; check it on YARN.
Kafka is being consumed normally.
Produce data into Kafka a few more times.
Note: updates must keep the same char8 value, because char8 is the key (an example follows).
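For example, re-sending a record with an existing char8 value but changed payload fields makes Hudi upsert that row (values are illustrative):

kafka-console-producer --broker-list kafka-host:9092 --topic test_hudi_topic
>{"tinyint0":9,"smallint1":9,"int2":999,"bigint3":9999,"double6":9.99,"boolean7":true,"char8":"a"}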
Check whether parquet files have been generated in the Hudi table directory.
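This can be checked directly on HDFS; the table path below matches the illustrative path used in the SQL sketch above.

# list the Hudi table directory (path is illustrative)
hdfs dfs -ls -R /hudi/hudi_demo
# for a MERGE_ON_READ table, parquet base files appear after compaction; the deltas are .log files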
In Hue, check whether the data has been synced into Hive.
The data has now been synced from Hudi to Hive.
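The same check can be done from Beeline. For a MERGE_ON_READ table, Hive sync creates a read-optimized table and a real-time table, conventionally suffixed _ro and _rt; the connection URL and table names below are illustrative.

beeline -u "jdbc:hive2://hiveserver2-host:10000"
select * from default.hudi_demo_ro limit 10;   -- read-optimized view
select * from default.hudi_demo_rt limit 10;   -- real-time view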
FAQ:
2021-11-04 16:17:29,687 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99) [flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
    at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_181]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    ... 2 more
Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 40631
    at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_181]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    ... 2 more
The error above is caused by the missing hadoop-mapreduce-client-core jar.
Solution:
Put the following three jars into Flink's lib directory.
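Only hadoop-mapreduce-client-core is named in the text; the other two jars are the ones shown in the original screenshot. As a sketch, on CDH they can be copied out of the parcel jars directory (the path is an assumption).

# copy the missing MapReduce client jar (plus the other two jars from the screenshot) into Flink's lib
cp /opt/cloudera/parcels/CDH/jars/hadoop-mapreduce-client-core-*.jar $FLINK_HOME/lib/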
Online compaction occupies memory and resources before the compaction strategy actually triggers, so offline compaction is recommended; offline compaction, however, has to be triggered manually according to the compaction strategy (see the sketch below).
COW suits write-light, read-heavy scenarios; MOR is the opposite.
For MOR tables, online compaction runs according to the configured strategy; if a compaction fails, it is retried, with the retry delayed by one hour.
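For manual (offline) compaction, the Hudi Flink bundle ships a standalone compactor job that can be submitted to the cluster; the bundle jar name and table path below are illustrative.

# schedule and execute a compaction for the table (jar name and path are illustrative)
$FLINK_HOME/bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor \
  $FLINK_HOME/lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar --path hdfs:///hudi/hudi_demo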