天天看點

Flink Table&SQL API的簡單使用1.需求2.分析程式運作步驟3.原始資料和實體類定義4.Java實作5.scala實作6.輸出結果

1.需求

  1. 從csv檔案中讀取資料并儲存為資料表
  2. 執行sql語句進行查詢并輸出

2.分析程式運作步驟

  1. 擷取flink的執行環境,我們這裡用的是批處理的執行環境
  2. 擷取table的執行環境,這裡也擷取的是批處理的執行環境
  3. 從csv檔案中讀取資料,以POJO的形式接收,傳回DataSet

    對flink如何從csv檔案讀取資料不清楚的朋友可以看我這篇部落格!

    https://blog.csdn.net/weixin_44844089/article/details/103201216

  4. 将DataSet轉化成Table
  5. 将Table注冊成表
  6. 編寫sql語句,執行查詢,傳回結果(這個結果是Table)
  7. 将傳回的結果轉成DataSet進行輸出

3.原始資料和實體類定義

3.1資料 儲存在csv檔案中

transactionId,customerId,itemId,amountPaId
111,1,1,100.0
112,2,2,505.0
113,3,3,510.0
114,4,4,600.0
115,1,2,500.0
116,1,2,500.0
117,1,2,500.0
118,1,2,500.0
119,2,3,500.0
120,1,2,500.0
121,1,4,500.0
122,1,2,500.0
123,1,4,500.0
124,1,2,500.0
           

3.2實體類定義

這裡忽略了get,set方法,還有空的構造器方法,toString方法

切記,這幾個方法是必須的,否則會報錯

如何從csv檔案中讀取資料

public class SalesLog {
    private String transactionId;
    private String customerId;
    private String itemId;
    private Double amountPaId;
}
           

4.Java實作

```php

```php
```php
public class JavaTableSQLAPI {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //擷取table的執行環境  注意這裡我們使用的是BatchTableEnvironment
        //實際上我們也可以選擇流的!其實都差不多  可以去官網上看一看
        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
//        StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
        String filePath = "C:\\Users\\77051\\IdeaProjects\\flinktrain01\\src\\main\\scala\\com\\imooc\\file\\sales.csv";
        //自己定義的SalesLog實體類 必須要有set,get方法,一個空的構造器方法和toString方法!
        DataSet<SalesLog> csvData = env.readCsvFile(filePath)
                                        .fieldDelimiter(",")//分隔符
                                        .includeFields("1111")//設定我們要讀取的域 1讀 0不讀
                                        .ignoreFirstLine()//是否忽略第一行
                                        .pojoType(SalesLog.class,"transactionId","customerId","itemId","amountPaId");

        //将DataSet 轉成一個table
        Table csvTable = tableEnv.fromDataSet(csvData);

        //将table注冊成一張表Table 之後就可以執行sql語句了
        tableEnv.registerTable("sales",csvTable);

        //執行sql語句 傳回結果table
        Table resultTable = tableEnv.sqlQuery("select customerId,sum(amountPaId) from sales group by customerId");

        //将結果的table轉成DataSet 進行輸出
        DataSet<Row> result = tableEnv.toDataSet(resultTable, Row.class);
        result.print();
    }

}
           

5.scala實作

object TableSqlAPI {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    //注意這裡要選擇擷取批處理的 因為我們擷取的執行環境就是批處理的
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val filePath = "C:\\Users\\77051\\IdeaProjects\\flinktrain01\\src\\main\\scala\\com\\imooc\\file\\sales.csv"
    //注意這裡要傳類型 我們自己定義了一個模闆類SalesLog,并且要忽略第一行
    val csvData = env.readCsvFile[SalesLog](filePath, ignoreFirstLine = true)
//    csvData.print()

    //将DataSet轉成table
    val salesTable = tableEnv.fromDataSet(csvData)

    //将table注冊成一張表 後面就可以進行sql查詢了
    tableEnv.registerTable("sales",salesTable)

    //執行查詢語句 傳回一個table
    val resultTable = tableEnv.sqlQuery("select customerId,sum(amountPaId) from sales group by customerId")

    //再将一個table轉成一個DataSet進行輸出
    val result = tableEnv.toDataSet[Row](resultTable)
    result.print()
  }

    //模闆類
  case class SalesLog(transactionId:String,
                      customerId:String,
                      itemId:String,
                      amountPaId:Double)
}
           

6.輸出結果

4,600.0

2,1005.0

3,510.0

1,4600.0