天天看點

(14) flink sql與tableAPI

文章目錄

    • 1. Flink SQL開發
      • 1.1. 什麼是flink關系型API
      • 1.2. Table API和SQL程式的結構
      • 1.3. 在内部目錄中注冊一個表
        • 1.3.1. 注冊一個表
        • 1.3.2. 注冊一個TableSource
        • 1.3.3. 注冊一個TableSink
      • 1.4. Table和DataStream和DataSet的內建
        • 1.4.1. 将DataStream或DataSet轉換為Table
        • 1.4.2. 将Table轉換為DataStream或DataSet
      • 1.5. 批處理案例1
      • 1.6. 批處理案例2
      • 1.7. 流資料處理案例

當我們在使用flink做流式和批式任務計算的時候,往往會想到幾個問題:

  1. 需要熟悉兩套API : DataStream/DataSet API,API有一定難度,開發人員無法集中精力到具體業務的開發
  2. 需要有Java或Scala的開發經驗
  3. flink同時支援批任務與流任務,如何做到API層的統一

flink已經擁有了強大的DataStream/DataSetAPI,滿足流計算和批計算中的各種場景需求,但是關于以上幾個問題,我們還需要提供一種關系型的API來實作flink API層的流與批的統一,那麼這就是flink的

Table API & SQL

首先Table API& SQL 是一種關系型API,使用者可以像操作MySQL資料庫表一樣的操作資料,而不需要寫Java代碼完成flink function,更不需要手工的優化Java代碼調優。另外,SQL作為一個非程式員可操作的語言,學習成本很低,如果一個系統提供SQL支援,将很容易被使用者接受。

總結來說,關系型API的好處:

  1. 關系型API是聲明式的
  2. 查詢能夠被有效的優化
  3. 查詢可以高效的執行
  4. “Everybody” knows SQL

Table API& SQL 是流處理和批處理統一的API層,如下圖。

  • flink在runtime層是統一的,因為flink将批任務看做流的一種特例來執行
  • 在API層,flink為批和流提供了兩套API(DataSet和DataStream)
  • Table API & SQL就統一了flink的API層,批資料上的查詢會随着輸入資料的結束而結束并生成DataSet,流資料的查詢會一直運作并生成結果流。
  • Table API & SQL做到了批與流上的查詢具有同樣的文法語義,是以不用改代碼就能同時在批和流上執行。

[外鍊圖檔轉存失敗(img-hPAHpUFn-1569146589978)(assets/1554261351405.png)]

Table API的特點

Table API和SQL都是Apache Flink中高等級的分析API,SQL所具備的特點Table API也都具有,如下:

  • 聲明式 - 使用者隻關心做什麼,不用關心怎麼做;
  • 高性能 - 支援查詢優化,可以擷取最好的執行性能;
  • 流批統一 - 相同的統計邏輯,既可以流模式運作,也可以批模式運作;
  • 标準穩定 - 語義遵循SQL标準,文法語義明确,不易變動。

當然除了SQL的特性,因為Table API是在Flink中專門設計的,是以Table API還具有自身的特點:

  • 表達方式的擴充性 - 在Flink中可以為Table API開發很多便捷性功能,如:Row.flatten(), map/flatMap 等
  • 功能的擴充性 - 在Flink中可以為Table API擴充更多的功能,如:Iteration,flatAggregate 等新功能
  • 編譯檢查 - Table API支援java和scala語言開發,支援IDE中進行編譯檢查。

Table API和SQL捆綁在flink-table Maven工件中。必須将以下依賴項添加到你的項目才能使用Table API和SQL:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.6.1</version>
</dependency>
           

另外,你需要為Flink的Scala批處理或流式API添加依賴項。對于批量查詢,您需要添加:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.6.1</version>
</dependency>
           

Table API一般與DataSet或者DataStream緊密關聯,可以通過一個DataSet或DataStream建立出一個Table,再用類似于filter, join, 或者 select關系型轉化操作來轉化為一個新的

Table

對象。最後将一個Table對象轉回一個DataSet或DataStream。從内部實作上來說,所有應用于Table的轉化操作都變成一棵邏輯表操作樹,在Table對象被轉化回DataSet或者DataStream之後,轉化器會将邏輯表操作樹轉化為對等的DataSet或者DataStream操作符。

Flink的批處理和流處理的Table API和SQL程式遵循相同的模式;是以我們隻需要使用一種來示範即可要想執行flink的SQL語句,首先需要擷取SQL的執行環境:兩種方式(batch和streaming):

批處理:

val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
           

流處理:

val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
           

通過

getTableEnvironment

