天天看點

基于 Flink SQL CDC 的實時資料同步方案

傳統的資料同步方案與 Flink SQL CDC 解決方案

業務系統經常會遇到需要更新資料到多個存儲的需求。例如:一個訂單系統剛剛開始隻需要寫入資料庫即可完成業務使用。某天 BI 團隊期望對資料庫做全文索引,于是我們同時要寫多一份資料到 ES 中,改造後一段時間,又有需求需要寫入到 Redis 緩存中。

基于 Flink SQL CDC 的實時資料同步方案

很明顯這種模式是不可持續發展的,這種雙寫到各個資料存儲系統中可能導緻不可維護和擴充,資料一緻性問題等,需要引入分布式事務,成本和複雜度也随之增加。我們可以通過 CDC(Change Data Capture)工具進行解除耦合,同步到下遊需要同步的存儲系統。通過這種方式提高系統的穩健性,也友善後續的維護。

基于 Flink SQL CDC 的實時資料同步方案

Flink SQL CDC 資料同步與原了解析

CDC 全稱是 Change Data Capture ,它是一個比較廣義的概念,隻要能捕獲變更的資料,我們都可以稱為 CDC 。業界主要有基于查詢的 CDC 和基于日志的 CDC ,可以從下面表格對比他們功能和差異點。

基于 Flink SQL CDC 的實時資料同步方案

經過以上對比,我們可以發現基于日志 CDC 有以下這幾種優勢:

· 能夠捕獲所有資料的變化,捕獲完整的變更記錄。在異地容災,資料備份等場景中得到廣泛應用,如果是基于查詢的 CDC 有可能導緻兩次查詢的中間一部分資料丢失

· 每次 DML 操作均有記錄無需像查詢 CDC 這樣發起全表掃描進行過濾,擁有更高的效率和性能,具有低延遲,不增加資料庫負載的優勢

· 無需入侵業務,業務解耦,無需更改業務模型

· 捕獲删除事件和捕獲舊記錄的狀态,在查詢 CDC 中,周期的查詢無法感覺中間資料是否删除

基于 Flink SQL CDC 的實時資料同步方案

基于日志的 CDC 方案介紹

從 ETL 的角度進行分析,一般采集的都是業務庫資料,這裡使用 MySQL 作為需要采集的資料庫,通過 Debezium 把 MySQL Binlog 進行采集後發送至 Kafka 消息隊列,然後對接一些實時計算引擎或者 APP 進行消費後把資料傳輸入 OLAP 系統或者其他存儲媒體。

Flink 希望打通更多資料源,發揮完整的計算能力。我們生産中主要來源于業務日志和資料庫日志,Flink 在業務日志的支援上已經非常完善,但是在資料庫日志支援方面在 Flink 1.11 前還屬于一片空白,這就是為什麼要內建 CDC 的原因之一。

Flink SQL 内部支援了完整的 changelog 機制,是以 Flink 對接 CDC 資料隻需要把CDC 資料轉換成 Flink 認識的資料,是以在 Flink 1.11 裡面重構了 TableSource 接口,以便更好支援和內建 CDC。

基于 Flink SQL CDC 的實時資料同步方案
基于 Flink SQL CDC 的實時資料同步方案

重構後的 TableSource 輸出的都是 RowData 資料結構,代表了一行的資料。在RowData 上面會有一個中繼資料的資訊,我們稱為 RowKind 。RowKind 裡面包括了插入、更新前、更新後、删除,這樣和資料庫裡面的 binlog 概念十分類似。通過 Debezium 采集的 JSON 格式,包含了舊資料和新資料行以及原資料資訊,op 的 u表示是 update 更新操作辨別符,ts_ms 表示同步的時間戳。是以,對接 Debezium JSON 的資料,其實就是将這種原始的 JSON 資料轉換成 Flink 認識的 RowData。

選擇 Flink 作為 ETL 工具

當選擇 Flink 作為 ETL 工具時,在資料同步場景,如下圖同步結構:

基于 Flink SQL CDC 的實時資料同步方案

通過 Debezium 訂閱業務庫 MySQL 的 Binlog 傳輸至 Kafka ,Flink 通過建立 Kafka 表指定 format 格式為 debezium-json ,然後通過 Flink 進行計算後或者直接插入到其他外部資料存儲系統,例如圖中的 Elasticsearch 和 PostgreSQL。

基于 Flink SQL CDC 的實時資料同步方案

但是這個架構有個缺點,我們可以看到采集端元件過多導緻維護繁雜,這時候就會想是否可以用 Flink SQL 直接對接 MySQL 的 binlog 資料呢,有沒可以替代的方案呢?

答案是有的!經過改進後結構如下圖:

基于 Flink SQL CDC 的實時資料同步方案

社群開發了 flink-cdc-connectors 元件,這是一個可以直接從 MySQL、PostgreSQL 等資料庫直接讀取全量資料和增量變更資料的 source 元件。目前也已開源,開源位址:

https://github.com/ververica/flink-cdc-connectors

flink-cdc-connectors 可以用來替換 Debezium+Kafka 的資料采集子產品,進而實作 Flink SQL 采集+計算+傳輸(ETL)一體化,這樣做的優點有以下:

