天天看點

Flink JDBC Connector:Flink 與資料庫內建最佳實踐

簡介: Flink 1.11 引入了 CDC,在此基礎上, JDBC Connector 也發生比較大的變化,本文由 Apache Flink Contributor,阿裡巴巴進階開發工程師徐榜江(雪盡)分享,主要介紹 Flink 1.11 JDBC Connector 的最佳實踐。

作者:徐榜江(雪盡)

整理:陳政羽(Flink 社群志願者)

摘要:Flink 1.11 引入了 CDC,在此基礎上, JDBC Connector 也發生比較大的變化,本文由 Apache Flink Contributor,阿裡巴巴進階開發工程師徐榜江(雪盡)分享,主要介紹 Flink 1.11 JDBC Connector 的最佳實踐。大綱如下:
  1. JDBC connector
  2. JDBC Catalog
  3. JDBC Dialect
  4. Demo
Tips:點選下方連結可檢視作者原版 PPT 及分享視訊: https://flink-learning.org.cn/developers/flink-training-course3/

JDBC-Connector 的重構

JDBC Connector 在 Flink 1.11 版本發生了比較大的變化,我們先從以下幾個 Feature 來具體了解一下 Flink 社群在這個版本上對 JDBC 所做的改進。

這個 issue 主要為 DataStream API 新增了 JdbcSink,對于使用 DataStream 程式設計的使用者會更加友善地把資料寫入到 JDBC;并且規範了一些命名規則,以前命名使用的是 JDBC 加上連接配接器名稱,目前命名規範為 Jdbc+ 連接配接器名稱

這個 issue 将 flink-jdbc 包名重命名為 flink-connector-jdbc,與 Flink 的其他 connector 統一,将所有接口和類從 org.apache.flink.java.io.jdbc(舊包)規範為新包路徑 org.apache.flink.connector.jdbc(新包),通過這種重命名使用者在對底層源代碼的閱讀上面會更加容易的了解和統一。

由于早期資料類型系統并不是很完善,導緻了比較多的 Connector 在使用上會經常報資料類型相關的異常,例如 DECIMAL 精度類型,在以往的 Flink 1.10 版本中有可能出現下圖問題:

Flink JDBC Connector:Flink 與資料庫內建最佳實踐

基于 FLIP-95 新的 TableSource 和 TableSink 在精度支援方面做了重構,目前資料精度方面的支援已經很完善了。

在 Flink 1.11 版本中,我們對 DDL 的 WITH 參數相對于 1.10 版本做了簡化,從使用者視角看上就是簡化和規範了參數,如表格所示:

Old Key (Flink 1.10) New Key (Flink1.11)
connector.type
connector.url url
connector.table table-name
connector.driver driver
connector.username username
connector.password password
connector.read.partition.column scan.partition.column
connector.read.partition.num scan.partition.num
connector.read.partition.lower-bound scan.partition.lower-bound
connector.read.partition.upper-bound scan.partition.upper-bound
connector.read.fetch-size scan.fetch-size
connector.lookup.cache.max-rows lookup.cache.max-rows
connector.lookup.cache.ttl lookup.cache.ttl
connector.lookup.max-retries lookup.max-retries
connector.write.flush.max-rows sink.buffer-flush.max-rows
connector.write.flush.interval sink.buffer-flush.interval
connector.write.max-retries sink.max-retries

大家可以看到表格中有 3 個标紅的地方,這個是相對于 1.10 有發生變化比較多的地方。這次 FLIP 希望進一步簡化連接配接器屬性,以便使屬性更加簡潔和可讀,并更好地與 FLIP-107 協作。如果需要了解更多的 Connector 參數可以進一步參考官方文檔和 FLIP-122 中提到的改變,這樣有助于從舊版本遷移到新版本并了解參數的變化。

Flink 1.10 存在某些 Query 無法推斷出主鍵導緻無法進行 Upsert 更新操作(如下圖所示錯誤)。是以在 FLIP-87 中為 Flink SQL 引入的 Primary Key 限制。Flink 的主鍵限制遵循 SQL 标準,主鍵限制分為 PRIMARY KEY NOT ENFORCED 和 PRIMARY KEY ENFORCED, ENFORCED 表示是否對資料進行校驗。我們常見資料庫的主鍵限制屬于 PRIMARY KEY ENFORCED,會對資料進行校驗。因為 Flink 并不持有資料,是以 Flink 支援的主鍵模式是 PRIMARY KEY NOT ENFORCED, 這意味着 Flink 不會校驗資料,而是由使用者確定主鍵的完整性。例如 HBase 裡面對應的主鍵應該是 RowKey,在 MySQL 中對應的主鍵是在使用者資料庫的表中對應的主鍵。

