package tablesql
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.sources.CsvTableSource
/**
* @Author yqq
* @Date 2021/12/28 14:13
* @Version 1.0
*/
object TestCreateTableByFile {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val table = StreamTableEnvironment.create(environment, settings)
//讀取資料
val source: CsvTableSource = new CsvTableSource("data/statefile.log",
Array[String]("f1", "f2", "f3", "f4", "f5", "f6"),
Array(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG)
)
//注冊一張表
table.registerTableSource("t_station_log",source)
//列印表結構,或者使用Table API, 需要得到Table對象 API
val t: Table = table.scan("t_station_log")
t.printSchema()
}
}