天天看點

Flink原理實戰每日一篇09

 序:今天開始Flink SQL 跟 Flink table了,我個人覺得新手的話更要好好學習這一塊的東西,Flink sql的API還是很全的,目前市面已知的很多公司已經用Flink SQL開發了很多實際業務場景,不關是Flink,幾乎所有的流處理,都是向SQL靠攏,這個是主流。而且Flink的 SQL我覺得還是很強大的,值得學習的。

一,Flink Table

首先我們要知道,Flink SQL是基于 apache Calcite架構實作了SQL的标準協定,是建構在Table API之上的更進階接口。

是以這裡的Table我們就簡單的寫一寫,知道Table跟Stream的轉換就好了,至于Table跟DataSet的轉換啥的,有興趣的自己去查官網的API

,1,基本的API:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object Flink_table {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment;
    // 建立table對象
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    //Stream 或者 dataSet 與Table的轉換
    val dataStream = env.fromElements((1, 2), (12, 23), (15, 22))

    // 注冊成表
    tableEnv.registerDataStream("table1", dataStream)

    // 注冊成表,指定字段名稱
    tableEnv.registerDataStream("table2", dataStream, 'myLong, 'myString)

    // stream 轉出table
    val table3 = tableEnv.fromDataStream(dataStream)
    val table4 = tableEnv.fromDataStream(dataStream, 'field1, 'field2)

    // Table 轉成DataStream
    

  }
}      

2,Table 轉成 DataStream 

   在流式計算中 Table的資料是不斷的動态更新的,将Table轉成Stream的話需要設定資料輸出的格式:

1)Append Model模式

val stream2: DataStream[Row] =tableEnv.toAppendStream[Row](table3) /

/ 指定類型

val stream3: DataStream[(Int, Int)] =tableEnv.toAppendStream[(Int,Int)](table3)

2)Retract Model模式----這是一種更進階模式(實際使用這個比較常見)

val stream4: DataStream[(Boolean, Row)] =tableEnv.toRetractStream[Row](table3) val stream5: DataStream[(Boolean, (Int, Int))] =tableEnv.toRetractStream[(Int,Int)](table3)

注意: 這裡的boolean類型代表資料更新更新,True對應的是INSERT操作更新的資料,False對應的是DELETE操作更新的資料。

3) 字段名稱映射

tableEnv.fromDataStream(dataStream,'_2,'_1)
tableEnv.fromDataStream(dataStream,'_2 as 'field1,'_1 as 'field2)      

//實體類轉換操作:

val dstream11: DataStream[Event] = env.fromElements(Event("a"),Event("b"))
tableEnv.fromDataStream(dstream11,'filed_name)
tableEnv.fromDataStream(dstream11,'name as 'name_new)      

2,外部連接配接器:

标準格式為:

tableEnv.connect(.........)

.withFormat(........)

.withSchema(.........)

.inAppendMode(.........)

.registerTableSource("mytable")

1)File System Connector  本地檔案連接配接器

// 本地檔案系統連接配接器
tableEnv.connect(new FileSystem().path("file:///path/temp"))      

2)kafka 連接配接器

tableEnv.connect(
  new Kafka()
    .version("0.10") //支援 0.8 , 0.9 , 0.10 , 0.11
    .topic("test")
    .property("zookeeper,connect", "node1:2181")
    .property("bootstrap.servers", "node1:9092")
    .startFromEarliest() //最早的偏移量
    .startFromLatest() //最新的偏移量消費
    .startFromSpecificOffsets(map) //指定偏移量
    //指定分區政策
    .sinkPartitionerFixed()//每個flink分區最多被配置設定到一個kafka分區
    .sinkPartitionerRoundRobin() //随機配置設定
    .sinkPartitionerCustom(CustomPartitioner.class) //自定義分區      

注意:Flink -kafka 預設情況下支援 at-least-once ,也支援 exactly-once級别的一緻性保障。

3,Table format

 為了傳輸不同的格式使用

1)CSV Format

.withFormat(
  new Csv()
    .field("field1",Types.STRING)
    .field("field2",Types.STRING)
    .fieldDelimiter(",") //指定切割符号
    .lineDelimiter("\n") //指定行切割符号
    .quoteCharacter('"')//指定字元串中的單個字元
    .commentPrefix("#") //指定comment字首
    .ignoreFirstLine() //是否忽略第一行
    .ignoreParseErrors()//是否忽略解析錯誤的資料      

2) JSON Format

.withFormat(
    new Json()
      .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

      // required: define the schema either by using type information which parses numbers to corresponding types
      .schema(Type.ROW(...))

// or by using a JSON schema which parses to DECIMAL and TIMESTAMP
.jsonSchema(
  "{" +
    "  type: 'object'," +
    "  properties: {" +
    "    lon: {" +
    "      type: 'number'" +
    "    }," +
    "    rideTime: {" +
    "      type: 'string'," +
    "      format: 'date-time'" +
    "    }" +
    "  }" +
    "}"
)

  // or use the table's schema
  .deriveSchema()      

3)Avro 

.withFormat(
    new Avro()
      // required: define the schema either by using an Avro specific record class
      .recordClass(User.class)

// or by using an Avro schema
.avroSchema(
  "{" +
    "  \"type\": \"record\"," +
    "  \"name\": \"test\"," +
    "  \"fields\" : [" +
    "    {\"name\": \"a\", \"type\": \"long\"}," +
    "    {\"name\": \"b\", \"type\": \"string\"}" +
    "  ]" +
    "}"
)
)
      

2)Table Schema 定義了Flink Table的資料表結構

.withSchema(
  new Schema().field("id",Types.STRING)
    .proctime()   //擷取process time屬性
  .field("name",Types.STRING)
    .rowtime(.....) //擷取event time屬性
)      

注意:還可以直接接入Watermark的生成邏輯,

5 Update Modes

tableEnv.connect(........)
.inAppendMode()  //僅僅是insert
.inRetractMode()//insert update delete 操作
.inUpsertMode() //insert delete 操作
      

6,最後寫一個完整的執行個體

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.types.Row

/**
  * Table Connector 一個案完整案例 ,從kafka讀取資料指定格式直接轉成了Table
  */
object TableConnector {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // create a TableEnvironment for streaming queries
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    tableEnv
      .connect(
        new Kafka()
          .version("0.10")
          .topic("SM_USER_PROFILE")
          .startFromEarliest()
          .property("zookeeper.connect", "localhost:2181")
          .property("bootstrap.servers", "localhost:9092"))
      .withFormat(
        //todo 這裡會報錯,需要導入包flink-json-1.6.1-sql-jar
        new Json()
          .deriveSchema()
      )
      .withSchema(
        new Schema()
          .field("COD_USERNO","string")
          .field("COD_USER_ID","string")
      )
      .inAppendMode()
      //todo 注冊成table
      .registerTableSource("sm_user")

    val stream = tableEnv.scan("sm_user")

  //結果轉成stream 列印
    tableEnv.toAppendStream[Row](stream).print().setParallelism(1)
    env.execute("example")


  }
}
      

 

繼續閱讀