天天看點

flink1.11 Flink SQL 新功能解讀

在了解 Flink 整體發展趨勢後,我們來看下最近釋出的 Flink 1.11 版本在 connectivity 和 simplicity 方面都帶來了哪些令人耳目一新的功能。

#### FLIP-122:簡化 connector 參數

整個 Flink SQL 1.11 在圍繞易用性方面做了很多優化,比如 FLIP-122,優化了 connector 的 property 參數名稱冗長的問題。以 Kafka 為例,在 1.11 版本之前使用者的 DDL 需要聲明成如下方式

CREATE TABLE user_behavior (
  ...
) WITH (
  'connector.type'='kafka',
  'connector.version'='universal',
  'connector.topic'='user_behavior',
  'connector.startup-mode'='earliest-offset',
  'connector.properties.zookeeper.connect'='localhost:2181',
  'connector.properties.bootstrap.servers'='localhost:9092',
  'format.type'='json'
);

           

而在 Flink SQL 1.11 中則簡化為

CREATE TABLE user_behavior (
  ...
) WITH (
  'connector'='kafka',
  'topic'='user_behavior',
  'scan.startup.mode'='earliest-offset',
  'properties.zookeeper.connect'='localhost:2181',
  'properties.bootstrap.servers'='localhost:9092',
  'format'='json'
);
           

DDL 表達的資訊量絲毫未少,但是看起來清爽許多 :) Flink 的開發者們為這個優化做了很多讨論,有興趣可以圍觀 FLIP-122 Discussion Thread。

#### FLINK-16743:内置 connectors

Flink SQL 1.11 新加入了三種内置的 connectors,如下表所示

connector 描述 使用場景
'connector'='datagen' 用于生成随機資料的source 常用于測試
'connector'='blackhole' 不做任何處理的 sink 常用于性能測試
'connector'='print' 列印到标準輸出流(.out檔案)的 sink 常用于調試

在外部 connector 環境還沒有 ready 時,使用者可以選擇 

datagen

 source 和 

print

 sink 快速建構 pipeline 熟悉 Flink SQL;對于想要測試 Flink SQL 性能的使用者,可以使用 

blackhole

 作為 sink;對于調試排錯場景,

print

 sink 會将計算結果打到标準輸出(比如叢集環境下就會打到 taskmanager.out 檔案),使得定位問題的成本大大降低。

#### FLIP-110:LIKE 文法

Flink SQL 1.11 支援使用者從已定義好的 table DDL 中快速 “fork” 自己的版本并進一步修改 watermark 或者 connector 等屬性。比如下面這張 

base_table

 上想加一個 watermark,在 Flink 1.11 版本之前,使用者隻能重新将表聲明一遍,并加入自己的修改,可謂 “牽一發而動全身”。

-- before Flink SQL 1.11
CREATE TABLE base_table (
 id BIGINT,
 name STRING,
 ts TIMESTAMP
) WITH (
 'connector.type'='kafka',
 ...
);

CREATE TABLE derived_table (
 id BIGINT,
 name STRING,
 ts TIMESTAMP,
 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
 'connector.type'='kafka',
 ...
);
           

從 Flink 1.11 開始,使用者隻需要使用 

CREATE TABLE LIKE

 文法就可以完成之前的操作

-- Flink SQL 1.11
CREATE TABLE base_table (
 id BIGINT,
 name STRING,
 ts TIMESTAMP
) WITH (
 'connector'='kafka',
 ...
);

CREATE TABLE derived_table (
 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) LIKE base_table;

           

而内置 connector 與 

CREATE TABLE LIKE

 文法搭配使用則會如下圖一般産生“天雷勾地火”的效果,極大提升開發效率。

flink1.11 Flink SQL 新功能解讀

#### FLIP-113:動态 Table 參數

對于像 Kafka 這種消息隊列,在聲明 DDL 時通常會有一個啟動點位去指定開始消費資料的時間,如果需要更改啟動點位,在老版本上就需要重新聲明一遍新點位的 DDL,非常不友善。

CREATE TABLE user_behavior (
  user_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector'='kafka',
  'topic'='user_behavior',
  'scan.startup.mode'='timestamp',
  'scan.startup.timestamp-millis'='123456',
  'properties.bootstrap.servers'='localhost:9092',
  'format'='json'
);
           

從 Flink 1.11 開始,使用者可以在 SQL client 中按如下方式設定開啟 SQL 動态參數(預設是關閉的),如此即可在 DML 裡指定具體的啟動點位。

SET 'table.dynamic-table-options.enabled' = 'true';

SELECT user_id, COUNT(DISTINCT behaviro)
FROM user_behavior /*+ OPTIONS('scan.startup.timestamp-millis'='1596282223') */
GROUP BY user_id;
           

除啟動點位外,動态參數還支援像 

sink.partition

 、 

scan.startup.mode

 等更多運作時參數,感興趣可移步 FLIP-113 獲得更多資訊。

#### FLIP-84:重構優化 TableEnvironment 接口

