天天看點

Flink SQL 用戶端如何使用

Flink 版本 1.13.0

Flink 的 Table & SQL API 可以處理 SQL 語言編寫的查詢語句,但是這些查詢需要嵌入用 Java 或 Scala 編寫的 Table 程式中。此外,這些程式在送出到叢集前需要用建構工具打包。這或多或少限制了 Java/Scala 程式員對 Flink 的使用。

SQL 用戶端的目的是提供一種簡單的方式來編寫、調試和送出表程式到 Flink 叢集上,不需寫 Java 或 Scala 代碼。SQL 用戶端指令行界面(CLI) 能夠在指令行中檢索和可視化分布式應用的實時結果。

1. 入門

本節介紹如何在指令行裡啟動和運作你的第一個 Flink SQL 程式。SQL 用戶端綁定在正常的 Flink 發行包中,是以可以直接運作。僅需要一個正在運作的 Flink 叢集就可以在上面執行 Table 程式。如果僅想試用 SQL 用戶端,也可以使用以下指令啟動本地叢集:

./bin/start-cluster.sh           

複制

1.1 啟動SQL用戶端CLI

SQL 用戶端腳本也位于 Flink 的 bin 目錄中。将來,使用者有兩種方式來啟動 SQL 用戶端指令行界面:通過嵌入式獨立程序或者通過連接配接到遠端 SQL 用戶端網關。目前僅支援嵌入式模式,現在預設模式就是嵌入式。可以通過以下方式啟動 CLI:

./bin/sql-client.sh           

複制

注意低版本不能使用該指令

或者顯式使用嵌入式模式:

./bin/sql-client.sh embedded           

複制

Flink SQL 用戶端如何使用

1.2 執行SQL查詢

CLI 啟動後,你可以使用 HELP 指令列出所有可用的 SQL 語句。為了驗證你的設定及叢集連接配接是否正确,可以輸入一條 SQL 查詢語句并按 Enter 鍵執行:

SELECT 'Hello World';           

複制

該查詢不需要資料表,并且隻會産生一行結果。CLI 将從叢集中檢索結果并将其可視化。可以按 Q 鍵退出結果視圖。CLI 為維護和可視化結果提供三種模式。下面具體看一下。

注意:Flink 1.24.0 版本使用 execution.result-mode 參數。
1.2.1 表格模式

表格模式(table mode)在記憶體中物化結果,并将結果用規則的分頁表格的形式可視化展示出來。執行如下指令啟用:

SET sql-client.execution.result-mode = table;           

複制

文檔中 SET ‘sql-client.execution.result-mode’ = ‘xxx’ 方式配置不生效

可以使用如下查詢語句檢視不同模式的的運作結果:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;           

複制

Flink SQL 用戶端如何使用
1.2.2 變更日志模式

變更日志模式(changelog mode)不會物化結果。可視化展示由插入(+)和撤銷(-)組成的持續查詢結果流。

SET sql-client.execution.result-mode = changelog;           

複制

Flink SQL 用戶端如何使用
1.2.3 Tableau模式

Tableau模式(tableau mode)更接近傳統的資料庫,會将執行的結果以制表的形式直接打在螢幕之上。具體顯示的内容取決于作業執行模式(execution.type):

SET sql-client.execution.result-mode = tableau;           

複制

Flink SQL 用戶端如何使用
注意:當你在流式查詢上使用這種模式時,Flink 會将結果持續的列印在目前的控制台上。如果流式查詢的輸入是有限資料集,那麼 Flink 在處理完所有的輸入資料之後,作業會自動停止,同時控制台上的列印也會自動停止。如果你想提前結束這個查詢,那麼可以直接使用 CTRL-C 按鍵,這個會停止作業同時停止在控制台上的列印。

2. 配置

2.1 啟動選項

可以使用如下可選 CLI 指令啟動 SQL 用戶端:

./bin/sql-client.sh --help

Mode "embedded" (default) submits Flink jobs from the local machine.

  Syntax: [embedded] [OPTIONS]
  "embedded" mode options:
         -d,--defaults <environment file>      Deprecated feature: the environment
                                               properties with which every new
                                               session is initialized. Properties
                                               might be overwritten by session
...           

複制

2.2 用戶端配置

Key 預設值 類型 說明
sql-client.execution.max-table-result.rows 1000000 Integer 在表格模式下緩存的行數。如果行數超過指定值,則以 FIFO 樣式重試該行。
sql-client.execution.result-mode TABLE 枚舉值,可以是 TABLE, CHANGELOG, TABLEAU 确定展示查詢結果的模式。可以是 table、tableau、changelog。
sql-client.verbose false Boolean 确定是否将輸出詳細資訊輸出到控制台。如果将選項設定為 true,會列印異常堆棧。否則,隻輸出原因。

2.2 使用SQL檔案初始化會話

SQL 查詢需要配置執行環境。SQL 用戶端支援 -i 啟動選項以在啟動 SQL 用戶端時執行初始化 SQL 檔案以設定環境。所謂的初始化 SQL 檔案,可以使用 DDL 來定義可用的 catalogs、table source、sink、使用者自定義函數以及其他執行和部署所需的屬性。

