天天看點

Flink CDC 2.0 詳細介紹

簡介: Flink CDC 2.0.0 版本于 8 月 10 日正式釋出,點選了解詳情~

本文由社群志願者-陳政羽(本人)整理,内容來源自阿裡巴巴進階開發工程師徐榜江 (雪盡) 7 月 10 日在北京站 Flink Meetup 分享的《詳解 Flink-CDC》。深入講解了最新釋出的 Flink CDC 2.0.0 版本帶來的核心特性,包括:全量資料的并發讀取、checkpoint、無鎖讀取等重大改進。

GitHub 位址:

https://github.com/ververica/flink-cdc-connectors

一、CDC 概述

CDC 的全稱是 Change Data Capture ,在廣義的概念上,隻要是能捕獲資料變更的技術,我們都可以稱之為 CDC 。目前通常描述的 CDC 技術主要面向資料庫的變更,是一種用于捕獲資料庫中資料變更的技術。CDC 技術的應用場景非常廣泛:

  • 資料同步:用于備份,容災;
  • 資料分發:一個資料源分發給多個下遊系統;
  • 資料采集:面向資料倉庫 / 資料湖的 ETL 資料內建,是非常重要的資料源。

CDC 的技術方案非常多,目前業界主流的實作機制可以分為兩種:

  • 基于查詢的 CDC:
    • 離線排程查詢作業,批處理。把一張表同步到其他系統,每次通過查詢去擷取表中最新的資料;
    • 無法保障資料一緻性,查的過程中有可能資料已經發生了多次變更;
    • 不保障實時性,基于離線排程存在天然的延遲。
  • 基于日志的 CDC:
    • 實時消費日志,流處理,例如 MySQL 的 binlog 日志完整記錄了資料庫中的變更,可以把 binlog 檔案當作流的資料源;
    • 保障資料一緻性,因為 binlog 檔案包含了所有曆史變更明細;
    • 保障實時性,因為類似 binlog 的日志檔案是可以流式消費的,提供的是實時資料。

對比常見的開源 CDC 方案,我們可以發現:

Flink CDC 2.0 詳細介紹
  • 對比增量同步能力,
    • 基于日志的方式,可以很好的做到增量同步;
    • 而基于查詢的方式是很難做到增量同步的。
  • 對比全量同步能力,基于查詢或者日志的 CDC 方案基本都支援,除了 Canal。
  • 而對比全量 + 增量同步的能力,隻有 Flink CDC、Debezium、Oracle Goldengate 支援較好。
  • 從架構角度去看,該表将架構分為單機和分布式,這裡的分布式架構不單純展現在資料讀取能力的水準擴充上,更重要的是在大資料場景下分布式系統接入能力。例如 Flink CDC 的資料入湖或者入倉的時候,下遊通常是分布式的系統,如 Hive、HDFS、Iceberg、Hudi 等,那麼從對接入分布式系統能力上看,Flink CDC 的架構能夠很好地接入此類系統。
  • 在資料轉換 / 資料清洗能力上,當資料進入到 CDC 工具的時候是否能較友善的對資料做一些過濾或者清洗,甚至聚合?
    • 在 Flink CDC 上操作相當簡單,可以通過 Flink SQL 去操作這些資料;
    • 但是像 DataX、Debezium 等則需要通過腳本或者模闆去做,是以使用者的使用門檻會比較高。
  • 另外,在生态方面,這裡指的是下遊的一些資料庫或者資料源的支援。Flink CDC 下遊有豐富的 Connector,例如寫入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常見的一些系統,也支援各種自定義 connector。

二、Flink CDC 項目

講到這裡,先帶大家回顧下開發 Flink CDC 項目的動機。

1. Dynamic Table & ChangeLog Stream

大家都知道 Flink 有兩個基礎概念:Dynamic Table 和 Changelog Stream。

Flink CDC 2.0 詳細介紹
  • Dynamic Table 就是 Flink SQL 定義的動态表,動态表和流的概念是對等的。參照上圖,流可以轉換成動态表,動态表也可以轉換成流。
  • 在 Flink SQL中,資料在從一個算子流向另外一個算子時都是以 Changelog Stream 的形式,任意時刻的 Changelog Stream 可以翻譯為一個表,也可以翻譯為一個流。

