天天看點

Flink SQL 1.11 新功能

本文整理自 Apache Flink PMC,阿裡巴巴技術專家伍翀(雲邪)的分享,旨在幫助使用者快速了解新版本 Table & SQL 在 Connectivity 和 Simplicity 等方面的優化及實際開發使用的最佳實踐,主要分為以下四個部分:

  1. 簡要回顧 Flink 1.8 ~ Flink 1.11 版本在 Apache 社群的發展趨勢,其中國内開發者的積極參與和中文社群的蓬勃發展對 Flink 在社群和 GitHub 的活躍度做出了重要貢獻。
  2. 詳細解讀 Flink SQL 1.11 新功能,如 connectors 參數簡化 + 動态 Table 參數減少代碼備援,内置 connectors + LIKE 文法幫助快速測試,重構的 TableEnvironment 、TableSource / TableSink 接口提升易用性,Hive Dialect + CDC 進一步支援流批一體。
  3. 重點展示新版本對 Hive 數倉實時化的支援和 Flink SQL 引入 CDC 的資料同步最佳實踐。
  4. 簡要解讀 Flink SQL 1.12 未來規劃。

1 Flink 1.8 ~ 1.11 社群發展趨勢回顧

自 2019 年初阿裡巴巴宣布向 Flink 社群貢獻 Blink 源碼并在同年 4 月釋出 Flink 1.8 版本後,Flink 在社群的活躍程度猶如坐上小火箭般上升,每個版本包含的 git commits 數量以 50% 的增速持續上漲, 吸引了一大批國内開發者和使用者參與到社群的生态發展中來,中文使用者郵件清單([email protected])更是在今年 6 月首次超出英文使用者郵件清單([email protected]),在 7 月超出比例達到了 50%。對比其它 Apache 開源社群如 Spark、Kafka 的使用者郵件清單數(每月約 200 封左右)可以看出,整個 Flink 社群的發展依然非常健康和活躍。

Flink SQL 1.11 新功能
Flink SQL 1.11 新功能

2 Flink SQL 新功能解讀

在了解 Flink 整體發展趨勢後,我們來看下最近釋出的 Flink 1.11 版本在 connectivity 和 simplicity 方面都帶來了哪些令人耳目一新的功能。

FLIP-122:簡化 connector 參數

整個 Flink SQL 1.11 在圍繞易用性方面做了很多優化,比如 FLIP-122[1] 。

優化了 connector 的 property 參數名稱冗長的問題。以 Kafka 為例,在 1.11 版本之前使用者的 DDL 需要聲明成如下方式:

CREATE TABLE user_behavior (              ...              ) WITH (              'connector.type'='kafka',              'connector.version'='universal',              'connector.topic'='user_behavior',              'connector.startup-mode'='earliest-offset',              'connector.properties.zookeeper.connect'='localhost:2181',              'connector.properties.bootstrap.servers'='localhost:9092',              'format.type'='json'              );
           

而在 Flink SQL 1.11 中則簡化為:

CREATE TABLE user_behavior (              ...              ) WITH (              'connector'='kafka',              'topic'='user_behavior',              'scan.startup.mode'='earliest-offset',              'properties.zookeeper.connect'='localhost:2181',              'properties.bootstrap.servers'='localhost:9092',              'format'='json'              );
           

DDL 表達的資訊量絲毫未少,但是看起來清爽許多 :)。Flink 的開發者們為這個優化做了很多讨論,有興趣可以圍觀 FLIP-122 Discussion Thread[2]。

FLINK-16743:内置 connectors

Flink SQL 1.11 新加入了三種内置的 connectors,如下表所示:

connector  描述 使用場景
'connector'='datagen' 用于生成随機資料的source 常用于測試
'connector'='blackhole' 不做任何處理的 sink 常用于性能測試
'connector'='print'  列印到标準輸出流(.out檔案)的 sink 常用于調試

