1.需求
- 從csv檔案中讀取資料并儲存為資料表
- 執行sql語句進行查詢并輸出
2.分析程式運作步驟
- 擷取flink的執行環境,我們這裡用的是批處理的執行環境
- 擷取table的執行環境,這裡也擷取的是批處理的執行環境
-
從csv檔案中讀取資料,以POJO的形式接收,傳回DataSet
對flink如何從csv檔案讀取資料不清楚的朋友可以看我這篇部落格!
https://blog.csdn.net/weixin_44844089/article/details/103201216
- 将DataSet轉化成Table
- 将Table注冊成表
- 編寫sql語句,執行查詢,傳回結果(這個結果是Table)
- 将傳回的結果轉成DataSet進行輸出
3.原始資料和實體類定義
3.1資料 儲存在csv檔案中
transactionId,customerId,itemId,amountPaId
111,1,1,100.0
112,2,2,505.0
113,3,3,510.0
114,4,4,600.0
115,1,2,500.0
116,1,2,500.0
117,1,2,500.0
118,1,2,500.0
119,2,3,500.0
120,1,2,500.0
121,1,4,500.0
122,1,2,500.0
123,1,4,500.0
124,1,2,500.0
3.2實體類定義
這裡忽略了get,set方法,還有空的構造器方法,toString方法
切記,這幾個方法是必須的,否則會報錯
如何從csv檔案中讀取資料
public class SalesLog {
private String transactionId;
private String customerId;
private String itemId;
private Double amountPaId;
}
4.Java實作
```php
```php
```php
public class JavaTableSQLAPI {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//擷取table的執行環境 注意這裡我們使用的是BatchTableEnvironment
//實際上我們也可以選擇流的!其實都差不多 可以去官網上看一看
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
// StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
String filePath = "C:\\Users\\77051\\IdeaProjects\\flinktrain01\\src\\main\\scala\\com\\imooc\\file\\sales.csv";
//自己定義的SalesLog實體類 必須要有set,get方法,一個空的構造器方法和toString方法!
DataSet<SalesLog> csvData = env.readCsvFile(filePath)
.fieldDelimiter(",")//分隔符
.includeFields("1111")//設定我們要讀取的域 1讀 0不讀
.ignoreFirstLine()//是否忽略第一行
.pojoType(SalesLog.class,"transactionId","customerId","itemId","amountPaId");
//将DataSet 轉成一個table
Table csvTable = tableEnv.fromDataSet(csvData);
//将table注冊成一張表Table 之後就可以執行sql語句了
tableEnv.registerTable("sales",csvTable);
//執行sql語句 傳回結果table
Table resultTable = tableEnv.sqlQuery("select customerId,sum(amountPaId) from sales group by customerId");
//将結果的table轉成DataSet 進行輸出
DataSet<Row> result = tableEnv.toDataSet(resultTable, Row.class);
result.print();
}
}
5.scala實作
object TableSqlAPI {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
//注意這裡要選擇擷取批處理的 因為我們擷取的執行環境就是批處理的
val tableEnv = TableEnvironment.getTableEnvironment(env)
val filePath = "C:\\Users\\77051\\IdeaProjects\\flinktrain01\\src\\main\\scala\\com\\imooc\\file\\sales.csv"
//注意這裡要傳類型 我們自己定義了一個模闆類SalesLog,并且要忽略第一行
val csvData = env.readCsvFile[SalesLog](filePath, ignoreFirstLine = true)
// csvData.print()
//将DataSet轉成table
val salesTable = tableEnv.fromDataSet(csvData)
//将table注冊成一張表 後面就可以進行sql查詢了
tableEnv.registerTable("sales",salesTable)
//執行查詢語句 傳回一個table
val resultTable = tableEnv.sqlQuery("select customerId,sum(amountPaId) from sales group by customerId")
//再将一個table轉成一個DataSet進行輸出
val result = tableEnv.toDataSet[Row](resultTable)
result.print()
}
//模闆類
case class SalesLog(transactionId:String,
customerId:String,
itemId:String,
amountPaId:Double)
}
6.輸出結果
4,600.0
2,1005.0
3,510.0
1,4600.0