天天看點

Streamworks,基于擴充FlinkSQL實作流計算的源表導入、維表關聯與結果表導出為什麼擴充Flink-SQL?擴充了哪些Flink相關SQLDemo:Kafka流資料關聯Oracle維表,寫入PostgreSQL 。

Streamworks,袋鼠雲基于SQL的流計算開發平台,其通過擴充FlinkSQL,實作FlinkSQL與界面化配置映射結合的方式,完成Kafka源資料的讀入,并支援流資料與Mysql/Oracle/MongDB等資料源進行維表關聯,将最終結果資料導出至Hbase/ES/Greenplum/Oracle/OceanBase等目标資料庫,進行一站式的流資料開發。

Streamworks,基于擴充FlinkSQL實作流計算的源表導入、維表關聯與結果表導出為什麼擴充Flink-SQL?擴充了哪些Flink相關SQLDemo:Kafka流資料關聯Oracle維表,寫入PostgreSQL 。

為什麼擴充Flink-SQL?

Flink 本身的SQL文法并不提供對接輸入源和輸出目的的SQL文法,資料開發在使用過程中需要根據其提供的API接口編寫Source和 Sink,不僅需要了解FLink 各類Operator的API,還需要對各個元件的相關調用方式有了解(比如Kafka,Redis,Mongo、Hbase等),異常繁瑣。并且在需要關聯到外部資料源的時候Flink也沒有提供SQL相關的實作方式,若資料開發直接基于原生的Flink SQL進行實時的資料分析,需要較大的額外工作量。

袋鼠雲的SteamWorks則聚焦于資料開發人員使用Flink SQL時專注于業務邏輯,隻需要關心做什麼,而不需要關心怎麼做。研發團隊對FlinkSQL進行了擴充,使用者隻需通過可視化配置,完成源表到導入、維表的關聯、結果表的導出。

擴充了哪些Flink相關SQL

1.建立源表語句

CREATE TABLE tableName(

colName colType,
...
function(colNameX) AS aliasName,
WATERMARK FOR colName AS withOffset( colName , delayTime )           

)WITH(

type ='kafka09',
bootstrapServers ='ip:port,ip:port...',
zookeeperQuorum ='ip:port,ip:port/zkparent',
offsetReset ='latest',
topic ='topicName',
parallelism ='parllNum'           

);

2.建立輸出表語句

    colName colType,

    ...

    colNameX colType

    type ='mysql',

    url ='jdbcUrl',

    userName ='userName',

    password ='pwd',

    tableName ='tableName',

    parallelism ='parllNum'

  );

3.建立自定義函數;

Create (scala|table) FUNCTION name WITH com.xx.xx

4.維表關聯語句;

    colName cloType,

    PRIMARY KEY(keyInfo),

    PERIOD FOR SYSTEM_TIME

    type='mysql',

    url='jdbcUrl',

    userName='dbUserName',

    password='dbPwd',

    tableName='tableName',

    cache ='LRU',

    cacheSize ='10000',

    cacheTTLMs ='60000',

     parallelism ='1',

     partitionedJoin='false'

 );

 # 各個子產品是如何翻譯到Flink,進行資料處理

1.如何将建立源表的SQL語句轉換為Flink的operator

Flink中表的都會映射到Table這個類。然後調用注冊方法将Table注冊到environment,StreamTableEnvironment.registerTable(tableName, table)。

目前我們隻支援kafka資料源。Flink本身有讀取kafka 的實作類(FlinkKafkaConsumer09),是以隻需要根據指定參數執行個體化出該對象,并調用注冊方法注冊即可。

另外需要注意在Flink SQL經常會需要用到Rowtime、Proctime, 是以我們在系統資料庫結構的時候額外添加Rowtime、Proctime。當需要用到rowtime的時候需要額外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定義watermark主要做兩個事情1:如何從Row中擷取時間字段。 2:設定最大延遲時間。

2.如何将建立的輸出表sql語句轉換為Flink的operator

Flink輸出Operator的基類是OutputFormat, 我們這裡繼承的是RichOutputFormat, 該抽象類繼承OutputFormat,額外實作了擷取運作環境的方法getRuntimeContext(), 友善于我們之後自定義metric等操作。

我們以輸出到Mysql插件Mysql-Sink為例。

分兩部分