可以擷取

TableEnviromment

;這個TableEnviromment是Table API和SQL內建的核心概念。它負責:

  • 在内部目錄中注冊一個表
  • 注冊外部目錄
  • 執行SQL查詢
  • 注冊使用者定義的(标量,表格或聚合)函數
  • 轉換DataStream或DataSet成Table
  • 持有一個ExecutionEnvironment或一個參考StreamExecutionEnvironment

TableEnvironment維護一個按名稱注冊的表的目錄。有兩種類型的表格,

輸入

表格和

輸出

表格。輸入表可以在Table API和SQL查詢中引用并提供輸入資料。輸出表可用于将表API或SQL查詢的結果發送到外部系統輸入表可以從各種來源注冊:

  • 現有Table對象,通常是表API或SQL查詢的結果。
  • TableSource,它通路外部資料,例如檔案,資料庫或消息傳遞系統。
  • DataStream或DataSet來自DataStream或DataSet程式。

輸出表可以使用注冊TableSink。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable)
// Table is the result of a simple projection query 
val projTable: Table = tableEnv.scan("projectedTable ").select(...)
           

TableSource提供對存儲在諸如資料庫(MySQL,HBase等),具有特定編碼(CSV,Apache [Parquet,Avro,ORC],…)的檔案的存儲系統中的外部資料的通路或者消息傳送系統(Apache Kafka,RabbitMQ,…)

// get a TableEnvironment 
val tableEnv = TableEnvironment.getTableEnvironment(env) 
// create a TableSource
val csvSource: TableSource = CsvTableSource.builder().path("./data/score.csv")...
// register the TableSource as table "CsvTable" 
tableEnv.registerTableSource("CsvTable", csvSource)
           

注冊TableSink可用于将表API或SQL查詢的結果發送到外部存儲系統,如資料庫,鍵值存儲,消息隊列或檔案系統(使用不同的編碼,例如CSV,Apache [Parquet ,Avro,ORC],…)

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
           

在上面的例子講解中,直接使用的是:

registerTableSource

系統資料庫

對于flink來說,還有更靈活的方式:比如直接注冊DataStream或者DataSet轉換為一張表。

然後DataStream或者DataSet就相當于表,這樣可以繼續使用SQL來操作流或者批次的資料

文法:

// get TableEnvironment 
// registration of a DataSet is equivalent
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

           

示例

處理方式,加載下列資料,并注冊為表,查詢所有資料,寫入到CSV檔案中。

id product amount
1 beer 3
2 diaper 4
rubber

開發步驟

  1. 擷取流處理環境
  2. 擷取TableEnvironment
  3. 加載本地集合
  4. 根據資料系統資料庫
  5. 執行SQL
  6. 寫入CSV檔案中
  7. 執行任務

代碼

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.sinks.CsvTableSink

object DataSet_DataStreamToTable {
  def main(args: Array[String]): Unit = {

    // 1. 擷取流處理環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. 擷取TableEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // 3. 加載本地集合
    val orderA: DataStream[Order1] = env.fromCollection(Seq(
      Order1(1L, "beer", 3),
      Order1(1L, "diaper", 4),
      Order1(3L, "rubber", 2)))

    // 4. 根據資料系統資料庫
    tableEnv.registerDataStream("OrderA", orderA)
    // 5. 執行SQL
    val result = tableEnv.sqlQuery("SELECT * FROM OrderA")
    // 6. 寫入CSV檔案中
    result.writeToSink(new CsvTableSink("./data/ccc", ",", 1, FileSystem.WriteMode.OVERWRITE))
    // 7. 執行任務
    env.execute()
  }
}

// 定義樣例類
case class Order1(user: Long, product: String, amount: Int)

           

Table可以轉換為DataStream或者DataSet,這樣的話,自定義的DataStream或者DataSet程式就可以基于Table API或者SQL查詢的結果來執行了。

當将一個Table轉換為DataStream或者DataSet時,你需要指定生成的DataStream或者DataSet的資料類型,即需要轉換表的行的資料類型,通常最友善的轉換類型是Row,下面清單概述了不同選項的功能:

  • Row:字段通過位置映射、可以是任意數量字段,支援空值,非類型安全通路
  • POJO:字段通過名稱(POJO字段作為Table字段時,必須命名)映射,可以是任意數量字段,支援空值,類型安全通路
  • Case Class:字段通過位置映射,不支援空值,類型安全通路
  • Tuple:字段通過位置映射,不得多于22(Scala)或者25(Java)個字段,不支援空值,類型安全通路
  • Atomic Type:Table必須有一個字段,不支援空值,類型安全通路。

