天天看點

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗

摘要: 7月,Flink 1.11 新版釋出,在生态及易用性上有大幅提升,其中 Table & SQL 開始支援 Change Data Capture(CDC)。CDC 被廣泛使用在複制資料、更新緩存、微服務間同步資料、審計日志等場景,本文由社群由曾慶東同學分享,主要介紹 Flink SQL CDC 在生産環境的落地實踐以及總結的實戰經驗,文章分為以下幾部分:

  1. 項目背景
  2. 解決方案
  3. 項目運作環境與現狀
  4. 具體實作
  5. 踩過的坑和學到的經驗
  6. 總結

Tips: 點選下方連結可檢視社群直播的 Flink SQL CDC 相關視訊~

https://flink-learning.org.cn/developers/flink-training-course3/

01 項目背景

本人目前參與的項目屬于公司裡面資料密集、計算密集的一個重要項目,需要提供高效且準确的 OLAP 服務,提供靈活且實時的報表。業務資料存儲在 MySQL 中,通過主從複制同步到報表庫。作為集團級公司,資料增長多而且快,出現了多個千萬級、億級的大表。為了實作各個次元的各種複雜的報表業務,有些千萬級大表仍然需要進行 Join,計算規模非常驚人,經常不能及時響應請求。随着資料量的日益增長和實時分析的需求越來越大,急需對系統進行流式計算、實時化改造。正是在這個背景下,開始了我們與 Flink SQL CDC 的故事。

02 解決方案

針對平台現在存在的問題,我們提出了把報表的資料實時化的方案。該方案主要通過 Flink SQL CDC + Elasticsearch 實作。Flink SQL 支援 CDC 模式的資料同步,将 MySQL 中的全增量資料實時地采集、預計算、并同步到 Elasticsearch 中,Elasticsearch 作為我們的實時報表和即席分析引擎。項目整體架構圖如下所示:

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗

實時報表實作具體思路是,使用 Flink CDC 讀取全量資料,全量資料同步完成後,Flink CDC 會無縫切換至 MySQL 的 binlog 位點繼續消費增量的變更資料,且保證不會多消費一條也不會少消費一條。讀取到的賬單和訂單的全增量資料會與産品表做關聯補全資訊,并做一些預聚合,然後将聚合結果輸出到 Elasticsearch,前端頁面隻需要到 Elasticsearch 通過精準比對(terms)查找資料,或者再使用 agg 做高維聚合統計得到多個服務中心的報表資料。從整體架構中,可以看到,Flink SQL 及其 CDC 功能在我們的架構中扮演着核心角色。我們采用 Flink SQL CDC,而不是 Canal + Kafka 的傳統架構,主要原因還是因為其依賴元件少,維護成本低,開箱即用,上手容易。具體來說 Flink SQL CDC 是一個集采集、計算、傳輸于一體的工具,其吸引我們的優點有:① 減少維護的元件、簡化實作鍊路; ② 減少端到端延遲; ③ 減輕維護成本和開發成本; ④ 支援 Exactly Once 的讀取和計算(由于我們是賬務系統,是以資料一緻性非常重要); ⑤ 資料不落地,減少存儲成本; ⑥ 支援全量和增量流式讀取;有關 Flink SQL CDC 的介紹和教程,可以觀看 Apache Flink 社群釋出的相關視訊:https://www.bilibili.com/video/BV1zt4y1D7kt/項目使用的是 flink-cdc-connectors 中提供的 mysql-cdc 元件。這是一個 Flink 資料源,支援對 MySQL 資料庫的全量和增量讀取。它在掃描全表前會先加一個全局讀鎖,然後擷取此時的 binlog position,緊接着釋放全局讀鎖。随後開始掃描全表,當全表快照讀取完後,會從之前擷取的 binlog position 擷取增量的變更記錄。是以這個讀鎖是非常輕量的,持鎖時間非常短,不會對線上業務造成太大影響。更多資訊可以參考 flink-cdc-connectors 項目官網: https://github.com/ververica/flink-cdc-connectors。

03 項目運作環境與現狀

我們在生産環境搭建了 Hadoop + Flink + Elasticsearch 分布式環境,采用的 Flink on YARN 的 per-job 模式運作,使用 RocksDB 作為 state backend,HDFS 作為 checkpoint 持久化位址,并且做好了 HDFS 的容錯,保證 checkpoint 資料不丢失。我們使用 SQL Client 送出作業,所有作業統一使用純 SQL,沒有寫一行 Java 代碼。目前已上線了 3 個基于 Flink CDC 的作業,已穩定線上上運作了兩個星期,并且業務産生的訂單實收和賬單實收資料能實時聚合輸出到 Elasticsearch,輸出的資料準确無誤。現在也正在對其他報表采用 Flink SQL CDC 進行實時化改造,替換舊的業務系統,讓系統資料更實時。

