
1. Overview
Repost: the mapping between Flink and Kafka versions
I am reposting this mainly because I forgot to take notes the last time I upgraded the Kafka version used with Flink.
The key takeaway first: the table below maps Flink versions to their matching Kafka versions.
| Flink version | Kafka version |
|---|---|
| 1.12.X | 2.4.1 |
| 1.11.X | 2.4.1 |
| 1.10.X | 2.2.1 |
| 1.9.X | 2.2.0 |
| 1.8.X | 2.0.1 |
| 1.7.X | 2.0.1 |
| 0.10.x | 0.8.2.0 |
| 0.9.x | 0.8.2.0 |
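Beyond the table, you can check which `kafka-clients` version your build actually resolves by inspecting the Maven dependency tree; the command below uses the standard `maven-dependency-plugin` (the `-Dincludes` filter pattern here is just one way to narrow the output):

```shell
# Print only the org.apache.kafka:kafka-clients entries in the resolved dependency tree
mvn dependency:tree -Dincludes=org.apache.kafka:kafka-clients
```

If the version printed here does not match your Kafka broker's version, you have found the same class of mismatch described below.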
Recently, while running a Kafka throughput test with Flink, I hit the following problem, which prevented the program from running at all.
Searching online turned up no direct solution; the rough consensus was that it is a version mismatch between Flink and Kafka.
At the time, Flink was at version 1.12.0 and Kafka at 2.3.0.
The error message is as follows:
```
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
	at lenrnflink.Te2.main(Te2.java:24)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.serialization.DeserializationSchema.open(Lorg/apache/flink/api/common/serialization/DeserializationSchema$InitializationContext;)V
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:696)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
```
The Flink and Flink-Kafka connector dependencies in pom.xml were configured as follows:
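The original pom snippet was not preserved in this repost; the fragment below is a minimal sketch of a typical configuration for this setup, assuming the Scala 2.11 build of the artifacts (the Scala suffix and scope are assumptions):

```xml
<!-- Flink streaming API (Scala suffix is an assumption) -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<!-- Flink Kafka connector; its transitive kafka-clients should match the broker -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
```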
Some online sources suggested the cause might be a Fetch protocol version mismatch between the Kafka broker and the client, but after checking I found the server and client Fetch versions did match.
```java
// Client-side Fetch request versions, 0 through 11, matching the server
public static Schema[] schemaVersions() {
    return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3,
            FETCH_REQUEST_V4, FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7,
            FETCH_REQUEST_V8, FETCH_REQUEST_V9, FETCH_REQUEST_V10, FETCH_REQUEST_V11};
}
```
I then wanted to find the official mapping between Flink and Kafka versions, but there was none to be found, until I stumbled on a clue in Maven.
On the Maven repository site https://mvnrepository.com/, open the flink-connector-kafka details page: https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
The artifacts there correspond one-to-one with Flink versions. Clicking into the entry for version 1.12.0, the page has a section titled Compile Dependencies, which lists kafka-clients 2.4.1.
This means the Kafka version recommended for Flink 1.12.0 is 2.4.1; I was using Kafka 2.3.0, and that version mismatch caused the error.
After I upgraded Kafka to 2.4.1 as well, the error was resolved.
My advice: whenever you use Flink's Kafka connector, check the dependencies in the Maven repository and pick matching Flink and Kafka versions to avoid exceptions like this.
In the end, our main Flink version is 1.12.4, but our connector (flink-connector-kafka) stays at 1.11.0.
This keeps the required changes to a minimum while still supporting Kafka 0.10.0.
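Expressed as a pom fragment, that final combination would look roughly like this (a sketch; the Scala suffix is an assumption):

```xml
<!-- Flink runtime pinned to 1.12.4 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.12.4</version>
</dependency>
<!-- Connector deliberately held back at 1.11.0 for Kafka 0.10.0 support -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.11.0</version>
</dependency>
```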
2. Extension
Kafka cross-version compatibility issues surfaced by Flink