天天看點

Flink1.9 Create table 語句轉換 為 Operation流程分析

本文主要描述 Flink1.9 新提供的 create table sql 通過 Calcite 解析,校驗 并注冊到catalog的過程。

樣例SQL:

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka', -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支援 0.11 以上的版本
    'connector.topic' = 'xc_user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取
    'connector.properties.0.key' = 'zookeeper.connect',  -- 連接配接資訊
    'connector.properties.0.value' = '172.16.8.107:2181', 
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = '172.16.8.107:9092', 
    'update-mode' = 'append',
    'format.type' = 'json',  -- 資料源格式為 json
    'format.derive-schema' = 'true' -- 從 DDL schema 确定 json 解析規則
)           

帶着兩個疑問:1:如何自定義create table 文法(之後會另外寫一篇), 2:create table 的表資訊是如何注冊到catalog上的,供之後的select 語句查詢;

一:StreamTableEnvironment.create() 關注:Planner 的初始化

  1. 執行個體化一個CatalogManager(中繼資料管理器)
    --初始化一個GenericInMemroyCatalog作為預設的Catalog  
               
  2. 執行個體化StreamPlanner(本文以流作為背景,是以執行個體化目标為StreamPlanner)
  • 初始化StreamPlanner繼承的超類PlannerContext
  • 可以看到提供了統一生成Calcite FrameworkConfig的函數createFrameworkConfig,關注其中對defaultSchema的設定,(之前執行個體的CatalogManager的包裝類CatalogManagerCalciteSchema作為預設的Schema),另外ParserFactory參數也需要關注下,FlinkSqlParserImpl.FACTORY 這個是flink 基于Calcite 擴充的支援DDL的解析類;
  • 初始化查詢優化器:
    RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());//基于cost的優化器               
    到這一步Planner環境初始化相關的操作已經完成。

二:解析Create table Sql語句:StreamTableEnvironment.sqlUpdate(createSqlStr)

  1. StreamTableEnvironment 調用 之前初始化環境過程中建立的 StreamPlanner 解析create table 語句 生成 CreateTableOperation;
    其中生成Operation 的部分主體是:           
  • 生成Planner
  • PlannerContext基于統一提供的createFrameworkConfig() 生成FlinkPlannerImpl。
  • FlinkPlannerImpl 解析 DDL語句 生成SqlNode(如何根據Calcite自定義Table create 這個會後會另外單獨寫一篇)。
  • 調用validate() 對create table 資訊進行校驗, 包括table 的column, primaryKeys,uniqueKeys,partitionKey設定是否正确,這個校驗不包括對create table 文法的校驗,create 文法的校驗在parse階段已經處理過;
  • org.apache.flink.table.planner.operations.SqlToOperationConverter#convert:将SqlNode轉換為org.apache.flink.table.operations.ddl.CreateTableOperation,當然convert()方法包含對Query, Create, Drop, Insert 的處理,本文隻關注convertCreateTable子產品;
  1. 調用TableEnvironmentImpl#registerCatalogTableInternal 把生成的CreateTableOperation注冊到CatalogManager 上, 注冊的中繼資料資訊之後可以被sql 查詢引用;
調用GenericInMemoryCatalog#createTable 将表的資訊添加到catalog 中繼資料上;