1.序篇-先說結論
protobuf 作為目前各大公司中最廣泛使用的高效的協定資料交換格式工具庫,會大量作為流式資料傳輸的序列化方式,是以在 flink sql 中如果能實作 protobuf 的 format 會非常有用(目前社群已經有對應的實作,不過目前還沒有 merge,預計在 1.14 系列版本中能 release)。
這一節原本是介紹 flink sql 中怎麼自定義實作 protobuf format 類型,但是 format 的實作過程中涉及到了 flink sql 類型系統的知識,是以此節先講解 flink sql 類型系統的内容作為鋪墊。以幫助能更好的了解 flink sql 的類型系統。
flink sql 類型系統并不是一開始就是目前這樣的
LogicalType
體系,其最開始也是複用了 datastream 的
TypeInformation
,後來才由
TypeInformation
轉變為了
LogicalType
,是以本節分為以下幾個小節,來說明 flink sql api 類型的轉變原因、過程以及新類型系統設計。
- 背景篇
- 目标篇-預期效果是什麼
- 架構設計篇-具體方案實作
- 總結篇
2.背景篇
熟悉 DataStream API 的同學都知道,DataStream API 的類型系統 TypeInformation 體系。是以初期 SQL API 的類型系統也是完全由 TypeInformation 實作的。但是随着 SQL API 的 feature 增強,使用者越來越多的使用 SQL API 之後,發現 TypeInformation 作為 SQL API 的類型系統還是有一些缺陷的。
具體我們參考
Flip-37
:https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System。
Flip-65
:https://cwiki.apache.org/confluence/display/FLINK/FLIP-65%3A+New+type+inference+for+Table+API+UDFs
issue
:https://issues.apache.org/jira/browse/FLINK-12251
比如一些使用者回報有以下問題:
https://docs.google.com/document/d/1zKSY1z0lvtQdfOgwcLnCMSRHew3weeJ6QfQjSD0zWas/edit#heading=h.64s92ad5mb1
在
Flip-37
中介紹到:
1. TypeInformation 不能和 SQL 類型系統很好的內建,并且不同實作語言也會對其類型資訊産生影響。
2. TypeInformation 與 SQL 類型系統不一緻。
3. 不能為 DECIMAL 等定義精度和小數位數。
4. 不支援 CHAR/VARCHAR 之間的差異(FLINK-10257、FLINK-9559)。
5. 實體類型和邏輯類型是緊密耦合的。
flink sql 類型系統設計文檔
:https://docs.google.com/document/d/1a9HUb6OaBIoj9IRfbILcMFPrOL7ALeZ3rVI66dvA2_U/edit#heading=h.5qoorezffk0t
2.1.序列化器受執行環境影響
怎麼了解不同語言的環境會對類型資訊産生影響,直接來看一下下面這個例子(基于
flink 1.8
):
import org.apache.flink.table.functions.TableFunction
case class SimpleUser(name: String, age: Int)
class TableFunc0 extends TableFunction[SimpleUser] {
// make sure input element's format is "<string>#<int>"
def eval(user: String): Unit = {
if (user.contains("#")) {
val splits = user.split("#")
collect(SimpleUser(splits(0), splits(1).toInt))
}
}
}
複制
TableFunc0
出參(SimpleUser)的
TypeInformation
不僅取決于
出參
本身,還取決于使用的
表環境
,而且最終的序列化器也是不同的,這裡以 java 環境和 scala 環境做比較:
2.1.1.java 環境
在 java 環境中,使用
org.apache.flink.table.api.java.StreamTableEnvironment#registerFunction
注冊函數。
Java 類型提取是通過基于反射的
TypeExtractor
提取
TypeInformation
。
示例代碼如下(基于 flink 1.8 版本):
public class JavaEnvTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for streaming queries
StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(sEnv);
sTableEnv.registerFunction("table1", new TableFunc0());
TableSqlFunction tableSqlFunction =
(TableSqlFunction) sTableEnv
.getFunctionCatalog()
.getSqlOperatorTable()
.getOperatorList()
.get(170);
TypeSerializer<?> t = tableSqlFunction.getRowTypeInfo().createSerializer(sEnv.getConfig());
sEnv.execute();
}
}
複制

