本文主要描述 Flink1.9 新提供的 create table sql 通過 Calcite 解析,校驗 并注冊到catalog的過程。
樣例SQL:
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支援 0.11 以上的版本
'connector.topic' = 'xc_user_behavior', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取
'connector.properties.0.key' = 'zookeeper.connect', -- 連接配接資訊
'connector.properties.0.value' = '172.16.8.107:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '172.16.8.107:9092',
'update-mode' = 'append',
'format.type' = 'json', -- 資料源格式為 json
'format.derive-schema' = 'true' -- 從 DDL schema 确定 json 解析規則
)
帶着兩個疑問:1:如何自定義create table 文法(之後會另外寫一篇), 2:create table 的表資訊是如何注冊到catalog上的,供之後的select 語句查詢;
一:StreamTableEnvironment.create() 關注:Planner 的初始化
- 執行個體化一個CatalogManager(中繼資料管理器)
--初始化一個GenericInMemroyCatalog作為預設的Catalog
- 執行個體化StreamPlanner(本文以流作為背景,是以執行個體化目标為StreamPlanner)
- 初始化StreamPlanner繼承的超類PlannerContext
- 可以看到提供了統一生成Calcite FrameworkConfig的函數createFrameworkConfig,關注其中對defaultSchema的設定,(之前執行個體的CatalogManager的包裝類CatalogManagerCalciteSchema作為預設的Schema),另外ParserFactory參數也需要關注下,FlinkSqlParserImpl.FACTORY 這個是flink 基于Calcite 擴充的支援DDL的解析類;
- 初始化查詢優化器:
到這一步Planner環境初始化相關的操作已經完成。RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());//基于cost的優化器
二:解析Create table Sql語句:StreamTableEnvironment.sqlUpdate(createSqlStr)
- StreamTableEnvironment 調用 之前初始化環境過程中建立的 StreamPlanner 解析create table 語句 生成 CreateTableOperation;
其中生成Operation 的部分主體是:
- 生成Planner
- PlannerContext基于統一提供的createFrameworkConfig() 生成FlinkPlannerImpl。
- FlinkPlannerImpl 解析 DDL語句 生成SqlNode(如何根據Calcite自定義Table create 這個會後會另外單獨寫一篇)。
- 調用validate() 對create table 資訊進行校驗, 包括table 的column, primaryKeys,uniqueKeys,partitionKey設定是否正确,這個校驗不包括對create table 文法的校驗,create 文法的校驗在parse階段已經處理過;
- org.apache.flink.table.planner.operations.SqlToOperationConverter#convert:将SqlNode轉換為org.apache.flink.table.operations.ddl.CreateTableOperation,當然convert()方法包含對Query, Create, Drop, Insert 的處理,本文隻關注convertCreateTable子產品;
- 調用TableEnvironmentImpl#registerCatalogTableInternal 把生成的CreateTableOperation注冊到CatalogManager 上, 注冊的中繼資料資訊之後可以被sql 查詢引用;
調用GenericInMemoryCatalog#createTable 将表的資訊添加到catalog 中繼資料上;