聯想下 MySQL 中的表和 binlog 日志,就會發現:MySQL 資料庫的一張表所有的變更都記錄在 binlog 日志中,如果一直對表進行更新,binlog 日志流也一直會追加,資料庫中的表就相當于 binlog 日志流在某個時刻點物化的結果;日志流就是将表的變更資料持續捕獲的結果。這說明 Flink SQL 的 Dynamic Table 是可以非常自然地表示一張不斷變化的 MySQL 資料庫表。

Flink CDC 2.0 詳細介紹

在此基礎上,我們調研了一些 CDC 技術,最終選擇了 Debezium 作為 Flink CDC 的底層采集工具。Debezium 支援全量同步,也支援增量同步,也支援全量 + 增量的同步,非常靈活,同時基于日志的 CDC 技術使得提供 Exactly-Once 成為可能。

将 Flink SQL 的内部資料結構 RowData 和 Debezium 的資料結構進行對比,可以發現兩者是非常相似的。

  • 每條 RowData 都有一個中繼資料 RowKind,包括 4 種類型, 分别是插入 (INSERT)、更新前鏡像 (UPDATE_BEFORE)、更新後鏡像 (UPDATE_AFTER)、删除 (DELETE),這四種類型和資料庫裡面的 binlog 概念保持一緻。
  • 而 Debezium 的資料結構,也有一個類似的中繼資料 op 字段, op 字段的取值也有四種,分别是 c、u、d、r,各自對應 create、update、delete、read。對于代表更新操作的 u,其資料部分同時包含了前鏡像 (before) 和後鏡像 (after)。

通過分析兩種資料結構,Flink 和 Debezium 兩者的底層資料是可以非常友善地對接起來的,大家可以發現 Flink 做 CDC 從技術上是非常合适的。

2. 傳統 CDC ETL 分析

我們來看下傳統 CDC 的 ETL 分析鍊路,如下圖所示:

Flink CDC 2.0 詳細介紹

傳統的基于 CDC 的 ETL 分析中,資料采集工具是必須的,國外使用者常用 Debezium,國内使用者常用阿裡開源的 Canal,采集工具負責采集資料庫的增量資料,一些采集工具也支援同步全量資料。采集到的資料一般輸出到消息中間件如 Kafka,然後 Flink 計算引擎再去消費這一部分資料寫入到目的端,目的端可以是各種 DB,資料湖,實時數倉和離線數倉。 

注意,Flink 提供了 changelog-json format,可以将 changelog 資料寫入離線數倉如 Hive / HDFS;對于實時數倉,Flink 支援将 changelog 通過 upsert-kafka connector 直接寫入 Kafka。

Flink CDC 2.0 詳細介紹

我們一直在思考是否可以使用 Flink CDC 去替換上圖中虛線框内的采集元件和消息隊列,進而簡化分析鍊路,降低維護成本。同時更少的元件也意味着資料時效性能夠進一步提高。答案是可以的,于是就有了我們基于 Flink CDC 的 ETL 分析流程。

3. 基于 Flink CDC 的 ETL 分析

在使用了 Flink CDC 之後,除了元件更少,維護更友善外,另一個優勢是通過 Flink SQL 極大地降低了使用者使用門檻,可以看下面的例子:

Flink CDC 2.0 詳細介紹
Flink CDC 2.0 詳細介紹

該例子是通過 Flink CDC 去同步資料庫資料并寫入到 TiDB,使用者直接使用 Flink SQL 建立了産品和訂單的 MySQL-CDC 表,然後對資料流進行 JOIN 加工,加工後直接寫入到下遊資料庫。通過一個 Flink SQL 作業就完成了 CDC 的資料分析,加工和同步。

大家會發現這是一個純 SQL 作業,這意味着隻要會 SQL 的 BI,業務線同學都可以完成此類工作。與此同時,使用者也可以利用 Flink SQL 提供的豐富文法進行資料清洗、分析、聚合。

Flink CDC 2.0 詳細介紹

而這些能力,對于現有的 CDC 方案來說,進行資料的清洗,分析和聚合是非常困難的。

此外,利用 Flink SQL 雙流 JOIN、維表 JOIN、UDTF 文法可以非常容易地完成資料打寬,以及各種業務邏輯加工。

Flink CDC 2.0 詳細介紹

