Flink Table 和 SQL内置了很多SQL中支援的函數;如果有無法滿足的需要,則可以實作使用者自定義的函數(UDF)來解決。
一、系統内置函數
Flink Table API 和 SQL為使用者提供了一組用于資料轉換的内置函數。SQL中支援的很多函數,Table API和SQL都已經做了實作,其它還在快速開發擴充中。
以下是一些典型函數的舉例,全部的内置函數,可以參考官網介紹。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/functions/systemFunctions.html
1. 比較函數
SQL:
value1 = value2
value1 > value2
Table API:
ANY1 === ANY2
ANY1 > ANY2
2. 邏輯函數
SQL:
boolean1 OR boolean2
boolean IS FALSE
NOT boolean
Table API:
BOOLEAN1 || BOOLEAN2
BOOLEAN.isFalse
!BOOLEAN
3. 算術函數
SQL:
numeric1 + numeric2
POWER(numeric1, numeric2)
Table API:
NUMERIC1 + NUMERIC2
NUMERIC1.power(NUMERIC2)
4. 字元串函數
SQL:
string1 || string2
UPPER(string)
CHAR_LENGTH(string)
Table API:
STRING1 + STRING2
STRING.upperCase()
STRING.charLength()
5. 時間函數
SQL:
DATE string
TIMESTAMP string
CURRENT_TIME
INTERVAL string range
Table API:
STRING.toDate
STRING.toTimestamp
currentTime()
NUMERIC.days
NUMERIC.minutes
6. 聚合函數
SQL:
COUNT(*)
SUM([ ALL | DISTINCT ] expression)
RANK()
ROW_NUMBER()
Table API:
FIELD.count
FIELD.sum0
二、UDF
使用者定義函數(User-defined Functions,UDF)是一個重要的特性,因為它們顯著地擴充了查詢(Query)的表達能力。一些系統内置函數無法解決的需求,我們可以用UDF來自定義實作。
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#table-functions
1. 注冊使用者自定義函數UDF
在大多數情況下,使用者定義的函數必須先注冊,然後才能在查詢中使用。不需要專門為Scala 的Table API注冊函數。
函數通過調用registerFunction()方法在TableEnvironment中注冊。當使用者定義的函數被注冊時,它被插入到TableEnvironment的函數目錄中,這樣Table API或SQL解析器就可以識别并正确地解釋它。
2. 标量函數(Scalar Functions)
使用者定義的标量函數,可以将0、1或多個标量值,映射到新的标量值。
為了定義标量函數,必須在org.apache.flink.table.functions中擴充基類Scalar Function,并實作(一個或多個)求值(evaluation,eval)方法。标量函數的行為由求值方法決定,求值方法必須公開聲明并命名為eval(直接public聲明,沒有override)。求值方法的參數類型和傳回類型,确定了标量函數的參數和傳回類型。
public static void main(String[] args) throws Exception {
//1.擷取執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.讀取元素得到DataStream
DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
//3.将流轉換為動态表
Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
//4.不注冊函數直接使用
// table.select(call(Mylenth.class,$("id"))).execute().print();
//4.1先注冊再使用
tableEnv.createTemporarySystemFunction("MyLenth", Mylenth.class);
//TableAPI
// table.select(call("MyLenth", $("id"))).execute().print();
//SQL
tableEnv.executeSql("select MyLenth(id) from "+table).print();
}
//自定義UDF函數,求資料的長度
public static class Mylenth extends ScalarFunction{
public int eval(String value) {
return value.length();
}
}
3. 表函數(Table Functions)
與使用者定義的标量函數類似,使用者定義的表函數,可以将0、1或多個标量值作為輸入參數;與标量函數不同的是,它可以傳回任意數量的行作為輸出,而不是單個值。
為了定義一個表函數,必須擴充org.apache.flink.table.functions中的基類TableFunction并實作(一個或多個)求值方法。表函數的行為由其求值方法決定,求值方法必須是public的,并命名為eval。求值方法的參數類型,決定表函數的所有有效參數。
傳回表的類型由TableFunction的泛型類型确定。求值方法使用protected collect(T)方法發出輸出行。
在Table API中,Table函數需要與.joinLateral或.leftOuterJoinLateral一起使用。
joinLateral算子,會将外部表中的每一行,與表函數(TableFunction,算子的參數是它的表達式)計算得到的所有行連接配接起來。
而leftOuterJoinLateral算子,則是左外連接配接,它同樣會将外部表中的每一行與表函數計算生成的所有行連接配接起來;并且,對于表函數傳回的是空表的外部行,也要保留下來。
在SQL中,則需要使用Lateral Table(<TableFunction>),或者帶有ON TRUE條件的左連接配接。
下面的代碼中,我們将定義一個表函數,在表環境中注冊它,并在查詢中調用它。
public static void main(String[] args) throws Exception {
//1.擷取執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.讀取檔案得到DataStream
DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
//3.将流轉換為動态表
Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
//4先注冊再使用
tableEnv.createTemporarySystemFunction("split", SplitFunction.class);
//TableAPI
/* table
.joinLateral(call("split", $("id")))
.select($("id"),$("word"))
.execute()
.print();
*/
//SQL
tableEnv.executeSql("select id, word from "+table +", lateral table(split(id))").print();
}
//自定義UDTF函數将傳入的id按照下劃線炸裂成兩條資料
//hint暗示,主要作用為類型推斷時使用
@FunctionHint(output = @DataTypeHint("ROW<word STRING>"))
public static class SplitFunction extends TableFunction<Row> {
public void eval(String str) {
for (String s : str.split("_")) {
collect(Row.of(s));
}
}
}
4. 聚合函數(Aggregate Functions)
使用者自定義聚合函數(User-Defined Aggregate Functions,UDAGGs)可以把一個表中的資料,聚合成一個标量值。使用者定義的聚合函數,是通過繼承AggregateFunction抽象類實作的。
上圖中顯示了一個聚合的例子。
假設現在有一張表,包含了各種飲料的資料。該表由三列(id、name和price)、五行組成資料。現在我們需要找到表中所有飲料的最高價格,即執行max()聚合,結果将是一個數值。
AggregateFunction的工作原理如下。
首先,它需要一個累加器,用來儲存聚合中間結果的資料結構(狀态)。可以通過調用AggregateFunction的createAccumulator()方法建立空累加器。
随後,對每個輸入行調用函數的accumulate()方法來更新累加器。
處理完所有行後,将調用函數的getValue()方法來計算并傳回最終結果。
AggregationFunction要求必須實作的方法:
- createAccumulator()
- accumulate()
- getValue()
除了上述方法之外,還有一些可選擇實作的方法。其中一些方法,可以讓系統執行查詢更有效率,而另一些方法,對于某些場景是必需的。例如,如果聚合函數應用在會話視窗(session group window)的上下文中,則merge()方法是必需的。
- retract()
- merge()
- resetAccumulator()
接下來我們寫一個自定義AggregateFunction,計算一下每個WaterSensor中VC的平均值。
public static void main(String[] args) throws Exception {
//1.擷取執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.讀取檔案得到DataStream
DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
//3.将流轉換為動态表
Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
//4先注冊再使用
tableEnv.createTemporarySystemFunction("myavg", MyAvg.class);
//TableAPI
table.groupBy($("id"))
.select($("id"),call("myavg",$("vc")))
.execute()
.print();
//SQL
tableEnv.executeSql("select id, myavg(vc) from "+table +" group by id").print();
}
//定義一個類當做累加器,并聲明總數和總個數這兩個值
public static class MyAvgAccumulator{
public long sum = 0;
public int count = 0;
}
//自定義UDAF函數,求每個WaterSensor中VC的平均值
public static class MyAvg extends AggregateFunction<Double, MyAvgAccumulator> {
//建立一個累加器
@Override
public MyAvgAccumulator createAccumulator() {
return new MyAvgAccumulator();
}
//做累加操作
public void accumulate(MyAvgAccumulator acc, Integer vc) {
acc.sum += vc;
acc.count += 1;
}
//将計算結果值傳回
@Override
public Double getValue(MyAvgAccumulator accumulator) {
return accumulator.sum*1D/accumulator.count;
}
}
5. 表聚合函數(Table Aggregate Functions)
使用者定義的表聚合函數(User-Defined Table Aggregate Functions,UDTAGGs),可以把一個表中資料,聚合為具有多行和多列的結果表。這跟AggregateFunction非常類似,隻是之前聚合結果是一個标量值,現在變成了一張表。
比如現在我們需要找到表中所有WaterSensor的前2個最高水位線,即執行top2()表聚合。
使用者定義的表聚合函數,是通過繼承TableAggregateFunction抽象類來實作的。
TableAggregateFunction的工作原理如下。
首先,它同樣需要一個累加器(Accumulator),它是儲存聚合中間結果的資料結構。通過調用TableAggregateFunction的createAccumulator()方法可以建立空累加器。
随後,對每個輸入行調用函數的accumulate()方法來更新累加器。
處理完所有行後,将調用函數的emitValue()方法來計算并傳回最終結果。
AggregationFunction要求必須實作的方法:
- createAccumulator()
- getValue()
除了上述方法之外,還有一些可選擇實作的方法。
- retract()
- merge()
- resetAccumulator()
- emitValue()
- emitUpdateWithRetract()
接下來我們寫一個自定義TableAggregateFunction,用來提取每個WaterSensor最高的兩個水位值。
public static void main(String[] args) throws Exception {
//1.擷取執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.讀取檔案得到DataStream
DataStreamSource<WaterSensor> waterSensorDataStreamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
//3.将流轉換為動态表
Table table = tableEnv.fromDataStream(waterSensorDataStreamSource);
//4先注冊再使用
tableEnv.createTemporarySystemFunction("Top2", Top2.class);
//TableAPI
table.groupBy($("id"))
.flatAggregate(call("Top2", $("vc")).as("top", "rank"))
.select($("id"), $("top"), $("rank"))
.execute()
.print();
}
//定義一個類當做累加器,并聲明第一和第二這兩個值
public static class vCTop2 {
public Integer first = Integer.MIN_VALUE;
public Integer second = Integer.MIN_VALUE;
}
//自定義UDATF函數(多進多出),求每個WaterSensor中最高的兩個水位值
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, vCTop2> {
//建立累加器
@Override
public vCTop2 createAccumulator() {
return new vCTop2();
}
//比較資料,如果目前資料大于累加器中存的資料則替換,并将原累加器中的資料往下(第二)指派
public void accumulate(vCTop2 acc, Integer value) {
if (value > acc.first) {
acc.second = acc.first;
acc.first = value;
} else if (value > acc.second) {
acc.second = value;
}
}
//計算(排名)
public void emitValue(vCTop2 acc, Collector<Tuple2<Integer, Integer>> out) {
// emit the value and rank
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
總結
Flink提供衆多系統内置函數:比較函數、邏輯函數、算術函數、字元串函數、時間函數、聚合函數。對于無法滿足的需求,我們可以用UDF自定義函數解決。使用者自定義聚合函數(UDA)可以把一個表中的資料,聚合成一個标量值。使用者定義的表聚合函數(UDTA),可以把一個表中資料,聚合為具有多行和多列的結果表。跟聚合函數AggregateFunction非常類似,隻是之前聚合結果是一個标量值,現在變成了一張表。