在外部 connector 環境還沒有 ready 時,使用者可以選擇 datagen source 和 print sink 快速建構 pipeline 熟悉 Flink SQL;對于想要測試 Flink SQL 性能的使用者,可以使用 blackhole 作為 sink;對于調試排錯場景,print sink 會将計算結果打到标準輸出(比如叢集環境下就會打到 taskmanager.out 檔案),使得定位問題的成本大大降低。

FLIP-110:LIKE 文法

Flink SQL 1.11 支援使用者從已定義好的 table DDL 中快速 “fork” 自己的版本并進一步修改 watermark 或者 connector 等屬性。比如下面這張 base_table 上想加一個 watermark,在 Flink 1.11 版本之前,使用者隻能重新将表聲明一遍,并加入自己的修改,可謂 “牽一發而動全身”。

-- before Flink SQL 1.11              CREATE TABLE base_table (              id BIGINT,              name STRING,              ts TIMESTAMP              ) WITH (              'connector.type'='kafka',              ...              );
              CREATE TABLE derived_table (              id BIGINT,              name STRING,              ts TIMESTAMP,              WATERMARK FOR ts AS ts - INTERVAL '5' SECOND              ) WITH (              'connector.type'='kafka',              ...              );
           

從 Flink 1.11 開始,使用者隻需要使用 CREATE TABLE LIKE 文法就可以完成之前的操作。

-- Flink SQL 1.11              CREATE TABLE base_table (              id BIGINT,              name STRING,              ts TIMESTAMP              ) WITH (              'connector'='kafka',              ...              );
              CREATE TABLE derived_table (              WATERMARK FOR ts AS ts - INTERVAL '5' SECOND              ) LIKE base_table;
           

而内置 connector 與 CREATE TABLE LIKE 文法搭配使用則會如下圖一般産生“天雷勾地火”的效果,極大提升開發效率。

Flink SQL 1.11 新功能

FLIP-113:動态 Table 參數

對于像 Kafka 這種消息隊列,在聲明 DDL 時通常會有一個啟動點位去指定開始消費資料的時間,如果需要更改啟動點位,在老版本上就需要重新聲明一遍新點位的 DDL,非常不友善。

CREATE TABLE user_behavior (              user_id BIGINT,              behavior STRING,              ts TIMESTAMP(3)              ) WITH (              'connector'='kafka',              'topic'='user_behavior',              'scan.startup.mode'='timestamp',              'scan.startup.timestamp-millis'='123456',              'properties.bootstrap.servers'='localhost:9092',              'format'='json'              );
           

從 Flink 1.11 開始,使用者可以在 SQL client 中按如下方式設定開啟 SQL 動态參數(預設是關閉的),如此即可在 DML 裡指定具體的啟動點位。

SET 'table.dynamic-table-options.enabled' = 'true';              SELECT user_id, COUNT(DISTINCT behaviro)              FROM user_behavior /*+ OPTIONS('scan.startup.timestamp-millis'='1596282223') */              GROUP BY user_id;
           

除啟動點位外,動态參數還支援像 sink.partition、scan.startup.mode 等更多運作時參數,感興趣可移步 FLIP-113[3],獲得更多資訊。

FLIP-84:重構優化 TableEnvironment 接口 

Flink SQL 1.11 以前的 TableEnvironment 接口定義和行為有一些不夠清晰,比如:

  • TableEnvironment#sqlUpdate() 方法對于 DDL 會立即執行,但對于 INSERT INTO DML 語句卻是 buffer 住的,直到調用 TableEnvironment#execute() 才會被執行,是以在使用者看起來順序執行的語句,實際産生的效果可能會不一樣。
  • 觸發作業送出有兩個入口,一個是 TableEnvironment#execute(),另一個是 StreamExecutionEnvironment#execute(),于使用者而言很難了解應該使用哪個方法觸發作業送出。
  • 單次執行不接受多個 INSERT INTO 語句。

針對這些問題,Flink SQL 1.11 提供了新 API,即 TableEnvironment#executeSql(),它統一了執行 SQL 的行為, 無論接收 DDL、查詢 query 還是 INSERT INTO 都會立即執行。針對多 sink 場景提供了 StatementSet 和 TableEnvironment#createStatementSet() 方法,允許使用者添加多條 INSERT 語句一起執行。

