天天看點

Flink 源碼 | 自定義 Format 消費 Maxwell CDC 資料

Flink 1.11 最重要的 Feature —— Hive Streaming 之前已經和大家分享過了,今天就和大家來聊一聊另一個特别重要的功能 —— CDC。

CDC概述

何為CDC?Change Data Capture,将資料庫中的’增’、’改’、’删’操作記錄下來。在很早之前是通過觸發器來完成記錄,現在通過 binlog+同步中間件來實作。常用的 binlog 同步中間件有很多,比如 Alibaba 開源的 canal[1],Red Hat 開源的debezium[2],Zendesk 開源的 Maxwell[3] 等等。

這些中間件會負責 binlog 的解析,并同步到消息中間件中,我們隻需要消費對應的 Topic 即可。

回到 Flink 上,CDC 似乎和我們沒有太大的關聯?其實不然,讓我們更加抽象地來看這個世界。

當我們用 Flink 去消費資料比如 Kafka 時,我們就仿佛在讀一張表,什麼表?一張不斷有記錄被插入的表,我們将每一條被插入的資料取出來,完成我們的邏輯。

Flink 源碼 | 自定義 Format 消費 Maxwell CDC 資料

當插入的每條資料都沒有問題時,一切都很美好。關聯、聚合、輸出。

但當我們發現,某條已經被計算過的資料有問題時,麻煩大了。我們直接改最後的輸出值其實是沒有用的,這次改了,當再來資料觸發計算時,結果還是會被錯誤的資料覆寫,因為中間計算結果沒有被修改,它仍然是一個錯誤的值。怎麼辦?撤回流似乎能解決這個問題,這也确實是解決這個問題的手段,但是問題來了,撤回流怎麼确定讀取的資料是要被撤回的?另外,怎麼去觸發一次撤回?

CDC 解決了這些:将消息中間件的資料反序列化後,根據 Type 來識别資料是 Insert 還是 Delete;另外,如果大家看過 Flink 源碼,會發現反序列化後的資料類型變了,從 Row 更新為 RowData,RowData 能夠将資料标記為撤回還是插入,這就意味着每個算子能夠判斷出資料到底是需要下發還是撤回。

CDC 的重要性就先說這麼多,之後有機會的話,出一篇實時 DQC 的視訊,告訴大家 CDC 的出現,對于實時 DQC 的幫助有多大。下面讓我們回到正題。

既然有那麼多 CDC 同步中間件,那麼一定會有各種各樣的格式存放在消息中間件中,我們必然需要去解析它們。于是 Flink 1.11 提供了 canal-json 和 debezium-json,但我們用的是 Maxwell 怎麼辦?隻能等官方出或者說是等有人向社群貢獻嗎?那如果我們用的是自研的同步中間件怎麼辦?

是以就有了今天的分享:如何去自定義實作一個 Maxwell format。大家也可以基于此文的思路去實作其他 CDC format,比如 OGG, 或是自研 CDC 工具産生的資料格式。

如何實作

當我們送出任務之後,Flink 會通過 SPI 機制将 classpath 下注冊的所有工廠類加載進來,包括 DynamicTableFactory、DeserializationFormatFactory 等等。而對于 Format 來說,到底使用哪個 DeserializationFormatFactory,是根據 DDL 語句中的 Format 來決定的。通過将 Format 的值與工廠類的 factoryIdentifier() 方法的傳回值進行比對 來确定。

再通過 DeserializationFormatFactory 中的 createDecodingFormat(...) 方法,将反序列化對象提供給 DynamicTableSource。

通過圖來了解整個過程(僅從反序列化資料并消費的角度來看):

Flink 源碼 | 自定義 Format 消費 Maxwell CDC 資料

想要實作 CDC Format 去解析某種 CDC 工具産生的資料其實很簡單,核心元件其實就三個:

  • 工廠類(DeserializationFormatFactory):負責編譯時根據 ‘format’ = ‘maxwell-json’建立對應的反序列化器。即 MaxwellJsonFormatFactory。
  • 反序列化類(DeserializationSchema):負責運作時的解析,根據固定格式将 CDC 資料轉換成 Flink 系統能認識的 INSERT/DELETE/UPDATE 消息,如 RowData。即 MaxwellJsonDeserializationSchema。
  • Service 注冊檔案:需要添加 Service 檔案 META-INF/services/org.apache.flink.table.factories.Factory ,并在其中增加一行我們實作的 MaxwellJsonFormatFactory 類路徑。