Flink SQL 1.11 以前的 

TableEnvironment

 接口定義和行為有一些不夠清晰,比如

  • TableEnvironment#sqlUpdate()

     方法對于 DDL 會立即執行,但對于 

    INSERT INTO

     DML 語句卻是 buffer 住的,直到調用 

    TableEnvironment#execute()

     才會被執行,是以在使用者看起來順序執行的語句,實際産生的效果可能會不一樣。
  • 觸發作業送出有兩個入口,一個是 

    TableEnvironment#execute()

    , 另一個是 

    StreamExecutionEnvironment#execute()

    ,于使用者而言很難了解應該使用哪個方法觸發作業送出。
  • 單次執行不接受多個 

    INSERT INTO

     語句。

針對這些問題,Flink SQL 1.11 提供了新 API,即 

TableEnvironment#executeSql()

,它統一了執行 sql 的行為, 無論接收 DDL、查詢 query 還是 

INSERT INTO

 都會立即執行。針對多 sink 場景提供了 

StatementSet

 和 

TableEnvironment#createStatementSet()

 方法,允許使用者添加多條 

INSERT

 語句一起執行。

除此之外,新的 

execute

 方法都有傳回值,使用者可以在傳回值上執行 

print

collect

 等方法。

新舊 API 對比如下表所示

Current Interface New Interface