下面給出了此類檔案的示例:

-- 定義 Catalogs

CREATE CATALOG MyCatalog
  WITH (
    'type' = 'hive'
  );

USE CATALOG MyCatalog;

-- 定義 DataBase

CREATE DATABASE MyDatabase;

USE MyDatabase;

-- 定義 TABLE

CREATE TABLE MyTable (
  MyField1 INT,
  MyField2 STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/path/to/something',
  'format' = 'csv'
);

-- 定義 VIEW

CREATE VIEW MyCustomView AS SELECT MyField2 FROM MyTable;

-- 定義使用者自定義函數

CREATE FUNCTION foo.bar.AggregateUDF AS myUDF;

-- 修改屬性
SET table.planner = blink; -- planner: either blink (default) or old
SET execution.runtime-mode = streaming; -- execution mode either batch or streaming
SET sql-client.execution.result-mode = table; -- available values: table, changelog and tableau
SET sql-client.execution.max-table-result.rows = 10000; -- optional: maximum number of maintained rows
SET parallelism.default = 1; -- optional: Flinks parallelism (1 by default)
SET pipeline.auto-watermark-interval = 200; --optional: interval for periodic watermarks
SET pipeline.max-parallelism = 10; -- optional: Flink's maximum parallelism
SET table.exec.state.ttl = 1000; -- optional: table program's idle state time
SET restart-strategy = fixed-delay;

SET table.optimizer.join-reorder-enabled = true;
SET table.exec.spill-compression.enabled = true;
SET table.exec.spill-compression.block-size = 128kb;           

複制

上述配置:

  • 連接配接到 Hive Catalog 并使用 MyCatalog 作為目前 Catalog,使用 MyDatabase 作為目錄的目前資料庫
  • 定義一個可以從 CSV 檔案中讀取資料的表 MyTable
  • 定義一個視圖 MyCustomView,它使用 SQL 查詢聲明一個虛拟表
  • 定義了一個可以使用類名執行個體化的使用者定義函數 myUDF
  • 在流模式下使用 blink 計劃器運作語句,并且設定并行度為 1
  • 使用表格模式運作 SQL 進行探索性查詢,

使用 -i 選項初始化 SQL 用戶端會話時,初始化 SQL 檔案中允許使用以下語句:

  • DDL(CREATE/DROP/ALTER)
  • USE CATALOG/DATABASE
  • LOAD/UNLOAD MODULE
  • SET 指令
  • RESET 指令

執行查詢或插入語句時,請進入互動模式或使用-f選項送出SQL語句。如果 SQL 用戶端在初始化時遇到錯誤,SQL 用戶端将退出并顯示錯誤資訊。

3. 使用SQL用戶端送出作業

SQL 用戶端可以允許使用者在互動式指令行中或使用 -f 選項執行 sql 檔案來送出作業。在這兩種模式下,SQL 用戶端都可以支援解析和執行 Flink 支援的所有類型的 SQL 語句。

3.1 互動式指令行

在互動式指令行中,SQL 用戶端讀取使用者輸入并在擷取分号 (;) 時執行語句。如果語句成功執行,SQL 用戶端會列印成功消息。當出現錯誤時,SQL 用戶端也會列印錯誤資訊。預設情況下,錯誤消息僅包含錯誤原因。為了列印完整的異常堆棧以進行調試,需要通過如下指令設定:

SET sql-client.verbose = true;           

複制

将 sql-client.verbose 設定為 true
Flink SQL 用戶端如何使用

3.2 執行SQL檔案

SQL 用戶端支援使用 -f 選項執行 SQL 腳本檔案。SQL 用戶端會一一執行 SQL 腳本檔案中的語句,并為每條執行的語句列印執行資訊。一旦一條語句失敗,SQL 用戶端就會退出,所有剩餘的語句也不會執行。

下面給出了此類檔案的示例:

CREATE TEMPORARY TABLE users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

-- set sync mode
SET table.dml-sync = true;

-- set the job name
SET pipeline.name = SqlJob;

-- set the queue that the job submit to
SET yarn.application.queue = root;

-- set the job parallism
SET parallism.default = 100;

-- restore from the specific savepoint path
SET execution.savepoint.path = /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;

INSERT INTO pageviews_enriched
SELECT *
FROM pageviews AS p
LEFT JOIN users FOR SYSTEM_TIME AS OF p.proctime AS u
ON p.user_id = u.user_id;           

複制

這個配置:

  • 定義從 CSV 檔案讀取的時态表 users,
  • 設定屬性,例如作業名稱,
  • 設定儲存點路徑,
  • 送出從指定儲存點路徑加載儲存點的 sql 作業。
與互動模式相比,SQL 用戶端遇到錯誤會停止執行并退出。

3.3 執行一組SQL語句