除此之外,新的 execute 方法都有傳回值,使用者可以在傳回值上執行 print,collect 等方法。

新舊 API 對比如下表所示:

Current Interface New Interface
tEnv.sqlUpdate("CREATE TABLE...”); TableResult result = tEnv.executeSql("CREATE TABLE...”);

tEnv.sqlUpdate("INSERT INTO...SELECT...”);

tEnv.execute();

TableResult result = 

tEnv.executeSql("INSERT INTO ... SELECT...”);

tEnv.sqlUpdate("insert into xx ...”); 

tEnv.sqlUpdate("insert into yy ...”); 

tEnv.execute();

StatementSet ss =tEnv.createStatementSet(); 

ss.addInsertSql("insert into xx ...”); 

ss.addInsertSql("insert into yy ...”); 

TableResult result = ss.execute();

對于在 Flink 1.11 上使用新接口遇到的一些常見問題,雲邪做了統一解答,可在 Appendix 部分檢視。

FLIP-95:TableSource & TableSink 重構

開發者們在 Flink SQL 1.11 版本花了大量經曆對 TableSource 和 TableSink API 進行了重構,核心優化點如下:

  • 移除類型相關接口,簡化開發,解決迷惑的類型問題,支援全類型
  • 尋找 Factory 時,更清晰的報錯資訊
  • 解決找不到 primary key 的問題
  • 統一了流批 source,統一了流批 sink
  • 支援讀取 CDC 和輸出 CDC
  • 直接高效地生成 Flink SQL 内部資料結構 RowData

新 DynamicTableSink API 去掉了所有類型相關接口,因為所有的類型都是從 DDL 來的,不需要 TableSink 告訴架構是什麼類型。而對于使用者來說,最直覺的體驗就是在老版本上遇到各種奇奇怪怪報錯的機率降低了很多,比如不支援的精度類型和找不到 primary key / table factory 的詭異報錯在新版本上都不複存在了。關于 Flink 1.11 是如何解決這些問題的詳細可以在附錄部分閱讀。

FLIP-123:Hive Dialect

Flink 1.10 版本對 Hive connector 的支援達到了生産可用,但是老版本的 Flink SQL 不支援 Hive DDL 及使用 Hive syntax,這無疑限制了 Flink connectivity。在新版本中,開發者們為支援 HiveQL 引入了新 parser,使用者可以在 SQL client 的 yaml 檔案中指定是否使用 Hive 文法,也可以在 SQL client 中通過 set table.sql-dialect=hive/default 動态切換。更多資訊可以參考 FLIP-123[4]。

以上簡要介紹了 Flink 1.11 在減少使用者不必要的輸入和操作方面對 connectivity 和 simplicity 方面做出的優化。下面會重點介紹在外部系統和資料生态方面對 connectivity 和 simplicity 的兩個核心優化,并附上最佳實踐介紹。

3 Hive 數倉實時化 & Flink SQL + CDC 最佳實踐

Hive 數倉實時化

下圖是一張非常經典的 Lambda 數倉架構,在整個大資料行業從批處理逐漸擁抱流計算的許多年裡代表“最先進的生産力”。然而随着業務發展和規模擴大,兩套單獨的架構所帶來的開發、運維、計算成本問題已經日益凸顯。

Flink SQL 1.11 新功能

而 Flink 作為一個流批一體的計算引擎,在最初的設計上就認為“萬物本質皆是流”,批處理是流計算的特例,如果能夠在自身提供高效批處理能力的同時與現有的大資料生态結合,則能以最小侵入的方式改造現有的數倉架構使其支援流批一體。在新版本中,Flink SQL 提供了開箱即用的 “Hive 數倉同步”功能,即所有的資料加工邏輯由 Flink SQL 以流計算模式執行,在資料寫入端,自動将 ODS,DWD 和 DWS 層的已經加工好的資料實時回流到 Hive table。One size (sql) fits for all suites (tables) 的設計,使得在 batch 層不再需要維護任何計算 pipeline。

Flink SQL 1.11 新功能

對比傳統架構,它帶來的好處和解決的問題有哪些呢?

  • 計算口徑與處理邏輯統一,降低開發和運維成本

傳統架構維護兩套資料 pipeline 最大的問題在于需要保持它們處理邏輯的等價性,但由于使用了不同的計算引擎(比如離線使用 Hive,實時使用 Flink 或 Spark Streaming),SQL 往往不能直接套用,存在代碼上的差異性,經年累月下來,離線和實時處理邏輯很可能會完全 diverge,有些大的公司甚至會存在兩個團隊分别去維護實時和離線數倉,人力物力成本非常高。Flink 支援 Hive Streaming Sink 後,實時處理結果可以實時回流到 Hive 表,離線的計算層可以完全去掉,處理邏輯由 Flink SQL 統一維護,離線層隻需要使用回流好的 ODS、DWD、DWS 表做進一步 ad-hoc 查詢即可。

  • 離線對于“資料漂移”的處理更自然,離線數倉“實時化”

離線數倉 pipeline 非 data-driven 的排程執行方式,在跨分區的資料邊界處理上往往需要很多 trick 來保證分區資料的完整性,而在兩套數倉架構并行的情況下,有時會存在對 late event 處理差異導緻資料對比不一緻的問題。而實時 data-driven 的處理方式和 Flink 對于 event time 的友好支援本身就意味着以業務時間為分區(window),通過 event time + watermark 可以統一定義實時和離線資料的完整性和時效性,Hive Streaming Sink 更是解決了離線數倉同步的“最後一公裡問題”。

下面會以一個 Demo 為例,介紹 Hive 數倉實時化的最佳實踐。

■ 實時資料寫入 Hive 的最佳實踐

FLIP-105:支援 Change Data Capture (CDC)

除了對 Hive Streaming Sink 的支援,Flink SQL 1.11 的另一大亮點就是引入了 CDC 機制。CDC 的全稱是 Change Data Capture,用于 tracking 資料庫表的增删改查操作,是目前非常成熟的同步資料庫變更的一種方案。在國内常見的 CDC 工具就是阿裡開源的 Canal,在國外比較流行的有 Debezium。Flink SQL 在設計之初就提出了 Dynamic Table 和“流表二象性”的概念,并且在 Flink SQL 内部完整支援了 Changelog 功能,相對于其他開源流計算系統是一個重要優勢。本質上 Changelog 就等價于一張一直在變化的資料庫的表。Dynamic Table 這個概念是 Flink SQL 的基石, Flink SQL 的各個算子之間傳遞的就是 Changelog,完整地支援了 Insert、Delete、Update 這幾種消息類型。

得益于 Flink SQL 運作時的強大,Flink 與 CDC 對接隻需要将外部的資料流轉為 Flink 系統内部的 Insert、Delete、Update 消息即可。進入到 Flink 内部後,就可以靈活地應用 Flink 各種 query 文法了。

Flink SQL 1.11 新功能

在實際應用中,把 Debezium Kafka Connect Service 注冊到 Kafka 叢集并帶上想同步的資料庫表資訊,Kafka 則會自動建立 topic 并監聽 Binlog,把變更同步到 topic 中。在 Flink 端想要消費帶 CDC 的資料也很簡單,隻需要在 DDL 中聲明 format = debezium-json 即可。

Flink SQL 1.11 新功能

在 Flink 1.11 上開發者們還做了一些有趣的探索,既然 Flink SQL 運作時能夠完整支援 Changelog,那是否有可能不需要 Debezium 或者 Canal 的服務,直接通過 Flink 擷取 MySQL 的變更呢?答案當然是可以,Debezium 類庫的良好設計使得它的 API 可以被封裝為 Flink 的 Source Function,不需要再起額外的 Service,目前這個項目已經開源,支援了 MySQL 和 Postgres 的 CDC 讀取,後續也會支援更多類型的資料庫,可移步到下方連結解鎖更多使用姿勢。

https://github.com/ververica/flink-cdc-connectors

下面的 Demo 會介紹如何使用 flink-cdc-connectors 捕獲 MySQL 和 Postgres 的資料變更,并利用 Flink SQL 做多流 join 後實時同步到 Elasticsearch 中。

Flink SQL 1.11 新功能

假設你在一個電商公司,訂單和物流是你最核心的資料,你想要實時分析訂單的發貨情況。因為公司已經很大了,是以商品的資訊、訂單的資訊、物流的資訊,都分散在不同的資料庫和表中。我們需要建立一個流式 ETL,去實時消費所有資料庫全量和增量的資料,并将他們關聯在一起,打成一個大寬表。進而友善資料分析師後續的分析。

4 Flink SQL 1.12 未來規劃

以上介紹了 Flink SQL 1.11 的核心功能與最佳實踐,對于下個版本,雲邪也給出了一些 ongoing 的計劃,并歡迎大家在社群積極提出意見 & 建議。

  • FLIP-132[5]:Temporal Table DDL (Binlog 模式的維表關聯)
  • FLIP-129[6]:重構 Descriptor API (Table API 的 DDL)
  • 支援 Schema Registry Avro 格式
  • CDC 更完善的支援(批處理,upsert 輸出到 Kafka 或 Hive)
  • 優化 Streaming File Sink 小檔案問題
  • N-ary input operator (Batch 性能提升)

5 附錄

使用新版本 TableEnvironment 遇到的常見報錯及原因

第一個常見報錯是 No operators defined in streaming topolog。遇到這個問題的原因是在老版本中執行 INSERT INTO 語句的下面兩個方法: 

在新版本中沒有完全向前相容(方法還在,執行邏輯變了),如果沒有将 Table 轉換為 AppendedStream/RetractStream 時(通過StreamExecutionEnvironment#toAppendStream/toRetractStream),上面的代碼執行就會出現上述錯誤;與此同時,一旦做了上述轉換,就必須使用 StreamExecutionEnvironment#execute() 來觸發作業執行。是以建議使用者還是遷移到新版本的 API 上面,語義上也會更清晰一些。

Flink SQL 1.11 新功能

第二個問題是調用新的 TableEnvironemnt#executeSql() 後 print 沒有看到傳回值,原因是因為目前 print 依賴了 checkpoint 機制,開啟 exactly-onece 後就可以了,新版本會優化此問題。

Flink SQL 1.11 新功能

老版本的 StreamTableSource、StreamTableSink 常見報錯及新版本優化

第一個常見報錯是不支援精度類型,經常出現在 JDBC 或者 HBase 資料源上 ,在新版本上這個問題就不會再出現了。

Flink SQL 1.11 新功能

第二個常見報錯是 Sink 時找不到 PK,因為老的 StreamSink 需要通過 query 去推導出 PK,當 query 變得複雜時有可能會丢失 PK 資訊,但實際上 PK 資訊在 DDL 裡就可以擷取,沒有必要通過 query 去推導,是以新版本的 Sink 就不會再出現這個錯誤啦。

Flink SQL 1.11 新功能

第三個常見報錯是在解析 Source 和 Sink 時,如果使用者少填或者填錯了參數,架構傳回的報錯資訊很模糊,“找不到 table factory”,使用者也不知道該怎麼修改。這是因為老版本 SPI 設計得比較通用,沒有對 Source 和 Sink 解析的邏輯做單獨處理,當比對不到完整參數清單的時候架構已經預設目前的 table factory 不是要找的,然後周遊所有的 table factories 發現一個也不比對,就報了這個錯。在新版的加載邏輯裡,Flink 會先判斷 connector 類型,再比對剩餘的參數清單,這個時候如果必填的參數缺失或填錯了,架構就可以精準報錯給使用者。

Flink SQL 1.11 新功能

參考資料:

[1]https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

[2]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-122-New-Connector-Property-Keys-for-New-Factory-td39462.html

[3]https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL

[4]https://cwiki.apache.org/confluence/display/FLINK/FLIP-123%3A+DDL+and+DML+compatibility+for+Hive+connector

[5]https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL

[6]https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API

繼續閱讀