天天看點

Flink 1.11:更好用的流批一體 SQL 引擎

許多的資料科學家,分析師和 BI 使用者依賴互動式 SQL 查詢分析資料。Flink SQL 是 Flink 的核心子產品之一。作為一個分布式的 SQL 查詢引擎。Flink SQL 提供了各種異構資料源的聯合查詢。開發者可以很友善地在一個程式中通過 SQL 編寫複雜的分析查詢。通過 CBO 優化器、列式存儲、和代碼生成技術,Flink SQL 擁有非常高的查詢效率。同時借助于 Flink runtime 良好的容錯和擴充性,Flink SQL 可以輕松處理海量資料。

在保證優秀性能的同時,易用性是 1.11 版本 Flink SQL 的重頭戲。易用性的提升主要展現在以下幾個方面:

  • 更友善的追加或修改表定義
  • 靈活的聲明動态的查詢參數
  • 加強和統一了原有 TableEnv 上的 SQL 接口
  • 簡化了 connector 的屬性定義
  • 對 Hive 的 DDL 做了原生支援
  • 加強了對 python UDF 的支援

下面逐一為大家介紹 ~

Create Table Like

在生産中,使用者常常有調整現有表定義的需求。例如使用者想在一些外部的表定義(例如 Hive metastore)基礎上追加 Flink 特有的一些定義比如 watermark。在 ETL 場景中,将多張表的資料合并到一張表,目标表的 schema 定義其實是上遊表的合集,需要一種友善合并表定義的方式。

從 1.11 版本開始,Flink 提供了 LIKE 文法,使用者可以很友善的在已有的表定義上追加新的定義。

例如我們可以使用下面的文法給已有表 base_table 追加 watermark 定義:

CREATE [TEMPORARY] TABLE base_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
    PRIMARY KEY(id)
) WITH (
    'connector': 'kafka'
)
 
CREATE [TEMPORARY] TABLE derived_table (
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
)
LIKE base_table;           

這裡 derived_table 表定義等價于如下定義:

CREATE [TEMPORARY] TABLE derived_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
    PRIMARY KEY(id),
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
) WITH (
    ‘connector’: ‘kafka’
)           

對比之下,新的文法省去了重複的 schema 定義,使用者隻需要定義追加屬性,非常友善簡潔。

多屬性政策

有的小夥伴會問,原表和新表的屬性隻是新增或追加嗎?如果我想覆寫或者排除某些屬性該如何操作?這是一個好問題,Flink LIKE 文法提供了非常靈活的表屬性操作政策。

LIKE 文法支援使用不同的 keyword 對表屬性分類:

  • ALL:完整的表定義
  • CONSTRAINTS: primary keys, unique key 等限制
  • GENERATED: 主要指計算列和 watermark
  • OPTIONS: WITH (...) 語句内定義的 table options
  • PARTITIONS: 表分區資訊

在不同的屬性分類上可以追加不同的屬性行為:

  • INCLUDING:包含(預設行為)
  • EXCLUDING:排除
  • OVERWRITING:覆寫

下面這張表格說明了不同的分類屬性允許的行為:

Flink 1.11:更好用的流批一體 SQL 引擎

例如下面的語句:

CREATE [TEMPORARY] TABLE base_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
    PRIMARY KEY(id)
) WITH (
    'connector': 'kafka',
    'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',
    'format': 'json'
)
 
CREATE [TEMPORARY] TABLE derived_table (
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
)
WITH (
    'connector.starting-offset': '0'
)
LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS);           

等價的表屬性定義為:

CREATE [TEMPORARY] TABLE derived_table (
    id BIGINT,
    name STRING,
    tstmp TIMESTAMP,
    WATERMARK FOR tstmp AS tsmp - INTERVAL '5' SECOND
) WITH (
    'connector': 'kafka',
    'scan.startup.specific-offsets': 'partition:0,offset:42;partition:1,offset:300',
    'format': 'json'
)           

