天天看點

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

1.序篇-先說結論

protobuf 作為目前各大公司中最廣泛使用的高效的協定資料交換格式工具庫,會大量作為流式資料傳輸的序列化方式,是以在 flink sql 中如果能實作 protobuf 的 format 會非常有用(目前社群已經有對應的實作,不過目前還沒有 merge,預計在 1.14 系列版本中能 release)。

這一節原本是介紹 flink sql 中怎麼自定義實作 protobuf format 類型,但是 format 的實作過程中涉及到了 flink sql 類型系統的知識,是以此節先講解 flink sql 類型系統的内容作為鋪墊。以幫助能更好的了解 flink sql 的類型系統。

flink sql 類型系統并不是一開始就是目前這樣的

LogicalType

體系,其最開始也是複用了 datastream 的

TypeInformation

,後來才由

TypeInformation

轉變為了

LogicalType

,是以本節分為以下幾個小節,來說明 flink sql api 類型的轉變原因、過程以及新類型系統設計。

  1. 背景篇
  2. 目标篇-預期效果是什麼
  3. 架構設計篇-具體方案實作
  4. 總結篇

2.背景篇

熟悉 DataStream API 的同學都知道,DataStream API 的類型系統 TypeInformation 體系。是以初期 SQL API 的類型系統也是完全由 TypeInformation 實作的。但是随着 SQL API 的 feature 增強,使用者越來越多的使用 SQL API 之後,發現 TypeInformation 作為 SQL API 的類型系統還是有一些缺陷的。

具體我們參考

Flip-37

:https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System。

Flip-65

:https://cwiki.apache.org/confluence/display/FLINK/FLIP-65%3A+New+type+inference+for+Table+API+UDFs

issue

:https://issues.apache.org/jira/browse/FLINK-12251

比如一些使用者回報有以下問題:

https://docs.google.com/document/d/1zKSY1z0lvtQdfOgwcLnCMSRHew3weeJ6QfQjSD0zWas/edit#heading=h.64s92ad5mb1

Flip-37

中介紹到:

1. TypeInformation 不能和 SQL 類型系統很好的內建,并且不同實作語言也會對其類型資訊産生影響。

2. TypeInformation 與 SQL 類型系統不一緻。

3. 不能為 DECIMAL 等定義精度和小數位數。

4. 不支援 CHAR/VARCHAR 之間的差異(FLINK-10257、FLINK-9559)。

5. 實體類型和邏輯類型是緊密耦合的。

flink sql 類型系統設計文檔

:https://docs.google.com/document/d/1a9HUb6OaBIoj9IRfbILcMFPrOL7ALeZ3rVI66dvA2_U/edit#heading=h.5qoorezffk0t

2.1.序列化器受執行環境影響

怎麼了解不同語言的環境會對類型資訊産生影響,直接來看一下下面這個例子(基于

flink 1.8

):

import org.apache.flink.table.functions.TableFunction

case class SimpleUser(name: String, age: Int)

class TableFunc0 extends TableFunction[SimpleUser] {

  // make sure input element's format is "<string&gt#<int>"

  def eval(user: String): Unit = {

    if (user.contains("#")) {

      val splits = user.split("#")

      collect(SimpleUser(splits(0), splits(1).toInt))

    }
  }

}
           

複制

TableFunc0

出參(SimpleUser)的

TypeInformation

不僅取決于

出參

本身,還取決于使用的

表環境

,而且最終的序列化器也是不同的,這裡以 java 環境和 scala 環境做比較:

2.1.1.java 環境

在 java 環境中,使用

org.apache.flink.table.api.java.StreamTableEnvironment#registerFunction

注冊函數。

Java 類型提取是通過基于反射的

TypeExtractor

提取

TypeInformation

示例代碼如下(基于 flink 1.8 版本):

public class JavaEnvTest {

    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // create a TableEnvironment for streaming queries
        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);

        sTableEnv.registerFunction("table1", new TableFunc0());

        TableSqlFunction tableSqlFunction =
                (TableSqlFunction) sTableEnv
                        .getFunctionCatalog()
                        .getSqlOperatorTable()
                        .getOperatorList()
                        .get(170);

        TypeSerializer<?> t = tableSqlFunction.getRowTypeInfo().createSerializer(sEnv.getConfig());

        sEnv.execute();

    }

}
           

複制

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

1

java 環境,可以看到,最終使用的是

Kryo 序列化器

2.1.2.scala 環境

在 scala 環境中,使用

org.apache.flink.table.api.scala.StreamTableEnvironment#registerFunction

注冊函數。

使用 Scala 類型提取堆棧并通過使用

Scala 宏

提取

TypeInformation

示例代碼如下(基于 flink 1.8 版本):

object ScalaEnv {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // create a TableEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    tableEnv.registerFunction("hashCode", new TableFunc0())

    val config = env.getConfig

    val function = new TableFunc0()

    registerFunction(config, function)