再通過代碼,來看看反序列化中的細節:

public void deserialize(byte[] message, Collectorout) throws IOException {
       try {
           RowData row = jsonDeserializer.deserialize(message);
           String type = row.getString(2).toString(); // "type" field
           if (OP_INSERT.equals(type)) {
               RowData insert = row.getRow(0, fieldCount);
               insert.setRowKind(RowKind.INSERT);
               out.collect(insert);
           } else if (OP_UPDATE.equals(type)) {
               GenericRowData after = (GenericRowData) row.getRow(0, fieldCount); // "data" field
               GenericRowData before = (GenericRowData) row.getRow(1, fieldCount); // "old" field
               for (int f = 0; f < fieldCount; f++) {
                   if (before.isNullAt(f)) {
                       before.setField(f, after.getField(f));
                   }
               }
               before.setRowKind(RowKind.UPDATE_BEFORE);
               after.setRowKind(RowKind.UPDATE_AFTER);
               out.collect(before);
               out.collect(after);
           } else if (OP_DELETE.equals(type)) {
               RowData delete = row.getRow(0, fieldCount);
               delete.setRowKind(RowKind.DELETE);
               out.collect(delete);
           } else {
               if (!ignoreParseErrors) {
                   throw new IOException(format(
                       "Unknown \"type\" value \"%s\". The Maxwell JSON message is '%s'", type, new String(message)));
               }
           }
       } catch (Throwable t) {
           if (!ignoreParseErrors) {
               throw new IOException(format(
                   "Corrupt Maxwell JSON message '%s'.", new String(message)), t);
           }
       }
   }           

其實并不複雜:先通過 jsonDeserializer 将位元組數組根據 [data: ROW, old: ROW, type: String] 的 schema 反序列化成 RowData,然後根據 “type” 列的值來判斷資料是什麼類型:增、改、删;再根據資料類型取出 “data” 或者 “old” 區的資料,來組裝成 Flink 認識的 INSERT/DELETE/UPDATE 資料并下發。

對象 jsonDeserializer 即 JSON 格式的反序列化器,它可以通過指定的 RowType 類型,讀取 JSON 的位元組數組中指定的字段并反序列化成 RowData。在我們的場景中,我們需要去讀取如下 Maxwell 資料的 “data”, “old” 和 “type” 部分的資料。

{"database":"test","table":"product","type":"update","ts":1596684928,"xid":7291,"commit":true,"data":{"id":102,"name":"car battery","description":"12V car battery","weight":5.17},"old":{"weight":8.1}}           

是以 MaxwellJsonDeserializationSchema 中定義的 JSON 的 RowType 如下所示。

private RowType createJsonRowType(DataType databaseSchema) {
       // Maxwell JSON contains other information, e.g. "database", "ts"
       // but we don't need them
       return (RowType) DataTypes.ROW(
           DataTypes.FIELD("data", databaseSchema),
           DataTypes.FIELD("old", databaseSchema),
           DataTypes.FIELD("type", DataTypes.STRING())).getLogicalType();
   }           

databaseSchema 是使用者通過 DDL 定義的 schema 資訊,也對應着資料庫中表的 schema。結合上面的 JSON 和代碼,我們能夠得知 jsonDeserializer 隻會取走 byte[] 中 data、old、type 這三個字段對應的值,其中 data 和old 還是個嵌套JSON,它們的 schema 資訊和 databaseSchema 一緻。由于 Maxwell 在同步資料時,“old”區不包含未被更新的字段,是以 jsonDeserializer 傳回後,我們會通過 “data” 區的 RowData 将 old 區的缺失字段補齊。

得到 RowData 之後,會取出 type 字段,然後根據對應的值,會有三種分支:

  • insert:取出 data 中的值,也就是我們通過DDL定義的字段對應的值,再将其标記為 RowKind.INSERT 類型資料,最後下發。
  • update:分别取出 data 和 old 的值,然後循環 old 中每個字段,字段值如果為空說明是未修改的字段,那就用 data 中對應位置字段的值替代;之後将 old 标記為 RowKind.UPDATE_BEFORE 也就意味着 Flink 引擎需要将之前對應的值撤回,data 标記為 RowKind.UPDATE_AFTER 正常下發。
  • delete:取出 data 中的值,标記為 RowKind.DELETE,代表需要撤回。

