作者:伍翀(雲邪)
本文是 Apache Flink
零基礎入門系列文章 第八篇,将通過五個執行個體講解 Flink SQL 的程式設計實踐。注: 本教程實踐基于 Ververica 開源的 sql-training 項目。基于 Flink 1.7.2 。
通過本課你能學到什麼?
本文将通過五個執行個體來貫穿 Flink SQL 的程式設計實踐,主要會涵蓋以下幾個方面的内容。
- 如何使用 SQL CLI 用戶端
- 如何在流上運作 SQL 查詢
- 運作 window aggregate 與 non-window aggregate,了解其差別
- 如何用 SQL 消費 Kafka 資料
- 如何用 SQL 将結果寫入 Kafka 和 ElasticSearch
本文假定您已具備基礎的 SQL 知識。
環境準備
本文教程是基于 Docker 進行的,是以你隻需要安裝了
Docker即可。不需要依賴 Java、Scala 環境、或是IDE。
注意:Docker 預設配置的資源可能不太夠,會導緻運作 Flink Job 時卡死。是以推薦配置 Docker 資源到 3-4 GB,3-4 CPUs。

本次教程的環境使用 Docker Compose 來安裝,包含了所需的各種服務的容器,包括:
- Flink SQL Client:用來送出query,以及可視化結果
- Flink JobManager 和 TaskManager:用來運作 Flink SQL 任務。
- Apache Kafka:用來生成輸入流和寫入結果流。
- Apache Zookeeper:Kafka 的依賴項
- ElasticSearch:用來寫入結果
我們已經提供好了Docker Compose 配置檔案,可以直接下載下傳
docker-compose.yml檔案。
然後打開指令行視窗,進入存放
docker-compose.yml
檔案的目錄,然後運作以下指令:
- Linux & MacOS
docker-compose up -d
- Windows
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d
docker-compose
指令會啟動所有所需的容器。第一次運作的時候,Docker 會自動地從 Docker Hub 下載下傳鏡像,這可能會需要一段時間(将近 2.3GB)。之後運作的話,幾秒鐘就能啟動起來了。運作成功的話,會在指令行中看到以下輸出,并且也可以在
http://localhost:8081通路到 Flink Web UI。
運作 Flink SQL CLI 用戶端
運作下面指令進入 Flink SQL CLI 。
docker-compose exec sql-client ./sql-client.sh
該指令會在容器中啟動 Flink SQL CLI 用戶端。然後你會看到如下的歡迎界面。
資料介紹
Docker Compose 中已經預先注冊了一些表和資料,可以運作
SHOW TABLES;
來檢視。本文會用到的資料是
Rides
表,這是一張計程車的行車記錄資料流,包含了時間和位置資訊,運作
DESCRIBE Rides;
可以檢視表結構。
Flink SQL> DESCRIBE Rides;
root
|-- rideId: Long // 行為ID (包含兩條記錄,一條入一條出)
|-- taxiId: Long // 計程車ID
|-- isStart: Boolean // 開始 or 結束
|-- lon: Float // 經度
|-- lat: Float // 緯度
|-- rideTime: TimeIndicatorTypeInfo(rowtime) // 時間
|-- psgCnt: Integer // 乘客數
Rides 表的詳細定義見
training-config.yaml。
執行個體1:過濾
例如我們現在隻想檢視發生在紐約的行車記錄。
注:Docker 環境中已經預定義了一些内置函數,如
isInNYC(lon, lat)
可以确定一個經緯度是否在紐約,
toAreaId(lon, lat)
可以将經緯度轉換成區塊。
是以,此處我們可以使用
isInNYC
來快速過濾出紐約的行車記錄。在 SQL CLI 中運作如下 Query:
SELECT * FROM Rides WHERE isInNYC(lon, lat);
SQL CLI 便會送出一個 SQL 任務到 Docker 叢集中,從資料源(Rides 流存儲在Kafka中)不斷拉取資料,并通過
isInNYC
過濾出所需的資料。SQL CLI 也會進入可視化模式,并不斷重新整理展示過濾後的結果:
也可以到
檢視 Flink 作業的運作情況。
執行個體2:Group Aggregate
我們的另一個需求是計算搭載每種乘客數量的行車事件數。也就是搭載1個乘客的行車數、搭載2個乘客的行車... 當然,我們仍然隻關心紐約的行車事件。
是以,我們可以按照乘客數
psgCnt
做分組,使用
COUNT(*)
計算出每個分組的事件數,注意在分組前需要先過濾出
isInNYC
的資料。在 SQL CLI 中運作如下 Query:
SELECT psgCnt, COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;
SQL CLI 的可視化結果如下所示,結果每秒都在發生變化。不過最大的乘客數不會超過 6 人。
執行個體3:Window Aggregate
為了持續地監測紐約的交通流量,需要計算出每個區塊每5分鐘的進入的車輛數。我們隻關心至少有5輛車子進入的區塊。
此處需要涉及到視窗計算(每5分鐘),是以需要用到 Tumbling Window 的文法。“每個區塊” 是以還要按照
toAreaId
進行分組計算。“進入的車輛數” 是以在分組前需要根據
isStart
字段過濾出進入的行車記錄,并使用
COUNT(*)
統計車輛數。最後還有一個 “至少有5輛車子的區塊” 的條件,這是一個基于統計值的過濾條件,是以可以用 SQL HAVING 子句來完成。
最後的 Query 如下所示:
SELECT
toAreaId(lon, lat) AS area,
TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat) and isStart
GROUP BY
toAreaId(lon, lat),
TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
在 SQL CLI 中運作後,其可視化結果如下所示,每個 area + window_end 的結果輸出後就不會再發生變化,但是會每隔 5 分鐘會輸出一批新視窗的結果。因為 Docker 環境中的source我們做了10倍的加速讀取(相對于原始速度),是以示範的時候,大概每隔30秒就會輸出一批新視窗。
Window Aggregate 與 Group Aggregate 的差別
從執行個體2和執行個體3的結果顯示上,可以體驗出來 Window Aggregate 與 Group Aggregate 是有一些明顯的差別的。其主要的差別是,Window Aggregate 是當window結束時才輸出,其輸出的結果是最終值,不會再進行修改,其輸出流是一個 Append 流。而 Group Aggregate 是每處理一條資料,就輸出最新的結果,其結果是在不斷更新的,就好像資料庫中的資料一樣,其輸出流是一個 Update 流。
另外一個差別是,window 由于有 watermark ,可以精确知道哪些視窗已經過期了,是以可以及時清理過期狀态,保證狀态維持在穩定的大小。而 Group Aggregate 因為不知道哪些資料是過期的,是以狀态會無限增長,這對于生産作業來說不是很穩定,是以建議對 Group Aggregate 的作業配上 State TTL 的配置。
例如統計每個店鋪每天的實時PV,那麼就可以将 TTL 配置成 24+ 小時,因為一天前的狀态一般來說就用不到了。
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) as pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id
當然,如果 TTL 配置地太小,可能會清除掉一些有用的狀态和資料,進而導緻資料精确性地問題。這也是使用者需要權衡地一個參數。
執行個體4:将 Append 流寫入 Kafka
上一小節介紹了 Window Aggregate 和 Group Aggregate 的差別,以及 Append 流和 Update 流的差別。在 Flink 中,目前 Update 流隻能寫入支援更新的外部存儲,如 MySQL, HBase, ElasticSearch。Append 流可以寫入任意地存儲,不過一般寫入日志類型的系統,如 Kafka。
這裡我們希望将“每10分鐘的搭乘的乘客數”寫入Kafka。
我們已經預定義了一張 Kafka 的結果表
Sink_TenMinPsgCnts
(
中有完整的表定義)。
在執行 Query 前,我們先運作如下指令,來監控寫入到
TenMinPsgCnts
topic 中的資料:
docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning
每10分鐘的搭乘的乘客數可以使用 Tumbling Window 來描述,我們使用
INSERT INTO Sink_TenMinPsgCnts
來直接将 Query 結果寫入到結果表。
INSERT INTO Sink_TenMinPsgCnts
SELECT
TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,
TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
CAST(SUM(psgCnt) AS BIGINT) AS cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
我們可以監控到
TenMinPsgCnts
topic 的資料以 JSON 的形式寫入到了 Kafka 中:
執行個體5:将 Update 流寫入 ElasticSearch
最後我們實踐一下将一個持續更新的 Update 流寫入 ElasticSearch 中。我們希望将“每個區域出發的行車數”,寫入到 ES 中。
我們也已經預定義好了一張
Sink_AreaCnts
的 ElasticSearch 結果表(
中有完整的表定義)。該表中隻有兩個字段
areaId
和
cnt
同樣的,我們也使用
INSERT INTO
将 Query 結果直接寫入到
Sink_AreaCnts
表中。
INSERT INTO Sink_AreaCnts
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt
FROM Rides
WHERE isStart
GROUP BY toAreaId(lon, lat);
在 SQL CLI 中執行上述 Query 後,Elasticsearch 會自動地建立
area-cnts
索引。Elasticsearch 提供了一個 REST API 。我們可以通路
- 檢視
索引的詳細資訊: http://localhost:9200/area-cntsarea-cnts
-
索引的統計資訊: http://localhost:9200/area-cnts/_statsarea-cnts
- 傳回
索引的内容: http://localhost:9200/area-cnts/_searcharea-cnts
- 顯示 區塊 49791 的行車數: http://localhost:9200/area-cnts/_search?q=areaId:49791
随着 Query 的一直運作,你也可以觀察到一些統計值(
_all.primaries.docs.count
,
_all.primaries.docs.deleted
)在不斷的增長:
總結
本文帶大家使用 Docker Compose 快速上手 Flink SQL 的程式設計,并對比 Window Aggregate 和 Group Aggregate 的差別,以及這兩種類型的作業如何寫入到 外部系統中。感興趣的同學,可以基于這個 Docker 環境更加深入地去實踐,例如運作自己寫的 UDF , UDTF, UDAF。查詢内置地其他源表等等。
▼ Apache Flink 社群推薦 ▼
Apache Flink 及大資料領域頂級盛會 Flink Forward Asia 2019 重磅開啟,目前正在征集議題,限量早鳥票優惠ing。了解 Flink Forward Asia 2019 的更多資訊,請檢視:
https://developer.aliyun.com/special/ffa2019首屆 Apache Flink 極客挑戰賽重磅開啟,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點選:
https://tianchi.aliyun.com/markets/tianchi/flink2019