04 具體實作

① 進入 Flink/bin,使用 ./sql-client.sh embedded 啟動 SQL CLI 用戶端。 ② 使用 DDL 建立 Flink Source 和 Sink 表。這裡建立的表字段個數不一定要與 MySQL 的字段個數和順序一緻,隻需要挑選 MySQL 表中業務需要的字段即可,并且字段類型保持一緻。

-- 在Flink建立賬單實收source表CREATE TABLE bill_info (  billCode STRING,  serviceCode STRING,  accountPeriod STRING,  subjectName STRING ,  subjectCode STRING,  occurDate TIMESTAMP,  amt  DECIMAL(11,2),  status STRING,  proc_time AS PROCTIME() -–使用維表時需要指定該字段) WITH (  'connector' = 'mysql-cdc', -- 連接配接器  'hostname' = '******',   --mysql位址  'port' = '3307',  -- mysql端口  'username' = '******',  --mysql使用者名  'password' = '******',  -- mysql密碼  'database-name' = 'cdc', --  資料庫名稱  'table-name' = '***');
-- 在Flink建立訂單實收source表CREATE TABLE order_info (  orderCode STRING,  serviceCode STRING,  accountPeriod STRING,  subjectName STRING ,  subjectCode STRING,  occurDate TIMESTAMP,  amt  DECIMAL(11, 2),  status STRING,  proc_time AS PROCTIME()  -–使用維表時需要指定該字段) WITH (  'connector' = 'mysql-cdc',  'hostname' = '******',  'port' = '3307',  'username' = '******',  'password' = '******',  'database-name' = 'cdc',  'table-name' = '***',);
-- 建立科目維表CREATE TABLE subject_info (  code VARCHAR(32) NOT NULL,  name VARCHAR(64) NOT NULL,  PRIMARY KEY (code) NOT ENFORCED  --指定主鍵) WITH (  'connector' = 'jdbc',  'url' = 'jdbc:mysql://xxxx:xxxx/spd?useSSL=false&autoReconnect=true',  'driver' = 'com.mysql.cj.jdbc.Driver',  'table-name' = '***',  'username' = '******',  'password' = '******',  'lookup.cache.max-rows' = '3000',  'lookup.cache.ttl' = '10s',  'lookup.max-retries' = '3');
-- 建立實收分布結果表,把結果寫到 ElasticsearchCREATE TABLE income_distribution (  serviceCode STRING,  accountPeriod STRING,  subjectCode STRING,  subjectName STRING,  amt  DECIMAL(13,2),  PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED) WITH (  'connector' = 'elasticsearch-7',  'hosts' = 'http://xxxx:9200',  'index' = 'income_distribution',  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL');      

以上的建表 DDL 分别建立了訂單實收 source 表、賬單實收 source 表、産品科目維表和 Elasticsearch 結果表。建表完成後,Flink 是不會馬上去同步 MySQL 的資料,而是等到使用者送出了一個 insert 作業後才會執行同步資料,并且 Flink 不會存儲資料。我們的第一個作業是計算收入分布,資料來源于 bill_info 和 order_info 兩張 MySQL 表,并且賬單實收表和訂單實收表都需要關聯維表資料擷取應收科目的最新中文名稱,按照服務中心、賬期、科目代碼和科目名稱進行分組計算實收金額的 sum 值,實收分布具體 DML 如下:

INSERT INTO income_distributionSELECT t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName, SUM(amt) AS amt FROM (  SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt   FROM bill_info AS b  JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code   GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.nameUNION ALL  SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt  FROM order_info AS b  JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code   GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name) AS t1GROUP BY t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName;      

Flink SQL 的維表 JOIN 和雙流 JOIN 寫法上不太一樣,對于維表,還需要在 Flink source table 上添加一個 proctime 字段 proc_time AS PROCTIME(),關聯的時候使用 FOR SYSTEM_TIME AS OF 的 SQL 文法查詢時态表,意思是關聯查詢最新版本的維表資料。關于維表 JOIN 的使用可參閱: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/joins.html。③ 在 SQL Client 執行以上作業後,YARN 會建立一個 Flink 叢集運作作業,并且使用者可以在 Hadoop 上檢視到執行作業的所有資訊,并且能進入 Flink 的 Web UI 頁面檢視 Flink 作業詳情,以下是 Hadoop 所有作業情況。

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗

④ 作業送出後,Flink SQL CDC 會掃描指定的 MySQL 表,在這期間 Flink 也會進行 checkpoint,是以需要按照上文所述的配置 checkpoint 的重試政策和重試次數。當資料被讀取進 Flink 後,Flink 會流式地進行作業邏輯的計算,實時統計出聚合結果輸出到 Elasticsearch(sink 端)。 相當于我們使用 Flink 在 MySQL 的表上維護了一個實時的物化視圖,并将這個實時物化視圖的結果存在了 Elasticsearch 中。在 Elasticsearch 中使用 GET /income_distribution/_search{ "query": {"match_all": {}}} 指令檢視輸出的實收分布結果,如下圖:

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗

通過圖中的結果可以看出聚合結果被實時的計算出來,并寫到了 Elasticsearch 中了。

05 踩過的坑和學到的經驗

1. Flink 作業原來運作在 standalone session 模式下,送出多個 Flink 作業會導緻作業失敗報錯。

  • 原因:因為 standalone session 模式下啟動多個作業會導緻多個作業的 Task 共享一個 JVM,可能會導緻一些不穩定的問題。并且排查問題時,多個作業的日志混在一個 TaskManager 中,增加了排查的難度。
  • 解決方法:采用 YARN 的 per-job 模式啟動多個作業,能有更好的隔離性。

2. SELECT elasticsearch table 報以下錯誤:

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗
  • 原因:Elasticsearch connector 目前隻支援了 sink,不支援 source 。是以不能 SELECT elasticsearch table。

3. 在 flink-conf.yaml 裡修改預設并行度,但是在 Web UI 看到作業的并行度還是 1,并行度修改不生效。

  • 解決辦法:在使用 SQL Client 時 sql-client-defaults.yaml 中的并行度配置的優先級更高。在 sql-client-defaults.yaml 中修改并行度,或者删除 sql-client-defaults.yaml 中的并行度配置。更建議采用後者。

4. Flink 作業在掃描 MySQL 全量資料時,checkpoint 逾時,出現作業 failover,如下圖:

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗
  • 原因:Flink CDC 在 scan 全表資料(我們的實收表有千萬級資料)需要小時級的時間(受下遊聚合反壓影響),而在 scan 全表過程中是沒有 offset 可以記錄的(意味着沒法做 checkpoint),但是 Flink 架構任何時候都會按照固定間隔時間做 checkpoint,是以此處 mysql-cdc source 做了比較取巧的方式,即在 scan 全表的過程中,會讓執行中的 checkpoint 一直等待甚至逾時。逾時的 checkpoint 會被仍未認為是 failed checkpoint,預設配置下,這會觸發 Flink 的 failover 機制,而預設的 failover 機制是不重新開機。是以會造成上面的現象。
  • 解決辦法:在 flink-conf.yaml 配置 failed checkpoint 容忍次數,以及失敗重新開機政策,如下:
execution.checkpointing.interval: 10min   # checkpoint間隔時間execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint 失敗容忍次數restart-strategy: fixed-delay  # 重試政策restart-strategy.fixed-delay.attempts: 2147483647   # 重試次數      

目前 Flink 社群也有一個 issue(Flink-18578)來支援 source 主動拒絕 checkpoint 的機制,将來基于該機制,能比較優雅地解決這個問題。5. Flink 怎麼樣開啟 YARN 的 per-job 模式?

  • 解決方法:在 flink-conf.yaml 中配置 execution.target: yarn-per-job。

6. 進入 SQL Client 建立 table 後,在另外一個節點進入 SQL Client 查詢不到 table。

  • 原因:因為 SQL Client 預設的 Catalog 是在 in-memory 的,不是持久化  Catalog,是以這屬于正常現象,每次啟動 Catalog 裡面都是空的。

7. 作業在運作時 Elasticsearch 報如下錯誤:

Caused by: org.apache.Flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=illegal_argument_exception, reason=mapper [amt] cannot be changed from type [long] to [float]]
  • 原因:資料庫表的字段 amt 的類型是 decimal,DDL 建立輸出到 es 的 amt 字段的類型也是 decimal,因為輸出到 es 的第一條資料的amt如果是整數,比如是 10,輸出到 es 的類型是 long 類型的,es client 會自動建立 es 的索引并且設定 amt 字段為 long 類型的格式,那麼如果下一次輸出到 es 的 amt 是非整數 10.1,那麼輸出到 es 的時候就會出現類型不比對的錯誤。
  • 解決方法:手動生成 es 索引和 mapping 的資訊,指定好 decimal 類型的資料格式是 saclefloat,但是在 DDL 處仍然可以保留該字段類型是 decimal。