将Table轉換為DataStream

流式查詢的結果Table會被動态地更新,即每個新的記錄到達輸入流時結果就會發生變化。是以,轉換此動态查詢的DataStream需要對表的更新進行編碼。

有兩種模式可以将 Table轉換為DataStream:

1:

Append Mode

:這種模式隻适用于當動态表僅由INSERT更改修改時,即僅附加,之前發送的結果不會被更新。

2:

Retract Mode

:始終都可以使用此模式,它使用一個boolean辨別來編碼INSERT和DELETE更改。

文法格式:

// get TableEnvironment. 
// registration of a DataSet is equivalent
// ge val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] = tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
           

使用Flink流式環境, 加載下列集合資料, 轉換為Table, 并将Table轉換為DataStream

List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (6L, 6, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World"))
    )
           
  1. 設定并行度
  2. 擷取Table運作環境
  3. 轉換DataStream為Table
  4. 将table轉換為DataStream----将一個表附加到流上Append Mode
  5. 将table轉換為DataStream----Retract Mode true代表添加消息,false代表撤銷消息
  6. 列印輸出
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.sinks.CsvTableSink


object TableTODataStream {
  def main(args: Array[String]): Unit = {
    // 1. 擷取流處理環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. 設定并行度
    env.setParallelism(1)
    // 3. 擷取Table運作環境
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // 4. 加載本地集合
    val stream = env.fromCollection(
      List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (6L, 6, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World"))
    )
    // 5. 轉換DataStream為Table
    val table: Table = tEnv.fromDataStream(stream)
    // 6. 将table轉換為DataStream----将一個表附加到流上Append Mode
    val appendStream: DataStream[(Long, Int, String)] = tEnv.toAppendStream[(Long, Int, String)](table)
    // 7. 将table轉換為DataStream----Retract Mode true代表添加消息,false代表撤銷消息
    val retractStream: DataStream[(Boolean, (Long, Int, String))] = tEnv.toRetractStream[(Long, Int, String)](table)
    // 8. 列印輸出
    appendStream.print()
    retractStream.print()
    // 9. 執行任務
    env.execute()
  }
}
           

将Table轉換為DataSet

文法格式:

// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
           

使用Flink批處理環境, 加載下列集合資料, 轉換為Table, 并将Table轉換為DataSet

List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (6L, 6, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World"))
    )
           
  1. 擷取批處理環境
  2. DataSet轉換為Table
  3. table轉換為dataSet
package com.itheima

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}

