Flink提供了一個CsvTableSource來讀取csv檔案,傳回的是CsvTableSource,然後利用registerTableSource注冊為一張表,我們就可以寫sql操作這張表了,非常的友善,廢話不多說了,直接看下面的demo
package flink.table
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
/**
* Flink讀取csv檔案注冊為表
*/
object flinkTable {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 設定表的字段;
val csv = CsvTableSource.builder()
.path("D:/xxx.csv")
.field("id",Types.INT)
.field("url",Types.STRING)
.field("p_name",Types.STRING)
.field("price",Types.DOUBLE)
.field("class_one",Types.INT)
.field("class_two",Types.INT)
.fie