Flink JDBC Connector:Flink 與資料庫內建最佳實踐

目前 Flink 支援 Catalog 主要有 JDBC Catalog 和 Hive Catalog 。在關系資料庫中的表,如果要在 Flink 中使用,使用者需要手動寫表的 DDL,一旦表的 Schema 發生改變,使用者需要手動修改, 這是比較繁瑣的事情。JDBC Catalog 提供了接口用于連接配接到各種關系型資料庫,使得 Flink 能夠自動檢索表,不用使用者手動輸入和修改。目前 JDBC Catalog 内置目前實作了 Postgres Catalog。Postgres catalog 是一個 read-only (隻讀)的 Catalog,隻支援讀取 Postgres 表,支援的功能比較有限。下面代碼展示了目前 Postgres catalog 支援的 6 個功能:資料庫是否存在、資料庫清單、擷取資料庫、根據資料庫名擷取表清單、獲得表、表是否存在。

// The supported methods by Postgres Catalog.
PostgresCatalog.databaseExists(String databaseName)
PostgresCatalog.listDatabases()
PostgresCatalog.getDatabase(String databaseName)
PostgresCatalog.listTables(String databaseName)
PostgresCatalog.getTable(ObjectPath tablePath)
PostgresCatalog.tableExists(ObjectPath tablePath)      

如果需要支援其他 DB (如 MySQL),需要使用者根據 FLIP-93 的 JdbcCatalog 接口實作對應不同的 JDBC Catalog。

什麼是 Dialect?

Dialect (方言)對各個資料庫來說,Dialect 展現各個資料庫的特性,比如文法、資料類型等。如果需要檢視詳細的差異,可以點選這裡[6]檢視詳細差異。下面通過對比 MySQL 和 Postgres 的一些常見場景舉例:

Dialect MySQL Postgres 場景描述
Grammar(文法) LIMIT 0,30 WITH LIMIT 30 OFFSET 0 分頁
Data Type (資料類型) BINARY BYTEA,ARRAY 字段類型
Command (指令) show tables dt 檢視所有表

在資料類型上面,Flink SQL 的資料類型目前映射規則如下:

MySQL type PostgreSQL type Flink SQL type
TINYINT

SMALLINT

TINYINT UNSIGNED

INT2

SMALLSERIAL

SERIAL2

INT

MEDIUMINT

UNSIGNED

INTEGER

SERIAL

BIGINT BIGSERIAL
DECIMAL(20, 0)

Flink 目前支援三種 Dialect: Derby、MySQL、PostgreSQL,Derby 主要用于測試,更多的類型映射可以點選下方連結前往官方文檔檢視。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping

如何保證 Dialect Upsert 的幂等性?

如果定義了主鍵,JDBC 寫入時是能夠保證 Upsert 語義的, 如果 DB 不支援 Upsert 文法,則會退化成 DELETE + INSERT 語義。Upsert query 是原子執行的,可以保證幂等性。這個在官方文檔中也較長的描述了更新失敗或者存在故障時候如何做出的處理,下面的表格是不同的 DB 對應不同的 Upsert 文法:

Database Upsert Grammar
INSERT .. ON DUPLICATE KEY UPDATE ..
PostgreSQL INSERT .. ON CONFLICT .. DO UPDATE SET ..

如何自定義 Dialect?

目前如果要實作自定義 Dialect (比如 SQL Server、Oracle 等), 需要使用者自己實作對應 Dialect 修改源碼并重新打包 flink-connector-jdbc。社群正在讨論提供一種插件化 dialect 的機制, 讓使用者可以不用修改源碼打包就能實作自定義 Dialect,這個機制需要把 Dialect 接口暴露給使用者。目前的 Dialect 接口不夠清晰,沒有考慮 DataStream API 的使用場景,也沒有考慮到 一些複雜的 SQL 場景,是以這個接口目前不太穩定(後續版本會修改) 。