細節參見:

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table

Dynamic Table Options

在生産中,調整參數是一個常見需求,很多的時候是臨時修改(比如通過終端查詢和展示),比如下面這張 Kafka 表:

create table kafka_table (
  id bigint,
  age int,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'employees',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '123456',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'false'
)           

在之前的版本,如果使用者有如下需求:

  • 使用者需要指定特性的消費時間戳,即修改 scan.startup.timestamp-millis 屬性
  • 使用者想忽略掉解析錯誤,需要将 format.ignore-parse-errors 改為 true

隻能使用 ALTER TABLE 這樣的語句修改表的定義,從 1.11 開始,使用者可以通過動态參數的形式靈活地設定表的屬性參數,覆寫或者追加原表的 WITH (...) 語句内定義的 table options。

基本文法為:

table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */           

OPTIONS 内的鍵值對會覆寫原表的 table options,使用者可以在各種 SQL 語境中使用這樣的文法,例如:

CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);

-- override table options in query source
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

-- override table options in join
select * from
    kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
    join
    kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
    on t1.id = t2.id;

-- override table options for INSERT target table
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;           

動态參數的使用沒有語境限制,隻要是引用表的地方都可以追加定義。在指定的表後面追加的動态參數會自動追加到原表定義中,是不是很友善呢 :)

由于可能對查詢結果有影響,動态參數功能預設是關閉的, 使用下面的方式開啟該功能:

// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.dynamic-table-options.enabled", "true");           
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html

SQL API 改進

随着 Flink SQL 支援的語句越來越豐富,老的 API 容易引起一些困惑:

  • 原先的 sqlUpdate() 方法傳遞 DDL 語句會立即執行,而 INSERT INTO 語句在調用 execute 方法時才會執行
  • Table 程式的執行入口不夠清晰,像 TableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 都可以觸發 table 程式執行
  • execute 方法沒有傳回值。像 SHOW TABLES 這樣的語句沒有很好地方式傳回結果。另外,sqlUpdate 方法加入了越來越多的語句導緻接口定義不清晰,sqlUpdate 可以執行 SHOW TABLES 就是一個反例
  • 在 Blink planner 一直提供多 sink 優化執行的能力,但是在 API 層沒有展現出來

1.11 重新梳理了 TableEnv 上的 sql 相關接口,提供了更清晰的執行語義,同時執行任意 sql 語句現在都有傳回值,使用者可以通過新的 API 靈活的組織多行 sql 語句一起執行。

更清晰的執行語義

新的接口 TableEnvironment#executeSql 統一傳回抽象 TableResult,使用者可以疊代 TableResult 拿到執行結果。根據執行語句的不同,傳回結果的資料結構也有變化,比如 SELECT 語句會傳回查詢結果,而 INSERT 語句會異步送出作業到叢集。

組織多條語句一起執行

新的接口 TableEnvironment#createStatementSet 允許使用者添加多條 INSERT 語句并一起執行,在多 sink 場景,Blink planner 會針對性地對執行計劃做優化。

新舊 API 對比

一張表格感受新老 API 的變化:

Flink 1.11:更好用的流批一體 SQL 引擎

詳情參見:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

Hive 文法相容加強

從 1.11 開始,Flink SQL 将 Hive parser 子產品獨立出來,用以相容 Hive 的文法,目前 DDL 層面,DB、Table、View、Function 相關的文法均已支援。搭配 HiveCatalog,Hive 的同學可以直接使用 Hive 的文法來進行相關的操作。

在使用 hive 語句之前需要設定正确的 Dialect:

EnvironmentSettings settings = EnvironmentSettings.newInstance()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// to use hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// use the hive catalog
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());           

之後我們便可以使用 Hive 的文法來執行一些 DDL,例如最常見的建表操作:

create external table tbl1 (
  d decimal(10,0),
  ts timestamp)
partitioned by (p string)
location '%s'
tblproperties('k1'='v1');
  
