I. Environment Preparation
1. Build Hudi: see my separate article on compiling Hudi.
2. Component versions:
Flink 1.13.1 + Hudi 0.10 + Hive 2.1.1 + CDH 6.3.0 + Kafka 2.2.1
3. Configure Flink in YARN mode.
The flink-conf.yaml configuration file is set up as follows:
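The exact settings were shown in the original screenshot; as a minimal sketch (memory sizes and slot counts are illustrative assumptions to adapt to your cluster), the relevant flink-conf.yaml entries might look like:

```yaml
# Minimal flink-conf.yaml sketch for Flink on YARN (values are illustrative)
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 4
# Checkpointing is required for the Hudi streaming write to commit data
execution.checkpointing.interval: 30s
# Commonly needed for Hudi jobs on YARN to avoid classloader leak check failures
classloader.check-leaked-classloader: false
```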
4. Configure Flink's environment variables.
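For example (the install path /opt/flink-1.13.1 is an assumption; substitute your own):

```shell
# Hypothetical install path; adjust to where Flink is actually deployed
export FLINK_HOME=/opt/flink-1.13.1
export PATH="$FLINK_HOME/bin:$PATH"
# On CDH, Flink also needs the Hadoop config and classpath
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_CLASSPATH=$(hadoop classpath 2>/dev/null || true)
echo "$FLINK_HOME"
```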
5. Verify that Flink works correctly.
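One quick check (illustrative; it requires the YARN cluster to be reachable) is to submit the bundled WordCount example or start a YARN session:

```shell
# Submit the example job to YARN; success confirms Flink-on-YARN works
$FLINK_HOME/bin/flink run -m yarn-cluster $FLINK_HOME/examples/batch/WordCount.jar
# Or start a detached YARN session and open the web UI address it prints
$FLINK_HOME/bin/yarn-session.sh -d
```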

6. Copy the compiled Hudi bundle jar and the Kafka connector jar into Flink's lib directory.
The following three jars must also be placed in Flink's lib directory; otherwise, syncing data to Hive will fail with errors.
7. Set up the sync-to-Hive environment.
Place hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar into the following path:
The path is as follows:
8. In the cluster management platform, install the YARN MapReduce framework JARs.
9. Hive auxiliary jars.
Because the Hudi data will later be stored on OSS, these packages need to be added as well (for details on the OSS setup, see the OSS configuration document).
10. Restart Hive for the configuration to take effect.
II. Test Demo
1. Produce test data into Kafka.
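The original screenshot showed the records being produced; as a sketch (the topic, broker, and field names are assumptions), a JSON test record can be prepared and sent with the console producer:

```shell
# Hypothetical JSON record; the uuid field stands in for the demo's char8 key
cat <<'EOF' > /tmp/hudi_demo_record.json
{"uuid": "id1", "name": "alice", "ts": "2021-11-04 16:00:00"}
EOF
# Send it to the topic (adjust broker list and topic to your cluster):
# kafka-console-producer --broker-list cdh-node1:9092 --topic hudi_test < /tmp/hudi_demo_record.json
cat /tmp/hudi_demo_record.json
```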
2. Start the Flink SQL client.
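The SQL client ships with Flink; starting it in embedded mode looks like this (illustrative):

```shell
# Start the Flink SQL client in embedded mode; jars already placed in
# $FLINK_HOME/lib (including the Hudi bundle) are picked up automatically
$FLINK_HOME/bin/sql-client.sh embedded
```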
3. Run the Hudi demo statements.
Type                       Notes
-------------------------  --------------------------------------------------
tinyint                    1-byte integer
smallint                   2-byte integer
int                        4-byte integer
bigint                     8-byte integer
decimal(precision, scale)  exact numeric with precision total digits and
                           scale digits after the decimal point
  - precision              range 1~38, defaults to 9
  - scale                  must not be greater than precision, defaults to 0
float                      4-byte floating point
double                     8-byte floating point
boolean                    true/false
char(length)               fixed-length string; length is required (1~255)
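The demo statements themselves were shown as screenshots; as a hedged sketch (table names, paths, hosts, and field names are all assumptions), a Kafka source feeding a Hudi MERGE_ON_READ sink with Hive sync enabled might look like:

```sql
-- Source: read JSON records from Kafka (topic/broker names are hypothetical)
CREATE TABLE kafka_source (
  uuid STRING,
  name STRING,
  ts   STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'hudi_test',
  'properties.bootstrap.servers' = 'cdh-node1:9092',
  'properties.group.id' = 'hudi_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Sink: MERGE_ON_READ Hudi table, synced to Hive through the metastore
CREATE TABLE hudi_sink (
  uuid STRING PRIMARY KEY NOT ENFORCED,
  name STRING,
  ts   STRING
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///tmp/hudi/hudi_sink',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'ts',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://cdh-node1:9083',
  'hive_sync.db' = 'default',
  'hive_sync.table' = 'hudi_sink'
);

-- Continuously pipe Kafka records into the Hudi table
INSERT INTO hudi_sink SELECT uuid, name, ts FROM kafka_source;
```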
The output above shows the job ran successfully; check it on YARN.
Kafka is being consumed normally.
Produce data into Kafka a few more times.
Note: updates must be made with char8, because it is the record key.
Check whether parquet files have been generated under the Hudi table path.
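For instance (assuming the hypothetical table path hdfs:///tmp/hudi/hudi_sink; substitute your own):

```shell
# List the table directory recursively; for a MOR table, .log files appear
# first and .parquet files appear after compaction runs
hdfs dfs -ls -R /tmp/hudi/hudi_sink | grep -E '\.(parquet|log)'
```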
In Hue, check whether the data has been synced over into Hive.
The data has been synced from Hudi to Hive.
FAQ:
2021-11-04 16:17:29,687 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99) [flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
    at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_181]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    ... 2 more
Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 40631
    at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_181]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    ... 2 more
The error above is caused by the missing hadoop-mapreduce-client-core jar.
Solution:
Place the following three jars into Flink's lib directory.
Online compaction occupies memory resources even before the compaction strategy triggers, so offline compaction is recommended; offline compaction, however, must be triggered manually according to the compaction strategy.
COW suits write-light, read-heavy scenarios; MOR is the opposite.
For MOR tables, online compaction runs according to the configured strategy; if a compaction fails, there is a retry, delayed by one hour.
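As a sketch of the offline route (jar name, version, and table path are assumptions for this build), online compaction is disabled on the writing job and the compactor shipped in the Hudi Flink bundle is run as a separate Flink job:

```shell
# On the writing table, disable async (online) compaction via the table option:
#   'compaction.async.enabled' = 'false'
# Then trigger compaction manually as its own Flink job:
$FLINK_HOME/bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor \
  $FLINK_HOME/lib/hudi-flink-bundle_2.11-0.10.0.jar \
  --path hdfs:///tmp/hudi/hudi_sink
```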