Integrating Apache Hudi + Flink + CDH

I. Environment Preparation

1. Build Hudi: see my separate article on compiling Hudi.

2. Versions used:

Flink 1.13.1 + Hudi 0.10 + Hive 2.1.1 + CDH 6.3.0 + Kafka 2.2.1

3. Configure Flink on YARN

The relevant flink-conf.yaml settings are shown below.
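A minimal sketch, assuming Flink is installed under /opt/flink-1.13.1; the original article showed the file only as a screenshot, so the values here are illustrative rather than the author's exact configuration.

# Append illustrative settings to flink-conf.yaml (adjust sizes/paths to your cluster).
cat >> /opt/flink-1.13.1/conf/flink-conf.yaml <<'EOF'
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 4
# Point Flink at the CDH Hadoop configuration for YARN/HDFS access.
env.hadoop.conf.dir: /etc/hadoop/conf
# Checkpointing is required for the Hudi streaming write path.
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints
execution.checkpointing.interval: 60000
# Avoids classloader-leak errors that are common when running on YARN.
classloader.check-leaked-classloader: false
EOF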

4. Configure the Flink environment variables
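A sketch assuming the paths above and a standard CDH 6.3.0 parcel layout:

# Make Flink and the Hadoop classpath visible to all sessions.
cat >> /etc/profile <<'EOF'
export FLINK_HOME=/opt/flink-1.13.1
export PATH=$FLINK_HOME/bin:$PATH
export HADOOP_CONF_DIR=/etc/hadoop/conf
# Flink 1.13 picks Hadoop up from the classpath instead of a bundled jar.
export HADOOP_CLASSPATH=`hadoop classpath`
EOF
source /etc/profile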

5. Verify that Flink runs correctly
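A quick smoke test is to submit the bundled WordCount example to YARN in per-job mode; if it completes and prints output, Flink on YARN is working:

$FLINK_HOME/bin/flink run -m yarn-cluster $FLINK_HOME/examples/batch/WordCount.jar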

6. Put the compiled Hudi bundle jar and the Kafka connector jars into Flink's lib directory.
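For example (the jar names are assumptions matching the versions above; use the bundle produced by your own Hudi build):

cp hudi-flink-bundle_2.11-0.10.0.jar $FLINK_HOME/lib/
cp flink-sql-connector-kafka_2.11-1.13.1.jar $FLINK_HOME/lib/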

The following three jars must also be placed in Flink's lib directory, otherwise syncing data to Hive will fail (the exact error is shown in the FAQ at the end).

7. Set up the sync-to-Hive environment

Place hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar in the following path:
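The original screenshots showed the location; on CDH it is typically Hive's auxlib directory inside the parcel, so the path below is an assumption:

cp hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/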

8. In Cloudera Manager, run the "Install YARN MapReduce Framework JARs" action.

9. Hive auxiliary jars

Since the Hudi data will later be stored on OSS, these jars also need to be added (for details on the OSS setup, see the OSS configuration document).
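A hedged sketch of the copy step; the exact jar set and versions were only shown in screenshots, so treat this list as an assumption and defer to the OSS configuration document:

AUX=/opt/cloudera/parcels/CDH/lib/hive/auxlib
cp hadoop-aliyun-3.0.0-cdh6.3.0.jar $AUX/   # Hadoop OSS filesystem connector
cp aliyun-sdk-oss-3.4.1.jar $AUX/           # Aliyun OSS SDK
cp jdom-1.1.jar $AUX/                       # OSS SDK dependency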

10. Restart Hive for the configuration to take effect.

II. Test Demo

1. Create Kafka data
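For example, using the CDH Kafka CLI; the topic name, host, and record schema are assumptions matching the demo sketch further below:

# Create a test topic (Kafka 2.2.1 still accepts the --zookeeper form).
kafka-topics --create --zookeeper cdh-node1:2181 \
  --replication-factor 1 --partitions 1 --topic hudi_test
# Produce one JSON record; char8 will serve as the record key.
echo '{"char8":"id000001","name":"zhangsan","age":18,"ts":"2021-11-04 16:00:00"}' | \
  kafka-console-producer --broker-list cdh-node1:9092 --topic hudi_test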

2. Start the Flink SQL client
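The jars placed into $FLINK_HOME/lib earlier are picked up automatically:

$FLINK_HOME/bin/sql-client.sh embedded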

3. Run the Hudi demo statements. The column types used in the demo table are listed below; a sketch of the statements follows the table.

Type                        Notes
tinyint                     1-byte integer
smallint                    2-byte integer
int                         4-byte integer
bigint                      8-byte integer
decimal(precision, scale)   exact numeric; precision total digits, scale digits after the decimal point
  - precision               1~38, default 9
  - scale                   must not exceed precision, default 0
float                       4-byte floating point
double                      8-byte floating point
boolean                     true/false
char(length)                fixed-length string; length is required (1~255)
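Since the original statements appeared only in screenshots, the following is a hedged sketch: a Kafka source table, a Hudi MERGE_ON_READ sink with Hive sync enabled, and the INSERT that wires them together. The topic, host names, HDFS path, database/table names, and columns (including the char8 key referenced later) are all assumptions.

# Write the statements to a file; on Flink 1.13 the SQL client can run it with -f,
# or you can paste the statements into the interactive shell instead.
cat > hudi_demo.sql <<'EOF'
CREATE TABLE kafka_source (
  char8 CHAR(8),
  name  STRING,
  age   INT,
  ts    TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'hudi_test',
  'properties.bootstrap.servers' = 'cdh-node1:9092',
  'properties.group.id' = 'hudi-demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

CREATE TABLE hudi_sink (
  char8 CHAR(8) PRIMARY KEY NOT ENFORCED,
  name  STRING,
  age   INT,
  ts    TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi/hudi_sink',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://cdh-node1:9083',
  'hive_sync.db' = 'default',
  'hive_sync.table' = 'hudi_sink'
);

INSERT INTO hudi_sink SELECT * FROM kafka_source;
EOF
$FLINK_HOME/bin/sql-client.sh embedded -f hudi_demo.sql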

The output above shows the job ran successfully; next, check it on YARN.

Kafka is being consumed normally.

Produce data into Kafka a few more times.

Note: updates must reuse the same char8 value, because it is the record key. For example:
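Reusing the assumed schema from the sketch above, the same char8 value with changed fields makes Hudi upsert rather than append:

echo '{"char8":"id000001","name":"zhangsan","age":19,"ts":"2021-11-04 17:00:00"}' | \
  kafka-console-producer --broker-list cdh-node1:9092 --topic hudi_test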

Check whether parquet files have been generated in the Hudi table directory on HDFS:
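For example, with the table path assumed in the sketch above:

hdfs dfs -ls -R /hudi/hudi_sink | grep parquet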

In Hue, check whether the data has been synced over to Hive.

The data has been synced from Hudi into Hive.
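The same check can be run from beeline instead of Hue; for a MOR table Hudi registers read-optimized (_ro) and real-time (_rt) views, and the connection string and names below assume the sketch above:

beeline -u jdbc:hive2://cdh-node1:10000 \
  -e "SELECT * FROM default.hudi_sink_ro LIMIT 10;"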

FAQ:

2021-11-04 16:17:29,687 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600) [flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99) [flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
    at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_181]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    ... 2 more
Caused by: java.net.BindException: Could not start rest endpoint on any port in port range 40631
    at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_181]
    at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_181]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875) ~[hadoop-common-3.0.0-cdh6.3.0.jar:?]
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
    ... 2 more

The error above is caused by a missing hadoop-mapreduce-client-core jar.

Solution:

Put the following three jars into Flink's lib directory.
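The exact jar names were only shown in a screenshot; a plausible set, assuming the CDH parcel layout, is the MapReduce client jars:

CDH_JARS=/opt/cloudera/parcels/CDH/jars
cp $CDH_JARS/hadoop-mapreduce-client-core-3.0.0-cdh6.3.0.jar $FLINK_HOME/lib/
cp $CDH_JARS/hadoop-mapreduce-client-common-3.0.0-cdh6.3.0.jar $FLINK_HOME/lib/
cp $CDH_JARS/hadoop-mapreduce-client-jobclient-3.0.0-cdh6.3.0.jar $FLINK_HOME/lib/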

Online compaction occupies memory resources before the compaction strategy even fires, so offline compaction is recommended; offline compaction, however, must be triggered manually according to the compaction strategy.

COW suits write-light, read-heavy scenarios; MOR is the opposite.

For MOR tables, online compaction runs according to the configured strategy; if a compaction fails, it is retried after a one-hour delay. Offline compaction can be triggered as sketched below.
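A sketch of triggering offline compaction with Hudi's Flink compactor job (the class ships in hudi-flink 0.10; the bundle name and table path repeat the assumptions above, and 'compaction.async.enabled' = 'false' should be set on the writer first):

$FLINK_HOME/bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor \
  hudi-flink-bundle_2.11-0.10.0.jar --path hdfs:///hudi/hudi_sink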
