在某些場景中,比如GROUP BY聚合之後的結果,需要去更新之前的結果值。這個時候,需要将 Kafka 消息記錄的 key 當成主鍵處理,用來确定一條資料是應該作為插入、删除還是更新記錄來處理。在Flink1.11中,可以通過 flink-cdc-connectors 項目提供的 changelog-json format來實作該功能。
在Flink1.12版本中, 新增了一個 upsert connector(upsert-kafka),該 connector 擴充自現有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作為 source 使用,也可以作為 sink 使用,并且提供了與現有的 kafka connector 相同的基本功能和持久性保證,因為兩者之間複用了大部分代碼。本文将以Flink1.12為例,介紹該功能的基本使用步驟,以下是全文,希望對你有所幫助。
Upsert Kafka Connector允許使用者以upsert的方式從Kafka主題讀取資料或将資料寫入Kafka主題。
當作為資料源時,upsert-kafka Connector會生産一個changelog流,其中每條資料記錄都表示一個更新或删除事件。更準确地說,如果不存在對應的key,則視為INSERT操作。如果已經存在了相對應的key,則該key對應的value值為最後一次更新的值。
用表來類比,changelog 流中的資料記錄被解釋為 UPSERT,也稱為 INSERT/UPDATE,因為任何具有相同 key 的現有行都被覆寫。另外,value 為空的消息将會被視作為 DELETE 消息。
當作為資料彙時,upsert-kafka Connector會消費一個changelog流。它将INSERT / UPDATE_AFTER資料作為正常的Kafka消息值寫入(即INSERT和UPDATE操作,都會進行正常寫入,如果是更新,則同一個key會存儲多條資料,但在讀取該表資料時,隻保留最後一次更新的值),并将 DELETE 資料以 value 為空的 Kafka 消息寫入(key被打上墓碑标記,表示對應 key 的消息被删除)。Flink 将根據主鍵列的值對資料進行分區,進而保證主鍵上的消息有序,是以同一主鍵上的更新/删除消息将落在同一分區中
為了使用Upsert Kafka連接配接器,需要添加下面的依賴
如果使用SQL Client,需要下載下傳flink-sql-connector-kafka_2.11-1.12.0.jar,并将其放置在Flink安裝目錄的lib檔案夾下。
尖叫提示: 要使用 upsert-kafka connector,必須在建立表時使用 PRIMARY KEY定義主鍵,并為鍵(key.format)和值(value.format)指定序列化反序列化格式。
connector
必選。指定要使用的連接配接器,Upsert Kafka 連接配接器使用:<code>'upsert-kafka'</code>。
topic
必選。用于讀取和寫入的 Kafka topic 名稱。
properties.bootstrap.servers
必選。以逗号分隔的 Kafka brokers 清單。
key.format
必選。用于對 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 文法指定。支援的格式包括 <code>'csv'</code>、<code>'json'</code>、<code>'avro'</code>。
value.format
必選。用于對 Kafka 消息中 value 部分序列化和反序列化的格式。支援的格式包括 <code>'csv'</code>、<code>'json'</code>、<code>'avro'</code>。
properties.*
可選。該選項可以傳遞任意的 Kafka 參數。選項的字尾名必須比對定義在 Kafka 參數文檔中的參數名。Flink 會自動移除 選項名中的 "properties." 字首,并将轉換後的鍵名以及值傳入 KafkaClient。例如,你可以通過 <code>'properties.allow.auto.create.topics' = 'false'</code> 來禁止自動建立 topic。但是,某些選項,例如<code>'key.deserializer'</code> 和 <code>'value.deserializer'</code> 是不允許通過該方式傳遞參數,因為 Flink 會重寫這些參數的值。
value.fields-include
可選,預設為ALL。控制key字段是否出現在 value 中。當取ALL時,表示<code>消息的 value 部分将包含 schema 中所有的字段,包括定義為主鍵的字段。</code>當取EXCEPT_KEY時,表示記錄的 value 部分包含 schema 的所有字段,定義為主鍵的字段除外。
key.fields-prefix
可選。為了避免與value字段命名沖突,為key字段添加一個自定義字首。預設字首為空。一旦指定了key字段的字首,必須在DDL中指明字首的名稱,但是在建構key的序列化資料類型時,将移除該字首。見下面的示例。在需要注意的是:使用該配置屬性,value.fields-include的值必須為EXCEPT_KEY。
如果指定了key字段字首,但在DDL中并沒有添加該字首字元串,那麼在向該表寫入數時,會抛出下面異常: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: All fields in 'key.fields' must be prefixed with 'qwe' when option 'key.fields-prefix' is set but field 'do_date' is not prefixed.
sink.parallelism
可選。定義 upsert-kafka sink 算子的并行度。預設情況下,由架構确定并行度,與上遊連結算子的并行度保持一緻。
關于Key、value的序列化可以參考Kafka connector。值得注意的是,必須指定Key和Value的序列化格式,其中Key是通過PRIMARY KEY指定的。
Upsert Kafka 工作在 upsert 模式(FLIP-149)下。當我們建立表時,需要在 DDL 中定義主鍵。具有相同key的資料,會存在相同的分區中。在 changlog source 上定義主鍵意味着在物化後的 changelog 上主鍵具有唯一性。定義的主鍵将決定哪些字段出現在 Kafka 消息的 key 中。
預設情況下,如果啟用 checkpoint,Upsert Kafka sink 會保證至少一次将資料插入 Kafka topic。
這意味着,Flink 可以将具有相同 key 的重複記錄寫入 Kafka topic。但由于該連接配接器以 upsert 的模式工作,該連接配接器作為 source 讀入時,可以確定具有相同主鍵值下僅最後一條消息會生效。是以,upsert-kafka 連接配接器可以像 HBase sink 一樣實作幂等寫入。
Flink 支援根據 Upsert Kafka 的 每個分區的資料特性發送相應的 watermark。當使用這個特性的時候,watermark 是在 Kafka consumer 内部生成的。合并每個分區生成的 watermark 的方式和 streaming shuffle 的方式是一緻的(單個分區的輸入取最大值,多個分區的輸入取最小值)。資料源産生的 watermark 是取決于該 consumer 負責的所有分區中目前最小的 watermark。如果該 consumer 負責的部分分區是空閑的,那麼整體的 watermark 并不會前進。在這種情況下,可以通過設定合适的 table.exec.source.idle-timeout 來緩解這個問題。
Upsert Kafka 用位元組bytes存儲消息的 key 和 value,是以沒有 schema 或資料類型。消息按格式進行序列化和反序列化,例如:csv、json、avro。不同的序列化格式所提供的資料類型有所不同,是以需要根據使用的序列化格式進行确定表字段的資料類型是否與該序列化類型提供的資料類型相容。
本文以實時地統計網頁PV和UV的總量為例,介紹upsert-kafka基本使用方式:
Kafka 資料源
使用者的ippv資訊,一個使用者在一天内可以有很多次pv
Kafka Sink表
統計每分鐘的PV、UV,并将結果存儲在Kafka中
計算邏輯
生産使用者通路資料到kafka,向kafka中的user_ippv插入資料:
查詢結果表:

可以看出:每分鐘的pv、uv隻顯示一條資料,即代表着截止到目前時間點的pv和uv
檢視Kafka中result_total_pvuv_min主題的資料,如下:
可以看出:針對每一條通路資料,觸發計算了一次PV、UV,每一條資料都是截止到目前時間的累計PV和UV。
預設情況下,如果在啟用了檢查點的情況下執行查詢,Upsert Kafka接收器會将具有至少一次保證的資料提取到Kafka主題中。 這意味着,Flink可能會将具有相同鍵的重複記錄寫入Kafka主題。但是,由于連接配接器在upsert模式下工作,是以作為源讀回時,同一鍵上的最後一條記錄将生效。是以,upsert-kafka連接配接器就像HBase接收器一樣實作幂等寫入。
本文以Flink1.12為例,介紹了upsert-kafka的基本使用,該方式允許使用者以upsert 的方式讀寫Kafka中的表,使用起來非常友善。另外本文也給出了一個具體的使用案例,可以進一步加深對該功能的使用。