文章目錄
-
- 1. Flink SQL開發
-
- 1.1. 什麼是flink關系型API
- 1.2. Table API和SQL程式的結構
- 1.3. 在内部目錄中注冊一個表
-
- 1.3.1. 注冊一個表
- 1.3.2. 注冊一個TableSource
- 1.3.3. 注冊一個TableSink
- 1.4. Table和DataStream和DataSet的內建
-
- 1.4.1. 将DataStream或DataSet轉換為Table
- 1.4.2. 将Table轉換為DataStream或DataSet
- 1.5. 批處理案例1
- 1.6. 批處理案例2
- 1.7. 流資料處理案例
當我們在使用flink做流式和批式任務計算的時候,往往會想到幾個問題:
- 需要熟悉兩套API : DataStream/DataSet API,API有一定難度,開發人員無法集中精力到具體業務的開發
- 需要有Java或Scala的開發經驗
- flink同時支援批任務與流任務,如何做到API層的統一
flink已經擁有了強大的DataStream/DataSetAPI,滿足流計算和批計算中的各種場景需求,但是關于以上幾個問題,我們還需要提供一種關系型的API來實作flink API層的流與批的統一,那麼這就是flink的
Table API & SQL
。
首先Table API& SQL 是一種關系型API,使用者可以像操作MySQL資料庫表一樣的操作資料,而不需要寫Java代碼完成flink function,更不需要手工的優化Java代碼調優。另外,SQL作為一個非程式員可操作的語言,學習成本很低,如果一個系統提供SQL支援,将很容易被使用者接受。
總結來說,關系型API的好處:
- 關系型API是聲明式的
- 查詢能夠被有效的優化
- 查詢可以高效的執行
- “Everybody” knows SQL
Table API& SQL 是流處理和批處理統一的API層,如下圖。
- flink在runtime層是統一的,因為flink将批任務看做流的一種特例來執行
- 在API層,flink為批和流提供了兩套API(DataSet和DataStream)
- Table API & SQL就統一了flink的API層,批資料上的查詢會随着輸入資料的結束而結束并生成DataSet,流資料的查詢會一直運作并生成結果流。
- Table API & SQL做到了批與流上的查詢具有同樣的文法語義,是以不用改代碼就能同時在批和流上執行。
[外鍊圖檔轉存失敗(img-hPAHpUFn-1569146589978)(assets/1554261351405.png)]
Table API的特點
Table API和SQL都是Apache Flink中高等級的分析API,SQL所具備的特點Table API也都具有,如下:
- 聲明式 - 使用者隻關心做什麼,不用關心怎麼做;
- 高性能 - 支援查詢優化,可以擷取最好的執行性能;
- 流批統一 - 相同的統計邏輯,既可以流模式運作,也可以批模式運作;
- 标準穩定 - 語義遵循SQL标準,文法語義明确,不易變動。
當然除了SQL的特性,因為Table API是在Flink中專門設計的,是以Table API還具有自身的特點:
- 表達方式的擴充性 - 在Flink中可以為Table API開發很多便捷性功能,如:Row.flatten(), map/flatMap 等
- 功能的擴充性 - 在Flink中可以為Table API擴充更多的功能,如:Iteration,flatAggregate 等新功能
- 編譯檢查 - Table API支援java和scala語言開發,支援IDE中進行編譯檢查。
Table API和SQL捆綁在flink-table Maven工件中。必須将以下依賴項添加到你的項目才能使用Table API和SQL:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.6.1</version>
</dependency>
另外,你需要為Flink的Scala批處理或流式API添加依賴項。對于批量查詢,您需要添加:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
Table API一般與DataSet或者DataStream緊密關聯,可以通過一個DataSet或DataStream建立出一個Table,再用類似于filter, join, 或者 select關系型轉化操作來轉化為一個新的
Table
對象。最後将一個Table對象轉回一個DataSet或DataStream。從内部實作上來說,所有應用于Table的轉化操作都變成一棵邏輯表操作樹,在Table對象被轉化回DataSet或者DataStream之後,轉化器會将邏輯表操作樹轉化為對等的DataSet或者DataStream操作符。
Flink的批處理和流處理的Table API和SQL程式遵循相同的模式;是以我們隻需要使用一種來示範即可要想執行flink的SQL語句,首先需要擷取SQL的執行環境:兩種方式(batch和streaming):
批處理:
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
流處理:
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
通過
getTableEnvironment
可以擷取
TableEnviromment
;這個TableEnviromment是Table API和SQL內建的核心概念。它負責:
- 在内部目錄中注冊一個表
- 注冊外部目錄
- 執行SQL查詢
- 注冊使用者定義的(标量,表格或聚合)函數
- 轉換DataStream或DataSet成Table
- 持有一個ExecutionEnvironment或一個參考StreamExecutionEnvironment
TableEnvironment維護一個按名稱注冊的表的目錄。有兩種類型的表格,
輸入
表格和
輸出
表格。輸入表可以在Table API和SQL查詢中引用并提供輸入資料。輸出表可用于将表API或SQL查詢的結果發送到外部系統輸入表可以從各種來源注冊:
- 現有Table對象,通常是表API或SQL查詢的結果。
- TableSource,它通路外部資料,例如檔案,資料庫或消息傳遞系統。
- DataStream或DataSet來自DataStream或DataSet程式。
輸出表可以使用注冊TableSink。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable)
// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("projectedTable ").select(...)
TableSource提供對存儲在諸如資料庫(MySQL,HBase等),具有特定編碼(CSV,Apache [Parquet,Avro,ORC],…)的檔案的存儲系統中的外部資料的通路或者消息傳送系統(Apache Kafka,RabbitMQ,…)
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSource
val csvSource: TableSource = CsvTableSource.builder().path("./data/score.csv")...
// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
注冊TableSink可用于将表API或SQL查詢的結果發送到外部存儲系統,如資料庫,鍵值存儲,消息隊列或檔案系統(使用不同的編碼,例如CSV,Apache [Parquet ,Avro,ORC],…)
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
在上面的例子講解中,直接使用的是:
registerTableSource
系統資料庫
對于flink來說,還有更靈活的方式:比如直接注冊DataStream或者DataSet轉換為一張表。
然後DataStream或者DataSet就相當于表,這樣可以繼續使用SQL來操作流或者批次的資料
文法:
// get TableEnvironment
// registration of a DataSet is equivalent
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val stream: DataStream[(Long, String)] = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
示例
以
流
處理方式,加載下列資料,并注冊為表,查詢所有資料,寫入到CSV檔案中。
id | product | amount |
---|---|---|
1 | beer | 3 |
2 | diaper | 4 |
rubber |
開發步驟
- 擷取流處理環境
- 擷取TableEnvironment
- 加載本地集合
- 根據資料系統資料庫
- 執行SQL
- 寫入CSV檔案中
- 執行任務
代碼
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.sinks.CsvTableSink
object DataSet_DataStreamToTable {
def main(args: Array[String]): Unit = {
// 1. 擷取流處理環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 擷取TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 3. 加載本地集合
val orderA: DataStream[Order1] = env.fromCollection(Seq(
Order1(1L, "beer", 3),
Order1(1L, "diaper", 4),
Order1(3L, "rubber", 2)))
// 4. 根據資料系統資料庫
tableEnv.registerDataStream("OrderA", orderA)
// 5. 執行SQL
val result = tableEnv.sqlQuery("SELECT * FROM OrderA")
// 6. 寫入CSV檔案中
result.writeToSink(new CsvTableSink("./data/ccc", ",", 1, FileSystem.WriteMode.OVERWRITE))
// 7. 執行任務
env.execute()
}
}
// 定義樣例類
case class Order1(user: Long, product: String, amount: Int)
Table可以轉換為DataStream或者DataSet,這樣的話,自定義的DataStream或者DataSet程式就可以基于Table API或者SQL查詢的結果來執行了。
當将一個Table轉換為DataStream或者DataSet時,你需要指定生成的DataStream或者DataSet的資料類型,即需要轉換表的行的資料類型,通常最友善的轉換類型是Row,下面清單概述了不同選項的功能:
- Row:字段通過位置映射、可以是任意數量字段,支援空值,非類型安全通路
- POJO:字段通過名稱(POJO字段作為Table字段時,必須命名)映射,可以是任意數量字段,支援空值,類型安全通路
- Case Class:字段通過位置映射,不支援空值,類型安全通路
- Tuple:字段通過位置映射,不得多于22(Scala)或者25(Java)個字段,不支援空值,類型安全通路
- Atomic Type:Table必須有一個字段,不支援空值,類型安全通路。
将Table轉換為DataStream
流式查詢的結果Table會被動态地更新,即每個新的記錄到達輸入流時結果就會發生變化。是以,轉換此動态查詢的DataStream需要對表的更新進行編碼。
有兩種模式可以将 Table轉換為DataStream:
1:
Append Mode
:這種模式隻适用于當動态表僅由INSERT更改修改時,即僅附加,之前發送的結果不會被更新。
2:
Retract Mode
:始終都可以使用此模式,它使用一個boolean辨別來編碼INSERT和DELETE更改。
文法格式:
// get TableEnvironment.
// registration of a DataSet is equivalent
// ge val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
使用Flink流式環境, 加載下列集合資料, 轉換為Table, 并将Table轉換為DataStream
List(
(1L, 1, "Hello"),
(2L, 2, "Hello"),
(6L, 6, "Hello"),
(7L, 7, "Hello World"),
(8L, 8, "Hello World"),
(20L, 20, "Hello World"))
)
- 設定并行度
- 擷取Table運作環境
- 轉換DataStream為Table
- 将table轉換為DataStream----将一個表附加到流上Append Mode
- 将table轉換為DataStream----Retract Mode true代表添加消息,false代表撤銷消息
- 列印輸出
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.sinks.CsvTableSink
object TableTODataStream {
def main(args: Array[String]): Unit = {
// 1. 擷取流處理環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 設定并行度
env.setParallelism(1)
// 3. 擷取Table運作環境
val tEnv = TableEnvironment.getTableEnvironment(env)
// 4. 加載本地集合
val stream = env.fromCollection(
List(
(1L, 1, "Hello"),
(2L, 2, "Hello"),
(6L, 6, "Hello"),
(7L, 7, "Hello World"),
(8L, 8, "Hello World"),
(20L, 20, "Hello World"))
)
// 5. 轉換DataStream為Table
val table: Table = tEnv.fromDataStream(stream)
// 6. 将table轉換為DataStream----将一個表附加到流上Append Mode
val appendStream: DataStream[(Long, Int, String)] = tEnv.toAppendStream[(Long, Int, String)](table)
// 7. 将table轉換為DataStream----Retract Mode true代表添加消息,false代表撤銷消息
val retractStream: DataStream[(Boolean, (Long, Int, String))] = tEnv.toRetractStream[(Long, Int, String)](table)
// 8. 列印輸出
appendStream.print()
retractStream.print()
// 9. 執行任務
env.execute()
}
}
将Table轉換為DataSet
文法格式:
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
使用Flink批處理環境, 加載下列集合資料, 轉換為Table, 并将Table轉換為DataSet
List(
(1L, 1, "Hello"),
(2L, 2, "Hello"),
(6L, 6, "Hello"),
(7L, 7, "Hello World"),
(8L, 8, "Hello World"),
(20L, 20, "Hello World"))
)
- 擷取批處理環境
- DataSet轉換為Table
- table轉換為dataSet
package com.itheima
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
object TableTODataSet{
def main(args: Array[String]): Unit = {
//1. 批處理環境
val env = ExecutionEnvironment.getExecutionEnvironment
//2. 設定并行度
env.setParallelism(1)
//3. 擷取table運作環境
val tableEnvironment = TableEnvironment.getTableEnvironment(env)
//4. 加載本地集合
val collection: DataSet[(Long, Int, String)] = env.fromCollection(List(
(1L, 1, "Hello"),
(2L, 2, "Hello"),
(3L, 3, "Hello"),
(7L, 7, "Hello World"),
(8L, 8, "Hello World"),
(20L, 20, "Hello World")))
//5. DataSet轉換為Table
val table: Table = tableEnvironment.fromDataSet(collection)
//6. table轉換為dataSet
val toDataSet: DataSet[(Long, Int, String)] = tableEnvironment.toDataSet[(Long, Int, String)](table)
//7.列印資料
toDataSet.print()
// env.execute()
}
}
使用Flink SQL統計使用者消費訂單的總金額、最大金額、最小金額、訂單總數。
訂單id | 使用者名 | 訂單日期 | 消費基恩 |
---|---|---|---|
zhangsan | 2018-10-20 15:30 | 358.5 |
測試資料(訂單ID、使用者名、訂單日期、訂單金額)
(1,"zhangsan","2018-10-20 15:30",358.5),
(2,"zhangsan","2018-10-20 16:30",131.5),
(3,"lisi","2018-10-20 16:30",127.5),
(4,"lisi","2018-10-20 16:30",328.5),
(5,"lisi","2018-10-20 16:30",432.5),
(6,"zhaoliu","2018-10-20 22:30",451.0),
(7,"zhaoliu","2018-10-20 22:30",362.0),
(8,"zhaoliu","2018-10-20 22:30",364.0),
(9,"zhaoliu","2018-10-20 22:30",341.0)
步驟
- 擷取一個批處理運作環境
- 擷取一個Table運作環境
- 建立一個樣例類
用來映射資料(訂單名、使用者名、訂單日期、訂單金額)Order
- 基于本地
集合建立一個DataSet sourceOrder
- 使用Table運作環境将DataSet注冊為一張表
- 使用SQL語句來操作資料(統計使用者消費訂單的總金額、最大金額、最小金額、訂單總數)
- 使用TableEnv.toDataSet将Table轉換為DataSet
- 列印測試
參考代碼
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
object BatchFlinkSqlDemo {
// 建立一個樣例類Order用來映射資料(訂單名、使用者名、訂單日期、訂單金額)
case class Order(id:Int, userName:String, createTime:String, money:Double)
def main(args: Array[String]): Unit = {
// 擷取一個批處理運作環境
val env = ExecutionEnvironment.getExecutionEnvironment
// 擷取一個Table運作環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 基于本地Order集合建立一個DataSet source
val orderDataSet = env.fromCollection(List(
Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
Order(3, "lisi", "2018-10-20 16:30", 127.5),
Order(4, "lisi", "2018-10-20 16:30", 328.5),
Order(5, "lisi", "2018-10-20 16:30", 432.5),
Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
))
// 使用Table運作環境将DataSet注冊為一張表
tableEnv.registerDataSet("t_order", orderDataSet)
// 使用SQL語句來操作資料(統計使用者消費訂單的總金額、最大金額、最小金額、訂單總數)
val sql =
"""
|select
| userName,
| sum(money) as totalMoney,
| max(money) as maxMoney,
| min(money) as minMoney,
| count(1) as totalCount
|from
| t_order
|group by
| userName
""".stripMargin
val table: Table = tableEnv.sqlQuery(sql)
// 列印輸出的這個表格的定義(哪些列、列的類型)
table.printSchema()
// 使用TableEnv.toDataSet将Table轉換為DataSet
val resultDataSet: DataSet[Row] = tableEnv.toDataSet[Row](table)
// 列印測試
resultDataSet.print()
}
}
讀取CSV檔案,以table api的方式查詢
name
為
張三
的資料
1,張三,1,98
2,張三,2,77.5
3,張三,3,89
4,張三,4,65
5,張三,5,78
6,張三,6,70
7,李四,1,78
8,李四,2,58
9,李四,3,65
10,李四,4,78
- 擷取批處理運作環境
- 加載外部CSV檔案
- 将外部資料建構成表
- 使用table方式查詢資料
- 列印表結構
- 将資料落地到新的CSV檔案中
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
object BatchTableDemo {
def main(args: Array[String]): Unit = {
//建立batch執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//建立table環境用于batch查詢
val tableEnvironment = TableEnvironment.getTableEnvironment(env)
//加載外部CSV資料
val csvTableSource:CsvTableSource = CsvTableSource.builder()
.path("./data/score.csv") //檔案路徑
.field("id", Types.INT) //第一列資料
.field("name", Types.STRING) //第二列資料
.field("subjectId", Types.INT) //第三列資料
.field("score", Types.DOUBLE) //第三列資料
.fieldDelimiter(",") //列分隔符,預設是","
.lineDelimiter("\n") //換行符
.ignoreFirstLine() //忽略第一行
.ignoreParseErrors() //忽略解析錯誤
.build()
//将外部資料建構成表
tableEnvironment.registerTableSource("tableA", csvTableSource)
//TODO 1:使用table方式查詢資料
val table: Table = tableEnvironment.scan("tableA").select("id , name , subjectId,score").filter("name == '張三'")
table.printSchema()
//将資料寫出去
table.writeToSink(new CsvTableSink("./data/table.csv", ",", 1, FileSystem.WriteMode.OVERWRITE))
//TODO 2:使用sql方式
// val sqlResult = tableEnvironment.sqlQuery("select id,name,subjectId,score from tableA where name='張三'")
// //将資料寫出去
// sqlResult.writeToSink(new CsvTableSink("./data/table.csv", ",", 1, FileSystem.WriteMode.OVERWRITE))
env.execute()
}
}
流進行中也可以支援SQL。但是需要注意以下幾點:
- 要使用流處理的SQL,必須要添加水印時間
- 使用
系統資料庫的時候,使用registerDataStream
來指定字段'
- 系統資料庫的時候,必須要指定一個rowtime,否則無法在SQL中使用視窗
- 必須要導入
隐式參數import org.apache.flink.table.api.scala._
- SQL中使用
來進行定義視窗tumble(時間列名, interval '時間' sencond)
使用Flink SQL來統計5秒内
使用者的
訂單總數、訂單的最大金額、訂單的最小金額。
相關SQL
select
userId,
count(1) as totalCount,
max(money) as maxMoney,
min(money) as minMoney
from
t_order
group by
tumble(createTime, interval '5' second),
userId
- 擷取流處理運作環境
- 設定處理時間為
EventTime
- 建立一個訂單樣例類
,包含四個字段(訂單ID、使用者ID、訂單金額、時間戳)Order
- 建立一個自定義資料源
- 使用for循環生成1000個訂單
- 随機生成訂單ID(UUID)
- 随機生成使用者ID(0-2)
- 随機生成訂單金額(0-100)
- 時間戳為目前系統時間
- 每隔1秒生成一個訂單
- 添加水印,允許延遲2秒
- 導入
import org.apache.flink.table.api.scala._
-
系統資料庫,并分别指定字段,還要指定rowtime字段registerDataStream
- 編寫SQL語句統計使用者訂單總數、最大金額、最小金額
- 分組時要使用
來建立視窗tumble(時間列, interval '視窗時間' second)
- 分組時要使用
-
執行sql語句tableEnv.sqlQuery
- 将SQL的執行結果轉換成DataStream再列印出來
- 啟動流處理程式
import java.util.UUID
import java.util.concurrent.TimeUnit
import org.apache.commons.lang.time.FastDateFormat
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import scala.util.Random
object StreamFlinkSqlDemo {
// 建立一個訂單樣例類Order,包含四個字段(訂單ID、使用者ID、訂單金額、時間戳)
case class Order(id:String, userId:Int, money:Long, createTime:Long)
def main(args: Array[String]): Unit = {
// 擷取流處理運作環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 設定處理時間為EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 擷取Table運作環境
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 建立一個自定義資料源
val orderDataStream = env.addSource(new RichSourceFunction[Order] {
override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
// 使用for循環生成1000個訂單
for(i <- 0 until 1000) {
// 随機生成訂單ID(UUID)
val id = UUID.randomUUID().toString
// 随機生成使用者ID(0-2)
val userId = Random.nextInt(3)
// 随機生成訂單金額(0-100)
val money = Random.nextInt(101)
// 時間戳為目前系統時間
val timestamp = System.currentTimeMillis()
// 收集資料
ctx.collect(Order(id, userId, money, timestamp))
// 每隔1秒生成一個訂單
TimeUnit.SECONDS.sleep(1)
}
}
override def cancel(): Unit = ()
})
// 添加水印,允許延遲2秒
val watermarkDataStream: DataStream[Order] = orderDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Order] {
var currentTimestamp: Long = _
// 允許延遲2秒
val delayTime = 2000
// 生成一個水印資料
override def getCurrentWatermark: Watermark = {
// 減去兩秒中,表示讓window視窗延遲兩秒計算
val watermark = new Watermark(currentTimestamp - delayTime)
val formater = FastDateFormat.getInstance("HH:mm:ss")
println(s"水印時間: ${formater.format(watermark.getTimestamp)},事件時間:${formater.format(currentTimestamp)}, 系統時間:${formater.format(System.currentTimeMillis())}")
watermark
}
// 表示從Order中擷取對應的時間戳
override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
// 擷取到Order訂單事件的時間戳
val timestamp = element.createTime
// 表示時間軸不會往前推,不能因為某些資料延遲了,導緻整個window資料得不到計算
currentTimestamp = Math.max(currentTimestamp, timestamp)
currentTimestamp
}
})
// 使用registerDataStream系統資料庫,并分别指定字段,還要指定rowtime字段
// 導入import org.apache.flink.table.api.scala._隐式參數
tableEnv.registerDataStream("t_order", watermarkDataStream, 'id, 'userId, 'money, 'createTime.rowtime)
// 編寫SQL語句統計使用者訂單總數、最大金額、最小金額
// 分組時要使用tumble(時間列, interval '視窗時間' second)來建立視窗
val sql =
"""
|select
| userId,
| count(1) as totalCount,
| max(money) as maxMoney,
| min(money) as minMoney
|from
| t_order
|group by
| tumble(createTime, interval '5' second),
| userId
""".stripMargin
// 使用tableEnv.sqlQuery執行sql語句
val table: Table = tableEnv.sqlQuery(sql)
table.printSchema()
// 将SQL的執行結果轉換成DataStream再列印出來
tableEnv.toAppendStream[Row](table).print()
// 啟動流處理程式
env.execute("StreamSQLApp")
}
}