tEnv.sqlUpdate("CREATE TABLE...”);

TableResult result = tEnv.executeSql("CREATE TABLE...”);

tEnv.sqlUpdate("INSERT INTO...SELECT...”);

<br/>

tEnv.execute();

TableResult result =

 <br/> 

tEnv.executeSql("INSERT INTO ... SELECT...”);

tEnv.sqlUpdate("insert into xx ...”);

 <br/>

tEnv.sqlUpdate("insert into yy ...”);

 <br/>

tEnv.execute();

StatementSet ss =tEnv.createStatementSet();

 <br/>

ss.addInsertSql("insert into xx ...”);

 <br/>

ss.addInsertSql("insert into yy ...”);

 <br/>

TableResult result = ss.execute();

對于在 Flink 1.11 上使用新接口遇到的一些常見問題,雲邪做了統一解答,可在 Appendix 部分檢視。

#### FLIP-95:TableSource & TableSink 重構

開發者們在 Flink SQL 1.11 版本花了大量經曆對 TableSource 和 TableSink API 進行了重構,核心優化點如下

  • 移除類型相關接口,簡化開發,解決迷惑的類型問題,支援全類型
  • 尋找 Factory 時,更清晰的報錯資訊
  • 解決找不到 primary key 的問題
  • 統一了流批 source,統一了流批 sink
  • 支援讀取 CDC 和輸出 CDC
  • 直接高效地生成 Flink SQL 内部資料結構 RowData

老 TableSink API 如下所示,其中有 6 個方法是類型相關并且還充斥着 deprecated 方法,導緻 connector 經常出 bug。新 DynamicTableSink API 去掉了所有類型相關接口,因為所有的類型都是從 DDL 來的,不需要 TableSink 告訴架構是什麼類型。而對于使用者來說,最直覺的體驗就是在老版本上遇到各種奇奇怪怪報錯的機率降低了很多,比如不支援的精度類型和找不到 primary key / table factory 的詭異報錯在新版本上都不複存在了。關于 Flink 1.11 是如何解決這些問題的詳細可以在 Appendix 部分閱讀。

#### FLIP-123:Hive Dialect

Flink 1.10 版本對 Hive connector 的支援達到了生産可用,但是老版本的 Flink SQL 不支援 Hive DDL 及使用 Hive syntax,這無疑限制了 Flink connectivity。在新版本中,開發者們為支援 HiveQL 引入了新 parser,使用者可以在 SQL client 的 yaml 檔案中指定是否使用 Hive 文法,也可以在 SQL client 中通過 

set table.sql-dialect=hive/default

 動态切換。更多資訊可以參考 FLIP-123。

以上簡要介紹了 Flink 1.11 在 減少使用者不必要的輸入和操作方面對 connectivity 和 simplicity 方面做出的優化。下面會重點介紹在外部系統和資料生态方面對 connectivity 和 simplicity 的兩個核心優化,并附上最佳實踐介紹。

3 Hive 數倉實時化 & Flink SQL + CDC 最佳實踐

FLINK-17433:Hive 數倉實時化

下圖是一張非常經典的 Lambda 數倉架構,在整個大資料行業從批處理逐漸擁抱流計算的許多年裡代表“最先進的生産力”。然而随着業務發展和規模擴大,兩套單獨的架構所帶來的開發、運維、計算成本問題已經日益凸顯。

flink1.11 Flink SQL 新功能解讀

而 Flink 作為一個流批一體的計算引擎,在最初的設計上就認為“萬物本質皆是流”,批處理是流計算的特例,如果能夠在自身提供高效批處理能力的同時與現有的大資料生态結合,則能以最小侵入的方式改造現有的數倉架構使其支援流批一體。在新版本中,Flink SQL 提供了開箱即用的 “Hive 數倉同步”功能,即所有的資料加工邏輯由 Flink SQL 以流計算模式執行,在資料寫入端,自動将 ODS,DWD 和 DWS 層的已經加工好的資料實時回流到 Hive table。One size (sql) fits for all suites (tables) 的設計,使得在 batch 層不再需要維護任何計算 pipeline。

flink1.11 Flink SQL 新功能解讀

對比傳統架構,它帶來的好處和解決的問題有哪些呢?

  • 計算口徑與處理邏輯統一,降低開發和運維成本

    傳統架構維護兩套資料 pipeline 最大的問題在于需要保持它們處理邏輯的等價性,但由于使用了不同的計算引擎(比如離線使用 Hive,實時使用 Flink 或 Spark Streaming),SQL 往往不能直接套用,存在代碼上的差異性,經年累月下來,離線和實時處理邏輯很可能會完全 diverge,有些大的公司甚至會存在兩個團隊分别去維護實時和離線數倉,人力物力成本非常高。Flink 支援 Hive Streaming Sink 後,實時處理結果可以實時回流到 Hive 表,離線的計算層可以完全去掉,處理邏輯由 Flink SQL 統一維護,離線層隻需要使用回流好的 ODS、DWD、DWS 表做進一步 ad-hoc 查詢即可。

  • 離線對于“資料漂移”的處理更自然,離線數倉“實時化”

    離線數倉 pipeline 非 data-driven 的排程執行方式,在跨分區的資料邊界處理上往往需要很多 trick 來保證分區資料的完整性,而在兩套數倉架構并行的情況下,有時會存在對 late event 處理差異導緻資料對比不一緻的問題。而實時 data-driven 的處理方式和 Flink 對于 event time 的友好支援本身就意味着以業務時間為分區(window),通過 event time + watermark 可以統一定義實時和離線資料的完整性和時效性,Hive Streaming Sink 更是解決了離線數倉同步的“最後一公裡問題”。

    FLIP-105:支援 Change Data Capture (CDC)

除了對 Hive Streaming Sink 的支援,Flink SQL 1.11 的另一大亮點就是引入了 CDC 機制。CDC 的全稱是 Change Data Capture,用于 tracking 資料庫表的增删改查操作,是目前非常成熟的同步資料庫變更的一種方案。在國内常見的 CDC 工具就是阿裡開源的 Canal,在國外比較流行的有 Debezium。Flink SQL 在設計之初就提出了 Dynamic Table 和“流表二象性”的概念,并且在 Flink SQL 内部完整支援了 Changelog 功能,相對于其他開源流計算系統是一個重要優勢。本質上 Changelog 就等價于一張一直在變化的資料庫的表。Dynamic Table 這個概念是 Flink SQL 的基石, Flink SQL 的各個算子之間傳遞的就是 Changelog,完整地支援了 Insert、Delete、Update 這幾種消息類型。

得益于 Flink SQL 運作時的強大,Flink 與 CDC 對接隻需要将外部的資料流轉為 Flink 系統内部的 Insert、Delete、Update 消息即可。進入到 Flink 内部後,就可以靈活地應用 Flink 各種 query 文法了。

flink1.11 Flink SQL 新功能解讀

在實際應用中,把 Debezium Kafka Connect Service 注冊到 Kafka 叢集并帶上想同步的資料庫表資訊,Kafka 則會自動建立 topic 并監聽 Binlog,把變更同步到 topic 中。在 Flink 端想要消費帶 CDC 的資料也很簡單,隻需要在 DDL 中聲明 

format = debezium-json

 即可。

flink1.11 Flink SQL 新功能解讀

在 Flink 1.11 上開發者們還做了一些有趣的探索,既然 Flink SQL 運作時能夠完整支援 Changelog,那是否有可能不需要 Debezium 或者 Canal 的服務,直接通過 Flink 擷取 MySQL 的變更呢?答案當然是可以,Debezium 類庫的良好設計使得它的 API 可以被封裝為 Flink 的 Source Function,不需要再起額外的 Service,目前這個項目已經開源,支援了 MySQL 和 Postgres 的 CDC 讀取,後續也會支援更多類型的資料庫,可移步`https://github.com/ververica/...

` 解鎖更多使用姿勢。

下面的 Demo 會介紹如何使用 flink-cdc-connectors 捕獲 mysql 和 postgres 的資料變更,并利用 Flink SQL 做多流 join 後實時同步到 elasticsearch 中。

flink1.11 Flink SQL 新功能解讀

假設你在一個電商公司,訂單和物流是你最核心的資料,你想要實時分析訂單的發貨情況。因為公司已經很大了,是以商品的資訊、訂單的資訊、物流的資訊,都分散在不同的資料庫和表中。我們需要建立一個流式 ETL,去實時消費所有資料庫全量和增量的資料,并将他們關聯在一起,打成一個大寬表。進而友善資料分析師後續的分析。

繼續閱讀