create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc;

create table tbl3 (
  m map<timestamp,binary>
)
partitioned by (p1 bigint, p2 tinyint)
row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe';

create table tbl4 (
  x int,
  y smallint)
row format delimited fields terminated by '|' lines terminated by '\n';           

對于 DQL 的 Hive 文法相容已經在規劃中,1.12 版本會相容更多 query 文法 ~

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_dialect.html

更簡潔的 connector 屬性

1.11 重新規範了 connector 的屬性定義,新的屬性 key 更加直覺簡潔,和原有的屬性 key 相比主要做了如下改動:

  • 使用 connector 作為 connector 的類型 key,connector 版本資訊直接放到 value 中,比如 0.11 的 kafka 為 kafka-0.11
  • 去掉了其餘屬性中多餘的 connector 字首
  • 使用 scan 和 sink 字首标記 source 和 sink 專有屬性
  • format.type 精簡為 format ,同時 format 自身屬性使用 format 的值作為字首,比如 csv format 的自身屬性使用 csv 統一作字首

例如,1.11 Kafka 表的定義如下:

CREATE TABLE kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 'scan.startup.mode' = 'earliest-offset'
)           
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

JDBC catalog

在之前的版本中,使用者隻能通過顯示建表的方式建立關系型資料庫的鏡像表。使用者需要手動追蹤 Flink SQL 的表 schema 和資料庫的 schema 變更。在 1.11,Flink SQL 提供了一個 JDBC catalog 接口對接各種外部的資料庫系統,例如 Postgres、MySQL、MariaDB、AWS Aurora、etc。

目前 Flink 内置了 Postgres 的 catalog 實作,使用下面的代碼配置 JDBC catalog:

CREATE CATALOG mypg WITH(
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

USE CATALOG mypg;           

使用者也可以實作 JDBCCatalog 接口定制其他資料庫的 catalog ~

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#postgres-database-as-a-catalog

Python UDF 增強

1.11 版本的 py-flink 在 python UDF 方面提供了很多增強,包括 DDL 的定義方式、支援了标量的向量化 python UDF,支援全套的 python UDF metrics 定義,以及在 SQL-CLI 中定義 python UDF。

DDL 定義 python UDF

1.10.0 版本引入了對 python UDF 的支援。但是僅僅支援 python table api 的方式。1.11 提供了 SQL DDL 的方式定義 python UDF, 使用者可以在 Java/Scala table API 以及 SQL-CLI 場景下使用。

例如,現在使用者可以使用如下方式定義 Java table API 程式使用 python UDF:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");

tEnv.toDataSet(table, String.class).collect();           

向量化支援

向量化 Python UDF 相較于普通函數大大提升了性能。使用者可以使用流行的 python 庫例如 Pandas、Numpy 來實作向量化的 python UDF。使用者隻需在裝飾器 udf 中添加額外的參數 udf_type="pandas" 即可。

例如,下面的樣例展示了如何定義向量化的 Python 标量函數以及在 python table api 中的應用:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT(), udf_type="pandas")
def add(i, j):
  return i + j

table_env = BatchTableEnvironment.create(env)

# register the vectorized Python scalar function
table_env.register_function("add", add)

# use the vectorized Python scalar function in Python Table API
my_table.select("add(bigint, bigint)")

# use the vectorized Python scalar function in SQL API
table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")           
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/vectorized_python_udfs.html

另外,1.11 對 python UDF 的 metrics 做了全面支援,現在使用者可以在 UDF 中友善地定義各種類型的 metrics,由于篇幅關系,這裡不作較長的描述,見 python UDF metrics。

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/metrics.html

展望後續

在後續版本,易用性仍然是 Flink SQL 的核心主題,比如 schema 的易用性增強,Descriptor API 簡化以及更豐富的流 DDL 将會是努力的方向,讓我們拭目以待 ~

了解更多 Flink 1.11 重大變更與新增功能特性可點選「

閱讀原文

」~