4. Flink CDC 項目發展

  • 2020 年 7 月由雲邪送出了第一個 commit,這是基于個人興趣孵化的項目;
  • 2020 年 7 中旬支援了 MySQL-CDC;
  • 2020 年 7 月末支援了 Postgres-CDC;
  • 一年的時間,該項目在 GitHub 上的 star 數已經超過 800。
Flink CDC 2.0 詳細介紹

三、Flink CDC 2.0 詳解

1. Flink CDC 痛點

MySQL CDC 是 Flink CDC 中使用最多也是最重要的 Connector,本文下述章節描述 Flink CDC Connector 均為 MySQL CDC Connector。

随着 Flink CDC 項目的發展,得到了很多使用者在社群的回報,主要歸納為三個:

Flink CDC 2.0 詳細介紹
  • 全量 + 增量讀取的過程需要保證所有資料的一緻性,是以需要通過加鎖保證,但是加鎖在資料庫層面上是一個十分高危的操作。底層 Debezium 在保證資料一緻性時,需要對讀取的庫或表加鎖,全局鎖可能導緻資料庫鎖住,表級鎖會鎖住表的讀,DBA 一般不給鎖權限。
  • 不支援水準擴充,因為 Flink CDC 底層是基于 Debezium,起架構是單節點,是以Flink CDC 隻支援單并發。在全量階段讀取階段,如果表非常大 (億級别),讀取時間在小時甚至天級别,使用者不能通過增加資源去提升作業速度。
  • 全量讀取階段不支援 checkpoint:CDC 讀取分為兩個階段,全量讀取和增量讀取,目前全量讀取階段是不支援 checkpoint 的,是以會存在一個問題:當我們同步全量資料時,假設需要 5 個小時,當我們同步了 4 小時的時候作業失敗,這時候就需要重新開始,再讀取 5 個小時。

2. Debezium 鎖分析

Flink CDC 底層封裝了 Debezium, Debezium 同步一張表分為兩個階段:

  • 全量階段:查詢目前表中所有記錄;
  • 增量階段:從 binlog 消費變更資料。

大部分使用者使用的場景都是全量 + 增量同步,加鎖是發生在全量階段,目的是為了确定全量階段的初始位點,保證增量 + 全量實作一條不多,一條不少,進而保證資料一緻性。從下圖中我們可以分析全局鎖和表鎖的一些加鎖流程,左邊紅色線條是鎖的生命周期,右邊是 MySQL 開啟可重複讀事務的生命周期。

Flink CDC 2.0 詳細介紹

以全局鎖為例,首先是擷取一個鎖,然後再去開啟可重複讀的事務。這裡鎖住操作是讀取 binlog 的起始位置和目前表的 schema。這樣做的目的是保證 binlog 的起始位置和讀取到的目前 schema 是可以對應上的,因為表的 schema 是會改變的,比如如删除列或者增加列。在讀取這兩個資訊後,SnapshotReader 會在可重複讀事務裡讀取全量資料,在全量資料讀取完成後,會啟動 BinlogReader 從讀取的 binlog 起始位置開始增量讀取,進而保證全量資料 + 增量資料的無縫銜接。

表鎖是全局鎖的退化版,因為全局鎖的權限會比較高,是以在某些場景,使用者隻有表鎖。表鎖鎖的時間會更長,因為表鎖有個特征:鎖提前釋放了可重複讀的事務預設會送出,是以鎖需要等到全量資料讀完後才能釋放。

經過上面分析,接下來看看這些鎖到底會造成怎樣嚴重的後果:

Flink CDC 2.0 詳細介紹

Flink CDC 1.x 可以不加鎖,能夠滿足大部分場景,但犧牲了一定的資料準确性。Flink CDC 1.x 預設加全局鎖,雖然能保證資料一緻性,但存在上述 hang 住資料的風險。

3. Flink CDC 2.0 設計 ( 以 MySQL 為例)

通過上面的分析,可以知道 2.0 的設計方案,核心要解決上述的三個問題,即支援無鎖、水準擴充、checkpoint。

Flink CDC 2.0 詳細介紹

DBlog 這篇論文裡描述的無鎖算法如下圖所示:

Flink CDC 2.0 詳細介紹