    // execute
    env.execute()
  }

  def registerFunction[T: TypeInformation](config: ExecutionConfig, tf: TableFunction[T]): Unit = {
    val typeInfo: TypeInformation[_] = if (tf.getResultType != null) {
      tf.getResultType
    } else {
      implicitly[TypeInformation[T]]
    }

    val ty = typeInfo.createSerializer(config)

  }

}
           

複制

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

2

scala 環境,最終使用的是

Case Class 序列化器

但是邏輯上同一個 sql 的 model 的序列化方式隻應該與 model 本身有關,不應該與不同語言的 env 有關。不同的 env 的 model 序列化器都應該相同。

2.2.類型系統不一緻

SQL

類型系統與

TypeInformation

系統不一緻。如下圖

TypeInformation

類型系統的組成,熟悉 datastream 的同學應該都見過:

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

但是标準的 sql 類型系統的組成應該是由如下組成這樣:

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇
flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇
flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

可見

TypeInformation

類型系統與标準 SQL 類型系統的對應關系是不太一緻的,這也就導緻了 flink sql 與

TypeInformation

不能很好的內建。

2.3.TypeInformation 類型資訊與序列化器綁定

如圖

TypeInformation

的具體實作類需要實作

TypeInformation<T>#createSerializer

,來指定類型資訊的具體序列化器。

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

3

舉例,舊類型系統中,flink sql api 中是使用

CRow

進行的内部資料的流轉,

CRowTypeInfo

如下圖,其序列化器固定為

CRowSerializer

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

19

再來一個例子,

ListTypeInfo

的序列化器固定為

ListSerializer

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

4

可以看到

TypeInformation

的類型體系中,一種

TypeInformation

就和一個

TypeSerializer

是綁定的。

3.目标篇-預期效果是什麼

部落客體感比較深的是:

1. 統一以及标準化 SQL 類型系統

2. 邏輯類型與實體類型解耦

然後來看看 flink 是怎麼做這件事情的,下面的代碼都基于

flink 1.13.1

4.架構設計篇-具體方案實作

先從最終最上層的角度出發,看看 flink sql 程式運作時資料載體的變化。

1.old planner:

内部資料流的基本資料類型:

CRow

=

Row

+ 辨別(是否回撤資料)

類型資訊:

CRowTypeInfo

,其類型系統使用的完全也是

TypeInformation

那一套

序列化器:

CRowSerializer

=

RowSerializer

+ 辨別序列化

2.blink planner:

内部資料流的基本資料類型:

RowData

類型資訊:

RowType

,基于

LogicalType

序列化器:

RowDataSerializer

4.1.統一以及标準化 SQL 類型系統

先來重溫下,SQL 标準類型:

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇
flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇
flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

然後開看看,flink sql 的類型系統設計,代碼位于

flink-table-common

子產品:

新的類型系統是基于

LogicalTypeFamily

LogicalTypeRoot

LogicalType

進行實作的:

LogicalTypeFamily

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

LogicalTypeRoot

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

LogicalType

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

具體

LogicalType

的各類實作類如下圖所示:

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

12

可以發現其設計(枚舉資訊、實作等)都是與 SQL 标準進行了對齊的。

具體類型詳情可以參考官方文檔,這裡不過多贅述。https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/

4.2.邏輯類型與實體類型解耦

解耦這部分的實作比較好了解,部落客通過兩種方式來解釋其解耦方式:

4.2.1.看看解耦的具體實作

部落客畫了一張圖來比較下

TypeInformation

LogicalType

,如下圖。

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

20

  • datastream\old planner:如左圖所示,都是基于

    TypeInformation

    體系,一種

    TypeInformation

    就和一個

    TypeSerializer

    是綁定的。
  • blink planner:如右圖所示,都是基于

    LogicalType

    體系,但是與

    TypeSerializer

    通過中間的一層映射層進行解耦,這層映射層是 blink planner 獨有的,當然如果你也能自定義一個 planner,你也可以自定義對應的映射方式。

LogicalType

隻包含類型資訊,關于具體的序列化器是在不同的 planner 中實作的。Blink Planner 是

InternalSerializers

4.2.2.看看包的劃分

其實我們也可以通過這些具體實作類的在 flink 中所在的包也可以看出其解耦方式。如圖所示。

flink sql 知其是以然(四)| sql api 類型系統1.序篇-先說結論2.背景篇3.目标篇-預期效果是什麼4.架構設計篇-具體方案實作5.總結篇

21

  • datastream\old planner:如左圖所示,其中的核心邏輯類型、序列化器都是在

    flink-core

    中實作的。都是基于以及複用了

    TypeInformation

    體系。
  • blink planner:如右圖所示,

    LogicalType

    體系都是位于

    flink-table-common

    子產品中,作為 sql 基礎、标準的體系。而其中具體的序列化器是在

    flink-table-runtime-blink

    中的,可以說明不同的 planner 是有對應不同的實作的,進而實作了邏輯類型和實體序列化器的解耦。

5.總結篇

本文主要介紹了 flink sql 類型系統的内容,從背景、目标以及最終的實作上做了一些思考和分析。

希望能抛磚引玉,讓大家能在使用層面之上還能有一些更深層次的思考~