天天看點

【Flink】 Flink與Kafka版本對應關系1.概述M.擴充

【Flink】 Flink與Kafka版本對應關系1.概述M.擴充

1.概述

轉載:Flink與Kafka版本對應關系

轉載這個主要是上次做flink kafka版本更新,忘記記錄了。

先上幹貨:以下為Flink和Kafka的版本對照表

Flink版本 Kafka版本
1.12.X 2.4.1
1.11.X 2.4.1
1.10.X 2.2.1
1.9.X 2.2.0
1.8.X 2.0.1
1.7.X 2.0.1
0.10.x 0.8.2.0
0.9.x 0.8.2.0

最近在使用Flink進行Kafka吞吐量測試時,遇到了以下問題導緻程式無法運作

去網上查資料也沒有很直接的解決方法,大緻也就是說是Flink與Kafka的版本問題

目前Flink采用的是1.12.0版本,Kafka采用的是2.3.0版本

報錯資訊如下:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
	at lenrnflink.Te2.main(Te2.java:24)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.serialization.DeserializationSchema.open(Lorg/apache/flink/api/common/serialization/DeserializationSchema$InitializationContext;)V
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:696)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
           

pom.xml中關于Flink與Flink-Kafka連接配接器配置如下:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
	at lenrnflink.Te2.main(Te2.java:24)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.serialization.DeserializationSchema.open(Lorg/apache/flink/api/common/serialization/DeserializationSchema$InitializationContext;)V
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:696)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

           

網上說有可能是Kafka伺服器與用戶端Fetch版本不一緻導緻的,然後我經過檢查發現伺服器與用戶端版本Fetch一緻。

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
	at lenrnflink.Te2.main(Te2.java:24)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.serialization.DeserializationSchema.open(Lorg/apache/flink/api/common/serialization/DeserializationSchema$InitializationContext;)V
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:46)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:696)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

           
//用戶端fetch版本,0-11,與伺服器一緻
    public static Schema[] schemaVersions() {
        return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, FETCH_REQUEST_V8, FETCH_REQUEST_V9, FETCH_REQUEST_V10, FETCH_REQUEST_V11};
    }
    
           

想要去找Flink與Kafka的版本對應關系,發現并沒有找到,偶然在Maven發現了線索

在Maven倉庫網站https://mvnrepository.com/中,找到flink-connector-kafka的詳情頁面:https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka

【Flink】 Flink與Kafka版本對應關系1.概述M.擴充

這裡的依賴包與Flink版本一一對應,我們點選Version為1.12.0的連結進去,發現頁面中有一列為:Compile Dependencies,裡面是kafka-client的2.4.1的依賴

【Flink】 Flink與Kafka版本對應關系1.概述M.擴充

說明Flink1.12.0推薦的Kafka版本為2.4.1,但是我使用的Kafka為2.3.0,版本不一緻,導緻報錯。

之後我把Kafka也更新到2.4.1,錯誤解決

這邊建議各位在使用Flink中Kafka連接配接器的時候,去Maven倉庫中檢視一下依賴,選用比對的Flink與Kafka版本,避免出現異常。

最終我們的是Flink主版本是 flink 1.12.4 但是我們的connect 是

kafka-connect 1.11.0

這樣改動量是最小的,也是支援kafka 0.10.0版本的。

M.擴充

flink引出的kafka不同版本的相容性

繼續閱讀