左邊是 Chunk 的切分算法描述,Chunk 的切分算法其實和很多資料庫的分庫分表原理類似,通過表的主鍵對表中的資料進行分片。假設每個 Chunk 的步長為 10,按照這個規則進行切分,隻需要把這些 Chunk 的區間做成左開右閉或者左閉右開的區間,保證銜接後的區間能夠等于表的主鍵區間即可。

右邊是每個 Chunk 的無鎖讀算法描述,該算法的核心思想是在劃分了 Chunk 後,對于每個 Chunk 的全量讀取和增量讀取,在不用鎖的條件下完成一緻性的合并。Chunk 的切分如下圖所示:

Flink CDC 2.0 詳細介紹

因為每個 chunk 隻負責自己主鍵範圍内的資料,不難推導,隻要能夠保證每個 Chunk 讀取的一緻性,就能保證整張表讀取的一緻性,這便是無鎖算法的基本原理。

Netflix 的 DBLog 論文中 Chunk 讀取算法是通過在 DB 維護一張信号表,再通過信号表在 binlog 檔案中打點,記錄每個 chunk 讀取前的 Low Position (低位點) 和讀取結束之後 High Position (高位點) ,在低位點和高位點之間去查詢該 Chunk 的全量資料。在讀取出這一部分 Chunk 的資料之後,再将這 2 個位點之間的 binlog 增量資料合并到 chunk 所屬的全量資料,進而得到高位點時刻,該 chunk 對應的全量資料。

Flink CDC 結合自身的情況,在 Chunk 讀取算法上做了去信号表的改進,不需要額外維護信号表,通過直接讀取 binlog 位點替代在 binlog 中做标記的功能,整體的 chunk 讀算法描述如下圖所示:

Flink CDC 2.0 詳細介紹

比如正在讀取 Chunk-1,Chunk 的區間是 [K1, K10],首先直接将該區間内的資料 select 出來并把它存在 buffer 中,在 select 之前記錄 binlog 的一個位點 (低位點),select 完成後記錄 binlog 的一個位點 (高位點)。然後開始增量部分,消費從低位點到高位點的 binlog。

  • 圖中的 - ( k2,100 ) + ( k2,108 ) 記錄表示這條資料的值從 100 更新到 108;
  • 第二條記錄是删除 k3;
  • 第三條記錄是更新 k2 為 119;
  • 第四條記錄是 k5 的資料由原來的 77 變更為 100。

觀察圖檔中右下角最終的輸出,會發現在消費該 chunk 的 binlog 時,出現的 key 是k2、k3、k5,我們前往 buffer 将這些 key 做标記。

  • 對于 k1、k4、k6、k7 來說,在高位點讀取完畢之後,這些記錄沒有變化過,是以這些資料是可以直接輸出的;
  • 對于改變過的資料,則需要将增量的資料合并到全量的資料中,隻保留合并後的最終資料。例如,k2 最終的結果是 119 ,那麼隻需要輸出 +(k2,119),而不需要中間發生過改變的資料。

通過這種方式,Chunk 最終的輸出就是在高位點是 chunk 中最新的資料。

上圖描述的是單個 Chunk 的一緻性讀,但是如果有多個表分了很多不同的 Chunk,且這些 Chunk 分發到了不同的 task 中,那麼如何分發 Chunk 并保證全局一緻性讀呢?

這個就是基于 FLIP-27 來優雅地實作的,通過下圖可以看到有 SourceEnumerator 的元件,這個元件主要用于 Chunk 的劃分,劃分好的 Chunk 會提供給下遊的 SourceReader 去讀取,通過把 chunk 分發給不同的 SourceReader 便實作了并發讀取 Snapshot Chunk 的過程,同時基于 FLIP-27 我們能較為友善地做到 chunk 粒度的 checkpoint。

Flink CDC 2.0 詳細介紹

當 Snapshot Chunk 讀取完成之後,需要有一個彙報的流程,如下圖中橘色的彙報資訊,将 Snapshot Chunk 完成資訊彙報給 SourceEnumerator。

Flink CDC 2.0 詳細介紹

彙報的主要目的是為了後續分發 binlog chunk (如下圖)。因為 Flink CDC 支援全量 + 增量同步,是以當所有 Snapshot Chunk 讀取完成之後,還需要消費增量的 binlog,這是通過下發一個 binlog chunk 給任意一個 Source Reader 進行單并發讀取實作的。