(1)将create table 解析出表名稱,字段資訊,mysql連接配接資訊。

    該部分使用正規表達式的方式将create table 語句轉換為内部的一個實作類。該類存儲了表名稱,字段資訊,插件類型,插件連接配接資訊。

(2)繼承RichOutputFormat将資料寫到對應的外部資料源。

    主要是實作writeRecord方法,在mysql插件中其實就是調用jdbc 實作插入或者更新方法。

3.如何将自定義函數語句轉換為Flink的operator;

   Flink對udf提供兩種類型的實作方式:

繼承ScalarFunction

繼承TableFunction

需要做的将使用者提供的jar添加到URLClassLoader, 并加載指定的class (實作上述接口的類路徑),然後調用TableEnvironment.registerFunction(funcName, udfFunc);即完成UDF的注冊。之後即可使用改定義的UDF;

4.維表功能是如何實作的?

流計算中一個常見的需求就是為資料流補齊字段。因為資料采集端采集到的資料往往比較有限,在做資料分析之前,就要先将所需的次元資訊補全,但是目前Flink并未提供join外部資料源的SQL功能。

實作該功能需要注意的幾個問題

(1)維表的資料是不斷變化的

     在實作的時候需要支援定時更新記憶體中的緩存的外部資料源,比如使用LRU等政策。

(2)IO吞吐問題

     如果每接收到一條資料就串行到外部資料源去擷取對應的關聯記錄的話,網絡延遲将會是系統最大的瓶頸。這裡我們選擇阿裡貢獻給flink社群的算子RichAsyncFunction。該算子使用異步的方式從外部資料源擷取資料,大大減少了花費在網絡請求上的時間。

(3)如何将SQL 中包含的維表解析到Flink operator   

      為了從SQL中解析出指定的維表和過濾條件,使用正則明顯不是一個合适的辦法,需要比對各種可能性,将是一個無窮無盡的過程。檢視Flink本身對SQL的解析,它使用了calcite做為sql解析的工作。将sql解析出一個文法樹,通過疊代的方式,搜尋到對應的維表,然後将維表和非維表結構分開。

Streamworks,基于擴充FlinkSQL實作流計算的源表導入、維表關聯與結果表導出為什麼擴充Flink-SQL?擴充了哪些Flink相關SQLDemo:Kafka流資料關聯Oracle維表,寫入PostgreSQL 。

通過上述步驟便可以通過Flink SQL完成常用的從kafka源表讀取資料,join部資料源,并寫入到指定的外部目标結構中,且袋鼠雲開源了相關實作:

https://github.com/DTStack/flinkStreamSQL

Demo:Kafka流資料關聯Oracle維表,寫入PostgreSQL 。

讀取Kafka源資料:

CREATE TABLE MyTable(

    name varchar,

    channel varchar,

    pv INT,

    xctime bigint,

    CHARACTER_LENGTH(channel) AS timeLeng

 )WITH(

    type ='kafka09',

    kafka.bootstrap.servers ='172.16.8.198:9092',

    kafka.zookeeper.quorum ='172.16.8.198:2181/kafka',

    kafka.auto.offset.reset ='latest',

    kafka.topic ='nbTest1,nbTest2,nbTest3',

    --kafka.topic ='mqTest.*',

    --patterntopic='true'

    parallelism ='1',

    sourcedatatype ='json' #可不設定

維表關聯Oracle語句

create table sideTable(

    info varchar,

    PRIMARY KEY(channel),

    type='oracle',

    url='jdbc:oracle:thin:@xx.xx.xx.xx:1521:orcl',

    userName='xx',

    password='xx',

    tableName='sidetest',

    cacheMode='unordered',

    asyncCapacity='1000',

    asyncTimeout='10000'

    partitionedJoin='false',

    schema = 'MQTEST'

--建立PostgreSQL輸出表語句

Greenplum、LibrA資料庫寫入方式與PostgreSQL類似

CREATE TABLE MyResult(

    channel VARCHAR,

    info VARCHAR

    type ='postgresql',

    url ='jdbc:postgresql://localhost:9001/test?sslmode=disable',

    userName ='dtstack',

    password ='abc123',

    tableName ='pv2',

    parallelism ='1'

--基于FlinkSQL進行Kafka流資料讀入關聯Oracle維表,并寫入PostgreSQL的實作

insert

into

    MyResult

    select

        a.channel,

        b.info

    from

        MyTable a

    join

        sideTable b

            on a.channel=b.channel