8. 作業在運作時 mysql cdc source 報如下錯誤:

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗
  • 原因:因為資料庫中别的表做了字段修改,CDC source 同步到了 ALTER DDL 語句,但是解析失敗抛出的異常。
  • 解決方法:在 flink-cdc-connectors 最新版本中已經修複該問題(跳過了無法解析的 DDL)。更新 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替換 flink/lib 下的舊包。

9. 掃描全表階段慢,在 Web UI 出現如下現象:

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗
  • 原因:掃描全表階段慢不一定是 cdc source 的問題,可能是下遊節點處理太慢反壓了。
  • 解決方法:通過 Web UI 的反壓工具排查發現,瓶頸主要在聚合節點上。通過在 sql-client-defaults.yaml 檔案配上 MiniBatch 相關參數和開啟 distinct 優化(我們的聚合中有 count distinct),作業的 scan 效率得到了很大的提升,從原先的 10 小時,提升到了 1 小時。關于性能調優的參數可以參閱:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/tuning/streaming_aggregation_optimization.html。
configuration:  table.exec.mini-batch.enabled: true  table.exec.mini-batch.allow-latency: 2s  table.exec.mini-batch.size: 5000  table.optimizer.distinct-agg.split.enabled: true      

10. CDC source 掃描 MySQL 表期間,發現無法往該表 insert 資料。

  • 原因:由于使用的 MySQL 使用者未授權 RELOAD 權限,導緻無法擷取全局讀鎖(FLUSH TABLES WITH READ LOCK), CDC source 就會退化成表級讀鎖,而使用表級讀鎖需要等到全表 scan 完,才能釋放鎖,是以會發現持鎖時間過長的現象,影響其他業務寫入資料。
  • 解決方法:給使用的 MySQL 使用者授予 RELOAD 權限即可。所需的權限清單詳見文檔:https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server。如果出于某些原因無法授予 RELOAD 權限,也可以顯式配上 'debezium.snapshot.locking.mode' = 'none'來避免所有鎖的擷取,但要注意隻有當快照期間表的 schema 不會變更才安全。