Flink CDC 2.0 詳細介紹

對于大部分使用者來講,其實無需過于關注如何無鎖算法和分片的細節,了解整體的流程就好。

整體流程可以概括為,首先通過主鍵對表進行 Snapshot Chunk 劃分,再将 Snapshot Chunk 分發給多個 SourceReader,每個 Snapshot Chunk 讀取時通過算法實作無鎖條件下的一緻性讀,SourceReader 讀取時支援 chunk 粒度的 checkpoint,在所有 Snapshot Chunk 讀取完成後,下發一個 binlog chunk 進行增量部分的 binlog 讀取,這便是 Flink CDC 2.0 的整體流程,如下圖所示:

Flink CDC 2.0 詳細介紹

Flink CDC 是一個完全開源的項目,項目所有設計和源碼目前都已貢獻到開源社群,

Flink CDC 2.0

也已經正式釋出,此次的核心改進和提升包括:

  • 提供 MySQL CDC 2.0,核心feature 包括
    • 并發讀取,全量資料的讀取性能可以水準擴充;
    • 全程無鎖,不對線上業務産生鎖的風險;
    • 斷點續傳,支援全量階段的 checkpoint。
  • 搭建文檔網站,提供多版本文檔支援,文檔支援關鍵詞搜尋

筆者用 TPC-DS 資料集中的 customer 表進行了測試,Flink 版本是 1.13.1,customer 表的資料量是 6500 萬條,Source 并發為 8,全量讀取階段:

  • MySQL CDC 2.0 用時 13 分鐘;
  • MySQL CDC 1.4 用時 89 分鐘;
  • 讀取性能提升 6.8 倍。

為了提供更好的文檔支援,Flink CDC 社群搭建了文檔網站,網站支援對文檔的版本管理:

Flink CDC 2.0 詳細介紹

文檔網站支援關鍵字搜尋功能,非常實用:

Flink CDC 2.0 詳細介紹

四、未來規劃

Flink CDC 2.0 詳細介紹

關于 CDC 項目的未來規劃,我們希望圍繞穩定性,進階 feature 和生态內建三個方面展開。

  • 穩定性
    • 通過社群的方式吸引更多的開發者,公司的開源力量提升 Flink CDC 的成熟度;
    • 支援 Lazy Assigning。Lazy Assigning 的思路是将 chunk 先劃分一批,而不是一次性進行全部劃分。目前 Source Reader 對資料讀取進行分片是一次性全部劃分好所有 chunk,例如有 1 萬個 chunk,可以先劃分 1 千個 chunk,而不是一次性全部劃分,在 SourceReader 讀取完 1 千 chunk 後再繼續劃分,節約劃分 chunk 的時間。
  • 進階 Feature
    • 支援 Schema Evolution。這個場景是:當同步資料庫的過程中,突然在表中添加了一個字段,并且希望後續同步下遊系統的時候能夠自動加入這個字段;
    • 支援 Watermark Pushdown 通過 CDC 的 binlog 擷取到一些心跳資訊,這些心跳的資訊可以作為一個 Watermark,通過這個心跳資訊可以知道到這個流目前消費的一些進度;
    • 支援 META 資料,分庫分表的場景下,有可能需要中繼資料知道這條資料來源哪個庫哪個表,在下遊系統入湖入倉可以有更多的靈活操作;
    • 整庫同步:使用者要同步整個資料庫隻需一行 SQL 文法即可完成,而不用每張表定義一個 DDL 和 query。
  • 生态內建
    • 內建更多上遊資料庫,如 Oracle,MS SqlServer。Cloudera 目前正在積極貢獻 oracle-cdc connector;
    • 在入湖層面,Hudi 和 Iceberg 寫入上有一定的優化空間,例如在高 QPS 入湖的時候,資料分布有比較大的性能影響,這一點可以通過與生态打通和內建繼續優化。

附錄

[1]

Flink-CDC 項目位址

[2]

Flink-CDC 文檔網站

[3]

Percona - MySQL 全局鎖時間分析

[4]

DBLog - 無鎖算法論文

[5]

Flink FLIP-27 設計文檔

各位我的個人微信将很快開通重新維護,歡迎大家後續持續關注我的個人微信,将會釋出更多Flink社群文章和Flink相關知識

Flink CDC 2.0 詳細介紹