數棧是雲原生—站式資料中台PaaS,我們在github和gitee上有一個有趣的開源項目:FlinkX,FlinkX是一個基于Flink的批流統一的資料同步工具,既可以采集靜态的資料,也可以采集實時變化的資料,是全域、異構、批流一體的資料同步引擎。大家喜歡的話請給我們點個star!star!star!
github開源項目:https://github.com/DTStack/flinkx
gitee開源項目:https://gitee.com/dtstack_dev_0/flinkx
首先,本文所述均基于flink 1.5.4。
一、我們為什麼擴充Flink-SQL?
由于Flink 本身SQL文法并不提供在對接輸入源和輸出目的的SQL文法。資料開發在使用的過程中需要根據其提供的Api接口編寫Source和 Sink, 異常繁瑣,不僅需要了解FLink 各類Operator的API,還需要對各個元件的相關調用方式有了解(比如kafka,redis,mongo,hbase等),并且在需要關聯到外部資料源的時候沒有提供SQL相關的實作方式,是以資料開發直接使用Flink編寫SQL作為實時的資料分析時需要較大的額外工作量。
我們的目的是在使用Flink-SQL的時候隻需要關心做什麼,而不需要關心怎麼做。不需要過多的關心程式的實作,專注于業務邏輯。
接下來,我們一起來看下Flink-SQL的擴充實作吧!
二、擴充了哪些flink相關sql
1、建立源表語句
2、建立輸出表語句
3、建立自定義函數
4、維表關聯
三、各個子產品是如何翻譯到flink的實作
1、如何将建立源表的sql語句轉換為flink的operator
Flink中表的都會映射到Table這個類。然後調用注冊方法将Table注冊到environment。
StreamTableEnvironment.registerTable(tableName, table);
目前我們隻支援kafka資料源。Flink本身有讀取kafka 的實作類, FlinkKafkaConsumer09,是以隻需要根據指定參數執行個體化出該對象。并調用注冊方法注冊即可。
另外需要注意在flink sql經常會需要用到rowtime, proctime, 是以我們在系統資料庫結構的時候額外添加rowtime,proctime。
當需要用到rowtime的使用需要額外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定義watermark主要做兩個事情:1:如何從Row中擷取時間字段。 2:設定最大延遲時間。
2、 如何将建立的輸出表sql語句轉換為flink的operator
Flink輸出Operator的基類是OutputFormat, 我們這裡繼承的是RichOutputFormat, 該抽象類繼承OutputFormat,額外實作了擷取運作環境的方法getRuntimeContext(), 友善于我們之後自定義metric等操作。
我們以輸出到mysql插件mysql-sink為例,分兩部分:
- 将create table 解析出表名稱,字段資訊,mysql連接配接資訊。
該部分使用正規表達式的方式将create table 語句轉換為内部的一個實作類。該類存儲了表名稱,字段資訊,插件類型,插件連接配接資訊。
- 繼承RichOutputFormat将資料寫到對應的外部資料源。
主要是實作writeRecord方法,在mysql插件中其實就是調用jdbc 實作插入或者更新方法。
3、如何将自定義函數語句轉換為flink的operator;
Flink對udf提供兩種類型的實作方式:
1)繼承ScalarFunction
2)繼承TableFunction
需要做的将使用者提供的jar添加到URLClassLoader, 并加載指定的class (實作上述接口的類路徑),然後調用TableEnvironment.registerFunction(funcName, udfFunc);即完成了udf的注冊。之後即可使用改定義的udf;
4、維表功能是如何實作的?
流計算中一個常見的需求就是為資料流補齊字段。因為資料采集端采集到的資料往往比較有限,在做資料分析之前,就要先将所需的次元資訊補全,但是目前flink并未提供join外部資料源的SQL功能。
實作該功能需要注意的幾個問題:
1)維表的資料是不斷變化的
在實作的時候需要支援定時更新記憶體中的緩存的外部資料源,比如使用LRU等政策。
2)IO吞吐問題
如果每接收到一條資料就串行到外部資料源去擷取對應的關聯記錄的話,網絡延遲将會是系統最大的瓶頸。這裡我們選擇阿裡貢獻給flink社群的算子RichAsyncFunction。該算子使用異步的方式從外部資料源擷取資料,大大減少了花費在網絡請求上的時間。
3)如何将sql 中包含的維表解析到flink operator