社群目前之是以沒有把這個 API 開放給使用者,是從使用者使用的體驗角度考慮,希望把這種頂級 API 設計得盡量穩定、簡潔後再開放出來給使用者使用,避免使用者在後續 Flink 版本的疊代中多次修改代碼。目前社群已經有相應的計劃去做了,大家可以留意 FLINK-16833[7] 提出的 JDBCDialect 插件化設計。

實踐 Demo

大家看完上述 Flink 1.11 在 JDBC 所做的改動後,大家可以嘗試下面這個關于商品表 CDC 同步和 ETL 的小案例,有助于了解 JDBC Catalog 和 CDC 的同步機制。

環境與版本:Flink 1.11.1、Docker、Kafka 1.11.1、MySQL Driver 5.1.48、PostgreSQL Driver 42.2.14

流程如下:

  1. Flink standalone 環境準備并在提供的位址下載下傳好對應的安裝包和 connector jar。
  2. 測試資料準備,通過拉起容器運作已經打包好的鏡像。其中 Kafka 中的 changelog 資料是通過 debezium connector 抓取的 MySQL orders表 的 binlog。
  3. 通過 SQL Client 編寫 SQL 作業,分别建立 Flink 訂單表,維表,使用者表,産品表,并建立 Function UDF。從 PG Catalog 擷取結果表資訊之後,把作業送出至叢集執行運作。
  4. 測試 CDC 資料同步和維表 join,通過新增訂單、修改訂單、删除訂單、維表資料更新等一系列操作驗證 CDC 在 Flink 上如何運作以及寫入結果表。
Flink JDBC Connector:Flink 與資料庫內建最佳實踐

上圖為業務流程整體圖,項目 Demo 位址:

https://github.com/leonardBang/flink-sql-etl

問答環節

**1.Flink SQL Client 上面執行的 use default,是使用哪個 catlog 呢?

**

答:Flink 内部有一個内置 Catlog,它把 meta 資料存于記憶體中。在 SQL Client 上沒有顯式指定 Hive catlog 或者 jdbc catlog 時會使用内置的 Catalog,剛剛的案例給大家示範的是 Postgres Catalog,裡面有結果表。在内置 Catlog 可以看到我們剛剛建立 Kafka 的表,MySQL 的次元表。

2.Flink MySQL DDL 連接配接 8 小時後就會自動斷開的問題是否已經解決?

這個問題會在 1.12 版本解決此問題,目前 master 分支已經合并,具體可以參考以下位址,描述了相關問題的讨論和解決辦法。

https://issues.apache.org/jira/browse/FLINK-16681

3.什麼是 CDC?能大概講下目前 Flink 支援的 CDC 嗎?

通過 Change Data Capture 機制(CDC)來将外部系統的動态資料(如 Mysql BinLog、Kafka Compacted Topic)導入 Flink,以及将 Flink 的 Update/Retract 流寫出到外部系統中是使用者一直希望的功能。

Flink 1.11 實作了對 CDC 資料讀取和寫出的支援。目前 Flink 可以支援 Debezium(Demo 中所用的工具) 和 Canal(阿裡巴巴開源同步工具) 兩種 CDC 格式。Debezium 在國外用得比較多,Canal 在國内用得比較多,兩者格式會有所差別,詳細可以參考官方使用文檔。

總結

本文從 JDBC Connector 的重構、資料精度、主鍵限制、命名規範等方面詳細介紹,分享了社群目前實作的 Postgres Catalog 功能點;介紹了 Flink 如何實作 JDBC Dialect 的統一以及目前社群對 Dialect 做的一些計劃;最後的實踐 Demo 環節示範了通過 SQL Client 進行維表 JOIN 和 ETL 操作以及解答了大家在實際生産中所遇到的問題,希望對大家進一步了解 Flink CDC 新功能有所幫助。

參考連結:

[1]

https://issues.apache.org/jira/browse/FLINK-15782

[2]

https://issues.apache.org/jira/browse/FLINK-17537

[3]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

[4]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

[5]

https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API

[6]

https://www.postgresqltutorial.com/postgresql-vs-mysql/

[7]

https://issues.apache.org/jira/browse/FLINK-16833