11. 多個作業共用同一張 source table 時,沒有修改 server id 導緻讀取出來的資料有丢失。

  • 原因:MySQL binlog 資料同步的原理是,CDC source 會僞裝成 MySQL 叢集的一個 slave(使用指定的 server id 作為唯一 id),然後從 MySQL 拉取 binlog 資料。如果一個 MySQL 叢集中有多個 slave 有同樣的 id,就會導緻拉取資料錯亂的問題。
  • 解決方法:預設會随機生成一個 server id,容易有碰撞的風險。是以建議使用動态參數(table hint)在 query 中覆寫 server id。如下所示:
SELECT *FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;      

12. 在啟動作業時,YARN 接收了任務,但作業一直未啟動:

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗
  • 原因:Queue Resource Limit for AM 超過了限制資源限制。預設的最大記憶體是 30G (叢集記憶體) * 0.1 = 3G,而每個 JM 申請 2G 記憶體,當送出第二個任務時,資源就不夠了。
  • 解決方法:調大 AM 的 resource limit,在 capacity-scheduler.xml 配置 yarn.scheduler.capacity.maximum-am-resource-percent,代表AM的占總資源的百分比,預設為0.1,改成0.3(根據伺服器的性能靈活配置)。

13. AM 程序起不來,一直被 kill 掉。

Flink SQL CDC 上線!我們總結了 13 條生産實踐經驗
  • 原因:386.9 MB of 1 GB physical memory used; 2.1 GB of 2.1 GB virtual memory use。預設實體記憶體是 1GB,動态申請到了 1GB,其中使用了386.9 MB。實體記憶體 x 2.1=虛拟記憶體,1GBx2.1≈2.1GB ,2.1GB 虛拟記憶體已經耗盡,當虛拟記憶體不夠時候,AM 的 container 就會自殺。
  • 解決方法:兩個解決方案,或調整 yarn.nodemanager.vmem-pmem-ratio 值大點,或 yarn.nodemanager.vmem-check-enabled=false,關閉虛拟記憶體檢查。參考:https://blog.csdn.net/lzxlfly/article/details/89175452。

為了提升了實時報表服務的可用性和實時性,一開始我們采用了 Canal+Kafka+Flink 的方案,可是發現需要寫比較多的 Java 代碼,而且還需要處理好 DataStream 和 Table 的轉換以及 binlong 位置的擷取,開發難度相對較大。另外,需要維護 Kafka 和 Canal 這兩個元件的穩定運作,對于我們小團隊來說成本也不小。由于我們公司已經有基于 Flink 的任務線上上運作,是以采用 Flink SQL CDC 就成了順理成章的事情。基于 Flink SQL CDC 的方案隻需要編寫 SQL ,不用寫一行 Java 代碼就能完成實時鍊路的打通和實時報表的計算,對于我們來說非常的簡單易用,而且線上上運作的穩定性和性能表現也讓我們滿意。我們正在公司内大力推廣 Flink SQL CDC 的使用,也正在着手改造其他幾個實時鍊路的任務。非常感謝開源社群能為我們提供如此強大的工具,也希望 Flink CDC 越來越強大,支援更多的資料庫和功能。也再次感謝雲邪老師對于我們項目上線的大力支援!