· 開箱即用,簡單易上手

· 減少維護的元件,簡化實時鍊路,減輕部署成本

· 減小端到端延遲

· Flink 自身支援 Exactly Once 的讀取和計算

· 資料不落地,減少存儲成本

· 支援全量和增量流式讀取

· binlog 采集位點可回溯*

基于 Flink SQL CDC 的資料同步方案實踐

下面給大家帶來 3 個關于 Flink SQL + CDC 在實際場景中使用較多的案例。在完成實驗時候,你需要 Docker、MySQL、Elasticsearch 等元件,具體請參考每個案例參考文檔。

案例 1 : Flink SQL CDC + JDBC Connector

這個案例通過訂閱我們訂單表(事實表)資料,通過 Debezium 将 MySQL Binlog 發送至 Kafka,通過維表 Join 和 ETL 操作把結果輸出至下遊的 PG 資料庫。具體可以參考 Flink 公衆号文章:《Flink JDBC Connector:Flink 與資料庫內建最佳實踐》案例進行實踐操作。

https://www.bilibili.com/video/BV1bp4y1q78d

基于 Flink SQL CDC 的實時資料同步方案

案例 2 : CDC Streaming ETL

模拟電商公司的訂單表和物流表,需要對訂單資料進行統計分析,對于不同的資訊需要進行關聯後續形成訂單的大寬表後,交給下遊的業務方使用 ES 做資料分析,這個案例示範了如何隻依賴 Flink 不依賴其他元件,借助 Flink 強大的計算能力實時把 Binlog 的資料流關聯一次并同步至 ES 。

基于 Flink SQL CDC 的實時資料同步方案

例如如下的這段 Flink SQL 代碼就能完成實時同步 MySQL 中 orders 表的全量+增量資料的目的。

CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

SELECT * FROM orders      

為了讓讀者更好地上手和了解,我們還提供了 docker-compose 的測試環境,更詳細的案例教程請參考下文的視訊連結和文檔連結。

視訊連結:

https://www.bilibili.com/video/BV1zt4y1D7kt

文檔教程:

https://github.com/ververica/flink-cdc-connectors/wiki/中文教程

案例 3 : Streaming Changes to Kafka

下面案例就是對 GMV 進行天級别的全站統計。包含插入/更新/删除,隻有付款的訂單才能計算進入 GMV ,觀察 GMV 值的變化。

基于 Flink SQL CDC 的實時資料同步方案

Flink SQL CDC 的更多應用場景

Flink SQL CDC 不僅可以靈活地應用于實時資料同步場景中,還可以打通更多的場景提供給使用者選擇。

Flink 在資料同步場景中的靈活定位

· 如果你已經有 Debezium/Canal + Kafka 的采集層 (E),可以使用 Flink 作為計算層 (T) 和傳輸層 (L)

· 也可以用 Flink 替代 Debezium/Canal ,由 Flink 直接同步變更資料到 Kafka,Flink 統一 ETL 流程

· 如果不需要 Kafka 資料緩存,可以由 Flink 直接同步變更資料到目的地,Flink 統一 ETL 流程

Flink SQL CDC : 打通更多場景

· 實時資料同步,資料備份,資料遷移,數倉建構

優勢:豐富的上下遊(E & L),強大的計算(T),易用的 API(SQL),流式計算低延遲

· 資料庫之上的實時物化視圖、流式資料分析

· 索引建構和實時維護

· 業務 cache 重新整理

· 審計跟蹤

· 微服務的解耦,讀寫分離

· 基于 CDC 的維表關聯

下面介紹一下為何用 CDC 的維表關聯會比基于查詢的維表查詢快。

■ 基于查詢的維表關聯

基于 Flink SQL CDC 的實時資料同步方案

目前維表查詢的方式主要是通過 Join 的方式,資料從消息隊列進來後通過向資料庫發起 IO 的請求,由資料庫把結果傳回後合并再輸出到下遊,但是這個過程無可避免的産生了 IO 和網絡通信的消耗,導緻吞吐量無法進一步提升,就算使用一些緩存機制,但是因為緩存更新不及時可能會導緻精确性也沒那麼高。

■ 基于 CDC 的維表關聯

基于 Flink SQL CDC 的實時資料同步方案

我們可以通過 CDC 把維表的資料導入到維表 Join 的狀态裡面,在這個 State 裡面因為它是一個分布式的 State ,裡面儲存了 Database 裡面實時的資料庫維表鏡像,當消息隊列資料過來時候無需再次查詢遠端的資料庫了,直接查詢本地磁盤的 State ,避免了 IO 操作,實作了低延遲、高吞吐,更精準。

Tips:目前此功能在 1.12 版本的規劃中,具體進度請關注 FLIP-132 。

未來規劃

· FLIP-132 :Temporal Table DDL(基于 CDC 的維表關聯)

· Upsert 資料輸出到 Kafka

· 更多的 CDC formats 支援(debezium-avro, OGG, Maxwell)

· 批模式支援處理 CDC 資料

· flink-cdc-connectors 支援更多資料庫

總結