天天看點

Table API 和 SQL 之建立 Table(一)

package tablesql

import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

/**
 * Example: expose a CSV file as a table and print that table's schema.
 *
 * The program builds a streaming table environment on the old planner,
 * registers the file `data/statefile.log` as the table `t_station_log`
 * with six columns (`f1`..`f4` as strings, `f5` and `f6` as longs), and
 * prints the schema of the registered table. It never calls `execute`
 * on the stream environment.
 *
 * Created 2021/12/28 14:13.
 *
 * @author yqq
 * @version 1.0
 */
object TestCreateTableByFile {
  def main(args: Array[String]): Unit = {
    // Planner configuration: old planner, streaming mode.
    val plannerSettings = EnvironmentSettings.newInstance()
      .useOldPlanner()
      .inStreamingMode()
      .build()
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(streamEnv, plannerSettings)

    // Describe the input data: a CSV file with six named, typed columns.
    val columnNames = Array("f1", "f2", "f3", "f4", "f5", "f6")
    val csvSource = new CsvTableSource(
      "data/statefile.log",
      columnNames,
      // Kept inline so the element type is inferred from the constructor parameter.
      Array(Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG)
    )

    // Register the source as a table.
    tableEnv.registerTableSource("t_station_log", csvSource)

    // A Table object is needed to print the schema or to use the Table API.
    val stationLog: Table = tableEnv.scan("t_station_log")
    stationLog.printSchema()
  }
}