處理的過程中,如果抛出異常,會根據 DDL 中maxwell-json.ignore-parse-errors的值來确定是忽視這條資料繼續處理下一條資料,還是讓任務報錯。

筆者在 maxwell-json 反序列化功能的基礎之上,還實作了序列化的功能,即能将 Flink 産生的 changelog 以 Maxwell 的 JSON 格式輸出到外部系統中。其實作思路與反序列化器的思路正好相反,更多細節可以參考 Pull Request 中的實作。

PR 實作詳情連結: https://github.com/apache/flink/pull/13090

功能示範

給大家示範一下從 Kafka 中讀取 Maxwell 推送來的 maxwell json 格式資料,并将聚合後的資料再次寫入 Kafka 後,重新讀出來驗證資料是否正确。

Kafka 資料源表

CREATE TABLE topic_products (
 -- schema is totally the same to the MySQL "products" table
 id BIGINT,
 name STRING,
 description STRING,
 weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'maxwell',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json');           

Kafka 資料結果表&資料源表

CREATE TABLE topic_sink (
 name STRING,
 sum_weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'maxwell-sink',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'maxwell-json'
);           

MySQL 表

-- 注意,這部分 SQL 在 MySQL 中執行,不是 Flink 中的表
CREATE TABLE product (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255),
description VARCHAR(512),
weight FLOAT
);
truncate product ;
ALTER TABLE product AUTO_INCREMENT = 101;
INSERT INTO product
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
      (default,"car battery","12V car battery",8.1),
      (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
      (default,"hammer","12oz carpenter's hammer",0.75),
      (default,"hammer","14oz carpenter's hammer",0.875),
      (default,"hammer","16oz carpenter's hammer",1.0),
      (default,"rocks","box of assorted rocks",5.3),
      (default,"jacket","water resistent black wind breaker",0.1),
      (default,"spare tire","24 inch spare tire",22.2);
UPDATE product SET description='18oz carpenter hammer' WHERE id=106;
UPDATE product SET weight='5.1' WHERE id=107;
INSERT INTO product VALUES (default,"jacket","water resistent white wind breaker",0.2);
INSERT INTO product VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
UPDATE product SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;
UPDATE product SET weight='5.17' WHERE id=111;
DELETE FROM product WHERE id=111;
UPDATE product SET weight='5.17' WHERE id=102 or id = 101;
DELETE FROM product WHERE id=102 or id = 103;           

先看看能不能正常讀取 Kafka 中的 maxwell json 資料。

select * from topic_products;           
Flink 源碼 | 自定義 Format 消費 Maxwell CDC 資料

可以看到,所有字段值都變成了 Update 之後的值,同時,被 Delete 的資料也沒有出現。

接着讓我們再将聚合資料寫入 Kafka。

insert into topic_sink select name,sum(weight) as sum_weight from topic_products group by name;           

在 Flink 叢集的 Web 頁面也能夠看到任務正确送出,接下來再讓我們把聚合資料查出來。

select * from topic_sink           
Flink 源碼 | 自定義 Format 消費 Maxwell CDC 資料

最後,讓我們查詢一下 MySQL 中的表,來驗證資料是否一緻;因為在 Flink 中,我們将 weight 字段定義成 Decimal(10,2),是以我們在查詢 MySQL 的時候,需要将 weight 字段進行類型轉換。

Flink 源碼 | 自定義 Format 消費 Maxwell CDC 資料

沒有問題,我們的 maxwell json 解析很成功。

寫在最後

根據筆者實作 maxwell-json format 的經驗,Flink 對于接口的定義、對于子產品職責的劃分還是很清晰的,是以實作一個自定義 CDC format 非常簡單(核心代碼隻有200多行)。是以,如果你是用的 OGG,或是自研的同步中間件,可以通過本文的思路快速實作一個 CDC format,一起解放你的 CDC 資料!

參考連結:

[1]

https://github.com/alibaba/canal

[2]

https://debezium.io/

[3]

https://maxwells-daemon.io/

更多 Flink 技術交流可加 Flink 社群釘釘交流群~

Flink 源碼 | 自定義 Format 消費 Maxwell CDC 資料