問題導讀
1.為何會有Flink SQL?
2.本文哪些地方涉及Flink 1.7?
4.如何定義源(sources )和接收器(sinks)?
5.Flink SQL本文介紹了哪些sql?
6.将資料格式化為正确的格式以便進一步處理?
7.如何監控Flink sql查詢
8.使用Flink SQL中的視圖的作用是什麼?
9.本文使用Flink sql實作了什麼案例?
雖然Flink SQL最初于2016年8月與Flink 1.1.0一起釋出,但最近的Flink版本增加了相當多的功能,使Flink SQL更易于使用,無需會編寫Java / Scala代碼。 在這篇文章中,我們希望(重新)從這些變化所帶來的新角度介紹Flink SQL,同時為經驗豐富的使用者提供一些額外的知識。
新添加的SQL指令行(SQL CLI)可以輕松快速浏覽流中的資料或靜态資料(例如,在資料庫或HDFS中)。 它還可用于建構功能強大的資料轉換管道或分析管道。 在這篇文章中,我們想要探索目前可用的功能,而後續文章将更詳細地介紹特定功能,并介紹Flink 1.7即将推出的令人興奮的新功能,例如使用MATCH_RECOGNIZE擴充的複雜event處理和改進 基于時間的enrichment(富集) join。
在我們深入研究一些實踐執行個體之前,我們列出了Flink SQL的一些亮點:
- Flink SQL是批處理和流處理的統一API:這允許使用相同的查詢來處理曆史資料和實時資料
- 支援處理時間和事件時間語義
- 支援使用嵌套的Avro和JSON資料
- 使用者定義的scalar,聚合和表值(table-valued)函數
- 無需編碼的SQL指令行(即沒有Java / Scala編碼)
- 支援各種類型的流連接配接
- 支援聚合,包括視窗和沒有視窗
定義 Sources 和Sinks
使用Flink SQL的指令行用戶端時,我們要做的第一件事就是定義源(sources )和接收器(sinks)。 否則,我們将無法讀取或寫入任何資料。 源和接收器在YAML配置檔案中定義,以及其他配置設定。 YAML檔案中的源和接收器配置類似于SQL DDL語句(Flink社群目前正在讨論對SQL DDL的支援)。 對于我們正在進行的示例,我們假設我們有一個Kafka主題(topic),其中存儲了我們想要進一步處理和分析的計程車遊樂設施的資訊。 它的配置如下所示:
tables:
- name: TaxiRides
type: source
update-mode: append
schema:
- name: rideId
type: LONG
- name: rowTime
type: TIMESTAMP
rowtime:
timestamps:
type: "from-field"
from: "rideTime"
watermarks:
type: "periodic-bounded"
delay: "60000"
- name: isStart
type: BOOLEAN
- name: lon
type: FLOAT
- name: lat
type: FLOAT
- name: taxiId
type: LONG
- name: driverId
type: LONG
- name: psgCnt
type: INT
connector:
property-version: 1
type: kafka
version: 0.11
topic: TaxiRides
startup-mode: earliest-offset
properties:
- key: zookeeper.connect
value: zookeeper:2181
- key: bootstrap.servers
value: kafka:9092
- key: group.id
value: testGroup
format:
property-version: 1
type: json
schema: "ROW(rideId LONG, isStart BOOLEAN,
rideTime TIMESTAMP, lon FLOAT, lat FLOAT,
psgCnt INT, taxiId LONG, driverId LONG)"
在Flink SQL中,源,接收器以及介于兩者之間的所有内容稱為表。 在這裡,我們基于包含JSON格式的事件的Kafka主題定義初始表。 我們定義Kafka配置設定,格式以及我們如何将其映射到模式,以及我們希望如何從資料中導出watermarks 。 除了JSON之外,Flink SQL還内置了對CSV和Avro格式的支援,并且還可以使用自定義格式對其進行擴充。 Flink SQL始終支援在JSON和Avro架構中處理嵌套資料。
Flink SQL的使用
現在我們讨論了源表的配置和格式,下面我們說說 Flink SQL的使用
從Flink SQL指令行用戶端,我們可以列出我們定義的表:
Flink SQL> SHOW TABLES;
TaxiRides
TaxiRides_Avro
我們還可以檢查任何表的schema :
Flink SQL> DESCRIBE TaxiRides;
root
|-- rideId: Long
|-- rowTime: TimeIndicatorTypeInfo(rowtime)
|-- isStart: Boolean
|-- lon: Float
|-- lat: Float
|-- taxiId: Long
|-- driverId: Long
|-- psgCnt: Integer
有了這個,讓我們看看我們可以用我們的表做什麼。
有關配置Flink SQL以及定義源,接收器及其格式的詳細資訊,請參閱文檔(https://ci.apache.org/projects/f ... l#environment-files)。
格式化資料
我們可能想要做的最簡單的事情之一是将資料格式化為正确的格式以便進一步處理。 這可能包括:
- 在schema之間轉換,例如将JSON事件流轉換為Avro編碼
- 用SQL語句中删除字段或将其投影
- 過濾掉我們不感興趣的整個事件(events )
讓我們看一下從架構轉換開始我們将如何做到這些。 當我們想要從Kafka讀取資料時,将資料轉換為不同的格式,并将資料寫回不同的Kafka主題以進行下遊處理,我們所要做的就是定義源表(如上所述)然後定義 作為接收器的表格具有不同的格式:
tables:
- name: TaxiRides_Avro0
type: sink
update-mode: append
schema:
- name: rideId
type: LONG
- name: rowTime
type: TIMESTAMP
- name: isStart
type: BOOLEAN
- name: lon
type: FLOAT
- name: lat
type: FLOAT
- name: taxiId
type: LONG
- name: driverId
type: LONG
- name: psgCnt
type: INT
connector:
property-version: 1
type: kafka
version: 0.11
topic: TaxiRides_Avro
properties:
- key: zookeeper.connect
value: zookeeper:2181
- key: bootstrap.servers
value: kafka:9092
- key: group.id
value: trainingGroup
format:
property-version: 1
type: avro
avro-schema: >
{
"type": "record",
"name": "test",
"fields" : [
{"name": "rideId", "type": "long"},
{"name": "rowTime", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "isStart", "type": "boolean"},
{"name": "lon", "type": "float"},
{"name": "lat", "type": "float"},
{"name": "taxiId", "type": "long"},
{"name": "driverId", "type": "long"},
{"name": "psgCnt", "type": "int"}
]
}
通過我們定義的源和接收器轉換資料變得如此簡單:
Flink SQL> INSERT INTO TaxiRides_Avro SELECT * FROM TaxiRides;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: ffa9109b9cad077ec83137f55ec6d1c5
Web interface: http://jobmanager:8081
我們的查詢作為常設查詢送出給Flink叢集。可以通過通路http://localhost:8081來監視和控制來自Flink的WebUI的查詢。
我們可以通過引入(projection)投影和(filtering)過濾來建構這個簡單的模式。 如果我們隻想在結果中包含某些字段,我們可以在SELECT查詢中指定。 例如:
Flink SQL> INSERT INTO TaxiRides_Avro SELECT rideIdId, taxiId, driverId FROM TaxiRides;
這隻會給我們events中的ID。 (請記住,需要調整接收器的格式才能使此查詢起作用。)
基于此,我們可以做的另一件簡單事情就是過濾掉整個事件。 考慮一下我們隻對在某個城市發生的計程車乘坐感興趣的情況。 事件具有lon和lat字段,分别給出事件發生的經度和緯度。 我們可以使用它們來确定事件是否發生在某個城市:
Flink SQL> SELECT * FROM TaxiRides WHERE isInNYC(lon, lat);
你可能會注意到,那就是isInNYC()。 這是我們在SQL用戶端配置中定義的使用者定義函數或UDF。 我們可以通過以下方式檢視我們提供的使用者功能:
Flink SQL> SHOW FUNCTIONS;
timeDiff
toCoords
isInNYC
toAreaId
就像在Flink SQL用戶端配置檔案中配置的其他内容一樣:
functions:
- name: timeDiff
from: class
class: com.dataartisans.udfs.TimeDiff
- name: isInNYC
from: class
class: com.dataartisans.udfs.IsInNYC
- name: toAreaId
from: class
class: com.dataartisans.udfs.ToAreaId
- name: toCoords
from: class
class: com.dataartisans.udfs.ToCoords
UDF是實作特定接口并在用戶端注冊的Java類。 有不同類型的使用者功能:(scalar )标量函數,表函數和聚合函數。 其中詳細介紹了使用者定義的函數,可以檢視UDF文檔。
使用Flink SQL中的視圖建構查詢
一旦我們有足夠複雜的SQL查詢,它們就會變得有點難以了解。 我們可以通過在Flink SQL中定義視圖來緩解這種情況。 這類似于在程式設計語言中定義變量以給出某個名稱的方式,以便以後能夠重用它。 假設我們想要在早期的例子的基礎上進行建構,并建立一個在給定日期之後在某個城市發生的遊樂設施的視圖。 我們會這樣做:
Flink SQL> CREATE VIEW TaxiRides_NYC AS SELECT * FROM TaxiRides
WHERE isInNYC(lon, lat)
AND rowTime >= TIMESTAMP '2013-01-01 00:00:00';
[INFO] View has been created.
我們可以通過以下方式找出視圖:
Flink SQL> SHOW TABLES;
TaxiRides
TaxiRides_Avro
TaxiRides_NYC
需要注意的一點是,建立視圖實際上并不執行個體化任何常設查詢或産生任何輸出或中間結果。 視圖隻是可以重用的查詢的邏輯名稱,并允許更好地建構查詢。 這與其他一些類似SQL的流式系統不同,在這些系統中,每個中間查詢都會建立資料并使用資源。
視圖是Flink 1.7的即将推出的功能,但它已經實作并合并到主分支中( master branch),這就是為什麼我們已經在這裡提到它。 另外,它非常有用。
基于事件時間的視窗化聚合
作為最後一步,我們希望展示一個更複雜的查詢,它将我們到目前為止所解釋的内容彙集在一起。 考慮一種情況,我們希望監控正在發生的遊樂設施,并且需要知道某個城市某個特定區域的遊樂設施數量何時超過門檻值(比如說5)。 這是這樣做的查詢:
SELECT
toAreaId(lon, lat) AS area,
TUMBLE_END(rowTime, INTERVAL '5' MINUTE) AS t,
COUNT(*) AS c
FROM TaxiRides_NYC
WHERE isStart = TRUE
GROUP BY
toAreaId(lon, lat),
TUMBLE(rowTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
在上面的示例中,我們執行以下操作:
- 我們使用之前建立的視圖,其中包含在特定日期之後發生的某個城市的事件,
- 我們過濾掉那些不是“開始事件”的事件,
- 我們使用另一個使用者定義的函數将lon,lat對轉換為區域id和group by,
- 我們指定我們想要有五分鐘的視窗,最後
- 我們過濾掉那些計數小于5的視窗。
在現實世界的用例中,我們現在将其寫入Elasticsearch接收器并使用它為儀表闆或通知系統供電。留給大家思考。
總結
在這篇博文中,我們解釋了如何在不編寫Java代碼的情況下使用Flink SQL實作簡單的資料轉換和資料Massaging作業。 我們還解釋了如何使用視圖來建構更複雜的查詢并使其易于了解。 最後,我們開發了一個更複雜的查詢,它結合了使用者定義的函數,視窗聚合和事件時間支援。
在後續文章中,我們将提供有關如何開發和使用使用者定義函數的更多内容,我們将深入了解Flink SQL的強大連接配接以及如何使用它們來豐富資料。 在Flink 1.7.0釋出之後使用Flink SQL的資料豐富,複雜事件處理和模式檢測引入強大的新增功能。