最近做了一個将結果資料寫入到Kafka的需求,sink部分代碼如下:
其中StringKeyedSerializationSchema是自定義的實作KeyedSerializationSchema的序列化器,用于序列化寫入kafka的key/value, 任務也上線了,在flink web頁面看到任務各項名額一切正常,也測試消費寫入kafka的資料,得到的結果也如預期一樣,想着萬事大吉了,so easy~
過了一會kafka中間件的同僚找過來說:你這個寫入topic的資料怎麼隻有這幾個分區,其他分區都沒有資料寫入~

什麼情況?任務看着一切都ok啊,怎麼就有分區沒有資料寫入呢?馬上google一下資料寫入kafka的分區政策:
如果指定寫入分區,就将資料寫入分區
如果沒有指定分區,指定了key, 那麼就會按照key hash對分區取模方式發送
如果既沒指定分區又沒指定key,那麼就會以輪序的方式發送
而實際情況是有幾個分區一條資料都沒有寫入,并且在StringKeyedSerializationSchema也指定了每條寫入資料的key, 那麼就一定是第一種情況了,在FlinkKafkaProducer011中指定了資料寫入的分區,馬上翻看源碼,在FlinkKafkaProducer011的invoke方法裡面有這麼一個邏輯:
很明顯就是執行了if邏輯,也是就flinkKafkaPartitioner不為空,在建構ProducerRecord時調用了flinkKafkaPartitioner.partition的方法,指定寫入的partition,而flinkKafkaPartitioner是在FlinkKafkaProducer011初始化的時候給的預設值FlinkFixedPartitioner,在看下其partition方式:
parallelInstanceId表示目前task的index,partitions表示kafka的topic的分區,該邏輯求得的分區就是根據目前task index 對partition取餘得到的,而我設定的sinkParallelism是4,topic的分區數是6,到這裡就比較明朗,取餘永遠不會得到4、5,是以就導緻分區4、5一直沒有資料寫入。如果設定的parallism設定比kafka的分區數還要大,就會導緻得到的partition值大于topic實際partition。
那麼解決方式有一下幾種:
parallism設定成為與kafka topic 分區數一緻大小
将flinkKafkaPartitioner指定為空,并且制定寫入kafka的key
将flinkKafkaPartitioner與寫入的key都置為空
自定義一個FlinkKafkaPartitioner,重寫partition方法
最終選擇第三種較為簡單的方案,修改代碼:
同時将StringKeyedSerializationSchema的serializeKey傳回值設定為null. 再次運作任務,檢視kafka 資料寫入情況,所有分區都有資料寫入。最終破案。