object TableTODataSet{
  def main(args: Array[String]): Unit = {


    //1. 批處理環境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2. 設定并行度
    env.setParallelism(1)
    //3. 擷取table運作環境
    val tableEnvironment = TableEnvironment.getTableEnvironment(env)
    //4. 加載本地集合
    val collection: DataSet[(Long, Int, String)] = env.fromCollection(List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (3L, 3, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World")))
    //5. DataSet轉換為Table
    val table: Table = tableEnvironment.fromDataSet(collection)
    //6. table轉換為dataSet
    val toDataSet: DataSet[(Long, Int, String)] = tableEnvironment.toDataSet[(Long, Int, String)](table)
    //7.列印資料
    toDataSet.print()
//    env.execute()
  }
}
           

使用Flink SQL統計使用者消費訂單的總金額、最大金額、最小金額、訂單總數。

訂單id 使用者名 訂單日期 消費基恩
zhangsan 2018-10-20 15:30 358.5

測試資料(訂單ID、使用者名、訂單日期、訂單金額)

(1,"zhangsan","2018-10-20 15:30",358.5),
(2,"zhangsan","2018-10-20 16:30",131.5),
(3,"lisi","2018-10-20 16:30",127.5),
(4,"lisi","2018-10-20 16:30",328.5),
(5,"lisi","2018-10-20 16:30",432.5),
(6,"zhaoliu","2018-10-20 22:30",451.0),
(7,"zhaoliu","2018-10-20 22:30",362.0),
(8,"zhaoliu","2018-10-20 22:30",364.0),
(9,"zhaoliu","2018-10-20 22:30",341.0)
           

步驟

  1. 擷取一個批處理運作環境
  2. 擷取一個Table運作環境
  3. 建立一個樣例類

    Order

    用來映射資料(訂單名、使用者名、訂單日期、訂單金額)
  4. 基于本地

    Order

    集合建立一個DataSet source
  5. 使用Table運作環境将DataSet注冊為一張表
  6. 使用SQL語句來操作資料(統計使用者消費訂單的總金額、最大金額、最小金額、訂單總數)
  7. 使用TableEnv.toDataSet将Table轉換為DataSet
  8. 列印測試

參考代碼

import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object BatchFlinkSqlDemo {

  // 建立一個樣例類Order用來映射資料(訂單名、使用者名、訂單日期、訂單金額)
  case class Order(id:Int, userName:String, createTime:String, money:Double)

  def main(args: Array[String]): Unit = {
    // 擷取一個批處理運作環境
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 擷取一個Table運作環境
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // 基于本地Order集合建立一個DataSet source
    val orderDataSet = env.fromCollection(List(
      Order(1, "zhangsan", "2018-10-20 15:30", 358.5),
      Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
      Order(3, "lisi", "2018-10-20 16:30", 127.5),
      Order(4, "lisi", "2018-10-20 16:30", 328.5),
      Order(5, "lisi", "2018-10-20 16:30", 432.5),
      Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
      Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
      Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
      Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
    ))

    // 使用Table運作環境将DataSet注冊為一張表
    tableEnv.registerDataSet("t_order", orderDataSet)

    // 使用SQL語句來操作資料(統計使用者消費訂單的總金額、最大金額、最小金額、訂單總數)
    val sql =
      """
        |select
        | userName,
        | sum(money) as totalMoney,
        | max(money) as maxMoney,
        | min(money) as minMoney,
        | count(1) as totalCount
        |from
        | t_order
        |group by
        | userName
      """.stripMargin

    val table: Table = tableEnv.sqlQuery(sql)
    // 列印輸出的這個表格的定義(哪些列、列的類型)
    table.printSchema()

    // 使用TableEnv.toDataSet将Table轉換為DataSet
    val resultDataSet: DataSet[Row] = tableEnv.toDataSet[Row](table)

    // 列印測試
    resultDataSet.print()
  }
}
           

讀取CSV檔案,以table api的方式查詢

name

張三

的資料

1,張三,1,98
2,張三,2,77.5
3,張三,3,89
4,張三,4,65
5,張三,5,78
6,張三,6,70
7,李四,1,78
8,李四,2,58
9,李四,3,65
10,李四,4,78
           
  1. 擷取批處理運作環境
  2. 加載外部CSV檔案
  3. 将外部資料建構成表
  4. 使用table方式查詢資料
  5. 列印表結構
  6. 将資料落地到新的CSV檔案中
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource

object BatchTableDemo {

  def main(args: Array[String]): Unit = {
    //建立batch執行環境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //建立table環境用于batch查詢
    val tableEnvironment = TableEnvironment.getTableEnvironment(env)
    //加載外部CSV資料
    val csvTableSource:CsvTableSource = CsvTableSource.builder()
      .path("./data/score.csv") //檔案路徑
      .field("id", Types.INT) //第一列資料
      .field("name", Types.STRING) //第二列資料
      .field("subjectId", Types.INT) //第三列資料
      .field("score", Types.DOUBLE) //第三列資料
      .fieldDelimiter(",") //列分隔符,預設是","
      .lineDelimiter("\n") //換行符
      .ignoreFirstLine() //忽略第一行
      .ignoreParseErrors() //忽略解析錯誤
      .build()
    //将外部資料建構成表
    tableEnvironment.registerTableSource("tableA", csvTableSource)

    //TODO 1:使用table方式查詢資料
    val table: Table = tableEnvironment.scan("tableA").select("id , name , subjectId,score").filter("name == '張三'")
    table.printSchema()
    //将資料寫出去
    table.writeToSink(new CsvTableSink("./data/table.csv", ",", 1, FileSystem.WriteMode.OVERWRITE))
    //TODO 2:使用sql方式
    //    val sqlResult = tableEnvironment.sqlQuery("select id,name,subjectId,score from tableA where name='張三'")
    //    //将資料寫出去
    //    sqlResult.writeToSink(new CsvTableSink("./data/table.csv", ",", 1, FileSystem.WriteMode.OVERWRITE))
    env.execute()
  }
}

           

流進行中也可以支援SQL。但是需要注意以下幾點:

  1. 要使用流處理的SQL,必須要添加水印時間
  2. 使用

    registerDataStream

    系統資料庫的時候,使用

    '

    來指定字段
  3. 系統資料庫的時候,必須要指定一個rowtime,否則無法在SQL中使用視窗
  4. 必須要導入

    import org.apache.flink.table.api.scala._

    隐式參數
  5. SQL中使用

    tumble(時間列名, interval '時間' sencond)

    來進行定義視窗

使用Flink SQL來統計5秒内

使用者的

訂單總數、訂單的最大金額、訂單的最小金額。

相關SQL

select
 userId,
 count(1) as totalCount,
 max(money) as maxMoney,
 min(money) as minMoney
from
 t_order
group by
 tumble(createTime, interval '5' second),
 userId
           
  1. 擷取流處理運作環境
  2. 設定處理時間為

    EventTime

  3. 建立一個訂單樣例類

    Order

    ,包含四個字段(訂單ID、使用者ID、訂單金額、時間戳)
  4. 建立一個自定義資料源
    • 使用for循環生成1000個訂單
    • 随機生成訂單ID(UUID)
    • 随機生成使用者ID(0-2)
    • 随機生成訂單金額(0-100)
    • 時間戳為目前系統時間
    • 每隔1秒生成一個訂單
  5. 添加水印,允許延遲2秒
  6. 導入

    import org.apache.flink.table.api.scala._

  7. registerDataStream

    系統資料庫,并分别指定字段,還要指定rowtime字段
  8. 編寫SQL語句統計使用者訂單總數、最大金額、最小金額
    • 分組時要使用

      tumble(時間列, interval '視窗時間' second)

      來建立視窗
  9. tableEnv.sqlQuery

    執行sql語句
  10. 将SQL的執行結果轉換成DataStream再列印出來
  11. 啟動流處理程式
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.commons.lang.time.FastDateFormat
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

import scala.util.Random

object StreamFlinkSqlDemo {
  // 建立一個訂單樣例類Order,包含四個字段(訂單ID、使用者ID、訂單金額、時間戳)
  case class Order(id:String, userId:Int, money:Long, createTime:Long)

  def main(args: Array[String]): Unit = {
    // 擷取流處理運作環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 設定處理時間為EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 擷取Table運作環境
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // 建立一個自定義資料源
    val orderDataStream = env.addSource(new RichSourceFunction[Order] {
      override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
        // 使用for循環生成1000個訂單
        for(i <- 0 until 1000) {
          // 随機生成訂單ID(UUID)
          val id = UUID.randomUUID().toString
          // 随機生成使用者ID(0-2)
          val userId = Random.nextInt(3)
          // 随機生成訂單金額(0-100)
          val money = Random.nextInt(101)
          // 時間戳為目前系統時間
          val timestamp = System.currentTimeMillis()

          // 收集資料
          ctx.collect(Order(id, userId, money, timestamp))
          // 每隔1秒生成一個訂單
          TimeUnit.SECONDS.sleep(1)
        }
      }

      override def cancel(): Unit = ()
    })

    // 添加水印,允許延遲2秒
    val watermarkDataStream: DataStream[Order] = orderDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Order] {
      var currentTimestamp: Long = _
      // 允許延遲2秒
      val delayTime = 2000

      // 生成一個水印資料
      override def getCurrentWatermark: Watermark = {
        // 減去兩秒中,表示讓window視窗延遲兩秒計算
        val watermark = new Watermark(currentTimestamp - delayTime)
        val formater = FastDateFormat.getInstance("HH:mm:ss")

        println(s"水印時間: ${formater.format(watermark.getTimestamp)},事件時間:${formater.format(currentTimestamp)}, 系統時間:${formater.format(System.currentTimeMillis())}")
        watermark
      }

      // 表示從Order中擷取對應的時間戳
      override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
        // 擷取到Order訂單事件的時間戳
        val timestamp = element.createTime
        // 表示時間軸不會往前推,不能因為某些資料延遲了,導緻整個window資料得不到計算
        currentTimestamp = Math.max(currentTimestamp, timestamp)
        currentTimestamp
      }
    })

    // 使用registerDataStream系統資料庫,并分别指定字段,還要指定rowtime字段
    // 導入import org.apache.flink.table.api.scala._隐式參數
    tableEnv.registerDataStream("t_order", watermarkDataStream, 'id, 'userId, 'money, 'createTime.rowtime)

    // 編寫SQL語句統計使用者訂單總數、最大金額、最小金額
    // 分組時要使用tumble(時間列, interval '視窗時間' second)來建立視窗
    val sql =
      """
        |select
        | userId,
        | count(1) as totalCount,
        | max(money) as maxMoney,
        | min(money) as minMoney
        |from
        | t_order
        |group by
        | tumble(createTime, interval '5' second),
        | userId
      """.stripMargin

    // 使用tableEnv.sqlQuery執行sql語句
    val table: Table = tableEnv.sqlQuery(sql)
    table.printSchema()

    // 将SQL的執行結果轉換成DataStream再列印出來
    tableEnv.toAppendStream[Row](table).print()

    // 啟動流處理程式
    env.execute("StreamSQLApp")
  }
}