天天看點

Flink exactly-once 實戰筆記 

Flink-Kafka

衆所周知,Flink在很早的時候就通過Checkpointing提供了exactly-once的semantic,不過僅限于自身或者是從KafkaConsumer中消費資料。而在Flink 1.4版本的時候加入了赫赫有名的TwoPhaseCommitSinkFunction,提供了End-to-End的exatcly-once語言,當然是在需要下遊支援復原的情況下,具體的概念和設計方式官網已經寫的比較清楚,就不多加贅述。而對于KafkaProducer,Kafka在0.11版本之後支援transaction,也就意味着支援對寫入資料的commit和rollback,在通過Flink寫入到Kafka的應用程式中可以達到exactly-once的效果。

接下來展示一下如何在Flink應用程式中激活exactly-once語義。對于SourceFunction大家随意采用一種即可,檔案,kafka topic等皆可。而主要部分是在于對FlinkKafkaProducer的初始化。我使用的是Flink1.7版本使用的Producer類為FlinkKafkaProducer011,觀察它的構造函數,很容易發現有的構造函數中需要你傳入一個枚舉變量semantic, 有三種可選值NONE, AT_LEAST_ONCE,EXACTLY_ONCE,而預設值為AT_LEAST_ONCE,很顯然我們在這裡需要使用EXACTLY_ONCE。不過在此之前,我們需要仔細閱讀一下Flink官網Flink-kafka-connector的内容,其中提到,Kafka broker的transaction.max.timeout.ms預設為15分鐘,而FlinkKafkaProducer011預設的transaction.timeout.ms為1個小時,遠遠超出了broker的最大逾時時間,這種情況下如果你的服務挂了超過15分鐘,就會造成資料丢失。是以如果需要你的producer支援的更長的事務時間就需要提高kafka broker transaction.max.timeout.ms的值。下面是一個簡單的執行個體去使用Exactly-once語義的FlinkKafkaProducer。

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
    topics,
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    properties,
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
)
           

這麼做的話Flink sink到Kafka中在大部分情況下就都能保證Exactly-once。值得注意的是,所有通過事務寫入的Kafka topic, 在消費他們的時候,必須給消費者加上參數isolation.level=read_committed,這是因為Kafka的事務支援是給寫入的資料分為committed和uncomitted,如果使用預設配置的consumer,讀取的時候依然會讀取所有資料而不是根據事務隔離。

繼續閱讀