天天看點

【Flink實戰系列】Flink 如何讀取 excel 檔案并注冊成表處理資料

Flink提供了一個CsvTableSource來讀取scv檔案,傳回的是CsvTableSource,然後利用registerTableSource注冊為一張表,我們就可以寫sql操作這張表了,非常的友善,廢話不多說了,直接看下面的demo

package flink.table

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource

/**
  * Flink讀取csv檔案注冊為表
  */
object flinkTable {
  def main(args: Array[String]): Unit = {
    val env =  ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // 設定表的字段;
    val csv = CsvTableSource.builder()
      .path("D:/xxx.csv")
      .field("id",Types.INT)
      .field("url",Types.STRING)
      .field("p_name",Types.STRING)
      .field("price",Types.DOUBLE)
      .field("class_one",Types.INT)
      .field("class_two",Types.INT)
      .fie
           

繼續閱讀