1
java 環境,可以看到,最終使用的是
Kryo 序列化器
。
2.1.2.scala 環境
在 scala 環境中,使用
org.apache.flink.table.api.scala.StreamTableEnvironment#registerFunction
注冊函數。
使用 Scala 類型提取堆棧并通過使用
Scala 宏
提取
TypeInformation
。
示例代碼如下(基于 flink 1.8 版本):
object ScalaEnv {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment
val tableEnv = StreamTableEnvironment.create(env)
tableEnv.registerFunction("hashCode", new TableFunc0())
val config = env.getConfig
val function = new TableFunc0()
registerFunction(config, function)
// execute
env.execute()
}
def registerFunction[T: TypeInformation](config: ExecutionConfig, tf: TableFunction[T]): Unit = {
val typeInfo: TypeInformation[_] = if (tf.getResultType != null) {
tf.getResultType
} else {
implicitly[TypeInformation[T]]
}
val ty = typeInfo.createSerializer(config)
}
}
複制
2
scala 環境,最終使用的是
Case Class 序列化器
。
但是邏輯上同一個 sql 的 model 的序列化方式隻應該與 model 本身有關,不應該與不同語言的 env 有關。不同的 env 的 model 序列化器都應該相同。
2.2.類型系統不一緻
SQL
類型系統與
TypeInformation
系統不一緻。如下圖
TypeInformation
類型系統的組成,熟悉 datastream 的同學應該都見過:
但是标準的 sql 類型系統的組成應該是由如下組成這樣:
可見
TypeInformation
類型系統與标準 SQL 類型系統的對應關系是不太一緻的,這也就導緻了 flink sql 與
TypeInformation
不能很好的內建。
2.3.TypeInformation 類型資訊與序列化器綁定
如圖
TypeInformation
的具體實作類需要實作
TypeInformation<T>#createSerializer
,來指定類型資訊的具體序列化器。
3
舉例,舊類型系統中,flink sql api 中是使用
CRow
進行的内部資料的流轉,
CRowTypeInfo
如下圖,其序列化器固定為
CRowSerializer
:
19
再來一個例子,
ListTypeInfo
的序列化器固定為
ListSerializer
。
4
可以看到
TypeInformation
的類型體系中,一種
TypeInformation
就和一個
TypeSerializer
是綁定的。
3.目标篇-預期效果是什麼
部落客體感比較深的是:
1. 統一以及标準化 SQL 類型系統
2. 邏輯類型與實體類型解耦
然後來看看 flink 是怎麼做這件事情的,下面的代碼都基于
flink 1.13.1
。
4.架構設計篇-具體方案實作
先從最終最上層的角度出發,看看 flink sql 程式運作時資料載體的變化。
1.old planner:
内部資料流的基本資料類型:
CRow
=
Row
+ 辨別(是否回撤資料)
類型資訊:
CRowTypeInfo
,其類型系統使用的完全也是
TypeInformation
那一套
序列化器:
CRowSerializer
=
RowSerializer
+ 辨別序列化
2.blink planner:
内部資料流的基本資料類型:
RowData
類型資訊:
RowType
,基于
LogicalType
序列化器:
RowDataSerializer
4.1.統一以及标準化 SQL 類型系統
先來重溫下,SQL 标準類型:
然後開看看,flink sql 的類型系統設計,代碼位于
flink-table-common
子產品:
新的類型系統是基于
LogicalTypeFamily
,
LogicalTypeRoot
,
LogicalType
進行實作的:
LogicalTypeFamily
:
LogicalTypeRoot
:
LogicalType
:
具體
LogicalType
的各類實作類如下圖所示:
12
可以發現其設計(枚舉資訊、實作等)都是與 SQL 标準進行了對齊的。
具體類型詳情可以參考官方文檔,這裡不過多贅述。https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/
4.2.邏輯類型與實體類型解耦
解耦這部分的實作比較好了解,部落客通過兩種方式來解釋其解耦方式:
4.2.1.看看解耦的具體實作
部落客畫了一張圖來比較下
TypeInformation
與
LogicalType
,如下圖。
20
- datastream\old planner:如左圖所示,都是基于
體系,一種TypeInformation
就和一個TypeInformation
是綁定的。TypeSerializer
- blink planner:如右圖所示,都是基于
體系,但是與LogicalType
通過中間的一層映射層進行解耦,這層映射層是 blink planner 獨有的,當然如果你也能自定義一個 planner,你也可以自定義對應的映射方式。TypeSerializer
LogicalType
隻包含類型資訊,關于具體的序列化器是在不同的 planner 中實作的。Blink Planner 是
InternalSerializers
。
4.2.2.看看包的劃分
其實我們也可以通過這些具體實作類的在 flink 中所在的包也可以看出其解耦方式。如圖所示。
21
- datastream\old planner:如左圖所示,其中的核心邏輯類型、序列化器都是在
中實作的。都是基于以及複用了flink-core
體系。TypeInformation
- blink planner:如右圖所示,
體系都是位于LogicalType
子產品中,作為 sql 基礎、标準的體系。而其中具體的序列化器是在flink-table-common
中的,可以說明不同的 planner 是有對應不同的實作的,進而實作了邏輯類型和實體序列化器的解耦。flink-table-runtime-blink
5.總結篇
本文主要介紹了 flink sql 類型系統的内容,從背景、目标以及最終的實作上做了一些思考和分析。
希望能抛磚引玉,讓大家能在使用層面之上還能有一些更深層次的思考~