序:今天開始Flink SQL 跟 Flink table了,我個人覺得新手的話更要好好學習這一塊的東西,Flink sql的API還是很全的,目前市面已知的很多公司已經用Flink SQL開發了很多實際業務場景,不關是Flink,幾乎所有的流處理,都是向SQL靠攏,這個是主流。而且Flink的 SQL我覺得還是很強大的,值得學習的。
一,Flink Table
首先我們要知道,Flink SQL是基于 apache Calcite架構實作了SQL的标準協定,是建構在Table API之上的更進階接口。
是以這裡的Table我們就簡單的寫一寫,知道Table跟Stream的轉換就好了,至于Table跟DataSet的轉換啥的,有興趣的自己去查官網的API
,1,基本的API:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
object Flink_table {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment;
// 建立table對象
val tableEnv = TableEnvironment.getTableEnvironment(env)
//Stream 或者 dataSet 與Table的轉換
val dataStream = env.fromElements((1, 2), (12, 23), (15, 22))
// 注冊成表
tableEnv.registerDataStream("table1", dataStream)
// 注冊成表,指定字段名稱
tableEnv.registerDataStream("table2", dataStream, 'myLong, 'myString)
// stream 轉出table
val table3 = tableEnv.fromDataStream(dataStream)
val table4 = tableEnv.fromDataStream(dataStream, 'field1, 'field2)
// Table 轉成DataStream
}
}
2,Table 轉成 DataStream
在流式計算中 Table的資料是不斷的動态更新的,将Table轉成Stream的話需要設定資料輸出的格式:
1)Append Model模式
val stream2: DataStream[Row] =tableEnv.toAppendStream[Row](table3) /
/ 指定類型
val stream3: DataStream[(Int, Int)] =tableEnv.toAppendStream[(Int,Int)](table3)
2)Retract Model模式----這是一種更進階模式(實際使用這個比較常見)
val stream4: DataStream[(Boolean, Row)] =tableEnv.toRetractStream[Row](table3) val stream5: DataStream[(Boolean, (Int, Int))] =tableEnv.toRetractStream[(Int,Int)](table3)
注意: 這裡的boolean類型代表資料更新更新,True對應的是INSERT操作更新的資料,False對應的是DELETE操作更新的資料。
3) 字段名稱映射
tableEnv.fromDataStream(dataStream,'_2,'_1)
tableEnv.fromDataStream(dataStream,'_2 as 'field1,'_1 as 'field2)
//實體類轉換操作:
val dstream11: DataStream[Event] = env.fromElements(Event("a"),Event("b"))
tableEnv.fromDataStream(dstream11,'filed_name)
tableEnv.fromDataStream(dstream11,'name as 'name_new)
2,外部連接配接器:
标準格式為:
tableEnv.connect(.........)
.withFormat(........)
.withSchema(.........)
.inAppendMode(.........)
.registerTableSource("mytable")
1)File System Connector 本地檔案連接配接器
// 本地檔案系統連接配接器
tableEnv.connect(new FileSystem().path("file:///path/temp"))
2)kafka 連接配接器
tableEnv.connect(
new Kafka()
.version("0.10") //支援 0.8 , 0.9 , 0.10 , 0.11
.topic("test")
.property("zookeeper,connect", "node1:2181")
.property("bootstrap.servers", "node1:9092")
.startFromEarliest() //最早的偏移量
.startFromLatest() //最新的偏移量消費
.startFromSpecificOffsets(map) //指定偏移量
//指定分區政策
.sinkPartitionerFixed()//每個flink分區最多被配置設定到一個kafka分區
.sinkPartitionerRoundRobin() //随機配置設定
.sinkPartitionerCustom(CustomPartitioner.class) //自定義分區
注意:Flink -kafka 預設情況下支援 at-least-once ,也支援 exactly-once級别的一緻性保障。
3,Table format
為了傳輸不同的格式使用
1)CSV Format
.withFormat(
new Csv()
.field("field1",Types.STRING)
.field("field2",Types.STRING)
.fieldDelimiter(",") //指定切割符号
.lineDelimiter("\n") //指定行切割符号
.quoteCharacter('"')//指定字元串中的單個字元
.commentPrefix("#") //指定comment字首
.ignoreFirstLine() //是否忽略第一行
.ignoreParseErrors()//是否忽略解析錯誤的資料
2) JSON Format
.withFormat(
new Json()
.failOnMissingField(true) // optional: flag whether to fail if a field is missing or not, false by default
// required: define the schema either by using type information which parses numbers to corresponding types
.schema(Type.ROW(...))
// or by using a JSON schema which parses to DECIMAL and TIMESTAMP
.jsonSchema(
"{" +
" type: 'object'," +
" properties: {" +
" lon: {" +
" type: 'number'" +
" }," +
" rideTime: {" +
" type: 'string'," +
" format: 'date-time'" +
" }" +
" }" +
"}"
)
// or use the table's schema
.deriveSchema()
3)Avro
.withFormat(
new Avro()
// required: define the schema either by using an Avro specific record class
.recordClass(User.class)
// or by using an Avro schema
.avroSchema(
"{" +
" \"type\": \"record\"," +
" \"name\": \"test\"," +
" \"fields\" : [" +
" {\"name\": \"a\", \"type\": \"long\"}," +
" {\"name\": \"b\", \"type\": \"string\"}" +
" ]" +
"}"
)
)
2)Table Schema 定義了Flink Table的資料表結構
.withSchema(
new Schema().field("id",Types.STRING)
.proctime() //擷取process time屬性
.field("name",Types.STRING)
.rowtime(.....) //擷取event time屬性
)
注意:還可以直接接入Watermark的生成邏輯,
5 Update Modes
tableEnv.connect(........)
.inAppendMode() //僅僅是insert
.inRetractMode()//insert update delete 操作
.inUpsertMode() //insert delete 操作
6,最後寫一個完整的執行個體
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.types.Row
/**
* Table Connector 一個案完整案例 ,從kafka讀取資料指定格式直接轉成了Table
*/
object TableConnector {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val tableEnv = TableEnvironment.getTableEnvironment(env)
tableEnv
.connect(
new Kafka()
.version("0.10")
.topic("SM_USER_PROFILE")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092"))
.withFormat(
//todo 這裡會報錯,需要導入包flink-json-1.6.1-sql-jar
new Json()
.deriveSchema()
)
.withSchema(
new Schema()
.field("COD_USERNO","string")
.field("COD_USER_ID","string")
)
.inAppendMode()
//todo 注冊成table
.registerTableSource("sm_user")
val stream = tableEnv.scan("sm_user")
//結果轉成stream 列印
tableEnv.toAppendStream[Row](stream).print().setParallelism(1)
env.execute("example")
}
}