SQL 用戶端将每個 INSERT INTO 語句作為單個 Flink 作業執行。但是,這有時性能不是最佳,因為 Pipeline 的某些部分可以重複使用。SQL 用戶端支援 STATEMENT SET 文法來執行一組 SQL 語句。這與 Table API 中 StatementSet 功能類似。STATEMENT SET 文法包含一個或多個 INSERT INTO 語句。 STATEMENT SET 塊中的所有語句都要經過整體優化後作為一個 Flink 作業執行。

具體文法如下:

BEGIN STATEMENT SET;
  -- one or more INSERT INTO statements
  { INSERT INTO|OVERWRITE <select_statement>; }+
END;           

複制

包含在 STATEMENT SET 中的語句必須用分号 (;) 分隔。

具體看一下示例:

CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'pageviews',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

CREATE TABLE pageview (
  page_id BIGINT,
  cnt BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'pageview'
);

CREATE TABLE uniqueview (
  page_id BIGINT,
  cnt BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'uniqueview'
);

BEGIN STATEMENT SET;

INSERT INTO pageviews
SELECT page_id, count(1)
FROM pageviews
GROUP BY page_id;

INSERT INTO uniqueview
SELECT page_id, count(distinct user_id)
FROM pageviews
GROUP BY page_id;

END;           

複制

3.4 同步/異步執行DML語句

預設情況下,SQL 用戶端異步執行 DML 語句。這意味着,SQL 用戶端将 DML 語句的作業送出給 Flink 叢集即可,不用等待作業完成。是以 SQL 用戶端可以同時送出多個作業。這對于通常長時間運作的流作業很有用。SQL 用戶端確定語句成功送出到叢集。送出語句後,CLI 将顯示有關 Flink 作業的資訊:

Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: 6f922fe5cba87406ff23ae4a7bb79044           

複制

SQL 用戶端再送出作業後不會跟蹤作業的狀态。CLI 程序可以在送出後關閉而不影響查詢。Flink 的重新開機政策負責容錯。可以使用 Flink 的 Web 界面、指令行或 REST API 取消查詢。但是,對于批處理使用者,更常見的是下一個 DML 語句需要等待前一個 DML 語句完成才能執行。為了同步執行 DML 語句,我們可以在 SQL 用戶端中設定 table.dml-sync 選項為 true:

Flink SQL> SET table.dml-sync = true;
[INFO] Session property has been set.

Flink SQL> INSERT INTO MyTableSink SELECT * FROM MyTableSource;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Execute statement in sync mode. Please wait for the execution finish...
[INFO] Complete execution of the SQL update statement.           

複制

如果要終止作業,隻需鍵入 CTRL-C 即可取消執行。

3.5 從儲存點啟動SQL作業

Flink 支援從指定的儲存點啟動作業。在 SQL 用戶端中,允許使用 SET 指令指定儲存點的路徑:

Flink SQL> SET execution.savepoint.path = /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab;
[INFO] Session property has been set.

-- all the following DML statements will be restroed from the specified savepoint path
Flink SQL> INSERT INTO ...           

複制

當指定了儲存點的路徑時,Flink 會在執行 DML 語句時會嘗試從儲存點恢複狀态。因為指定的儲存點路徑會影響後面所有的 DML 語句,你可以使用 RESET 指令來重置這個配置選項,即禁用從儲存點恢複:

Flink SQL> RESET execution.savepoint.path;
[INFO] Session property has been reset.           

複制

3.6 自定義作業名稱

SQL 用戶端支援通過 SET 指令為查詢和 DML 語句定義作業名稱:

Flink SQL> SET pipeline.name = kafka-to-hive;
[INFO] Session property has been set.

-- all the following DML statements will use the specified job name.
Flink SQL> INSERT INTO ...           

複制

因為指定的作業名會影響後面所有的查詢和 DML 語句,你也可以使用 RESET 指令來重置這個配置,即使用預設的作業名:

Flink SQL> RESET pipeline.name;
[INFO] Session property has been reset.           

複制

如果未指定選項 pipeline.name,SQL 用戶端将為送出的作業生成預設名稱,例如 insert-into_ 用于 INSERT INTO 語句。

4. 相容性

為了與之前版本相容,SQL 用戶端仍然支援使用 YAML 檔案進行初始化,并允許在 YAML 檔案中設定 key。當在 YAML 檔案中定義 key 時,SQL 用戶端将列印警告消息以通知:

Flink SQL> SET execution.type = batch;
[WARNING] The specified key 'execution.type' is deprecated. Please use 'execution.runtime-mode' instead.
[INFO] Session property has been set.

-- all the following DML statements will be restored from the specified savepoint path
Flink SQL> INSERT INTO ...           

複制

當使用 SET 指令列印屬性時,SQL 用戶端會列印所有的屬性。為了區分不推薦使用的 key,SQL 用戶端使用 [DEPRECATED] 作為辨別符:

Flink SQL>SET;
execution.runtime-mode=batch
sql-client.execution.result-mode=table
table.planner=blink
[DEPRECATED] execution.planner=blink
[DEPRECATED] execution.result-mode=table
[DEPRECATED] execution.type=batch           

複制

Flink SQL 用戶端如何使用