天天看點

Apache Flink 零基礎入門(八): SQL 程式設計實踐

作者:伍翀(雲邪)

本文是 Apache Flink

零基礎入門系列文章 第八篇,将通過五個執行個體講解 Flink SQL 的程式設計實踐。

注: 本教程實踐基于 Ververica 開源的 sql-training 項目。基于 Flink 1.7.2 。

通過本課你能學到什麼?

本文将通過五個執行個體來貫穿 Flink SQL 的程式設計實踐,主要會涵蓋以下幾個方面的内容。

  1. 如何使用 SQL CLI 用戶端
  2. 如何在流上運作 SQL 查詢
  3. 運作 window aggregate 與 non-window aggregate,了解其差別
  4. 如何用 SQL 消費 Kafka 資料
  5. 如何用 SQL 将結果寫入 Kafka 和 ElasticSearch

本文假定您已具備基礎的 SQL 知識。

環境準備

本文教程是基于 Docker 進行的,是以你隻需要安裝了

Docker

即可。不需要依賴 Java、Scala 環境、或是IDE。

注意:Docker 預設配置的資源可能不太夠,會導緻運作 Flink Job 時卡死。是以推薦配置 Docker 資源到 3-4 GB,3-4 CPUs。

Apache Flink 零基礎入門(八): SQL 程式設計實踐

本次教程的環境使用 Docker Compose 來安裝,包含了所需的各種服務的容器,包括:

  • Flink SQL Client:用來送出query,以及可視化結果
  • Flink JobManager 和 TaskManager:用來運作 Flink SQL 任務。
  • Apache Kafka:用來生成輸入流和寫入結果流。
  • Apache Zookeeper:Kafka 的依賴項
  • ElasticSearch:用來寫入結果

我們已經提供好了Docker Compose 配置檔案,可以直接下載下傳

docker-compose.yml

檔案。

然後打開指令行視窗,進入存放

docker-compose.yml

檔案的目錄,然後運作以下指令:

  • Linux & MacOS
docker-compose up -d           
  • Windows
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d           

docker-compose

指令會啟動所有所需的容器。第一次運作的時候,Docker 會自動地從 Docker Hub 下載下傳鏡像,這可能會需要一段時間(将近 2.3GB)。之後運作的話,幾秒鐘就能啟動起來了。運作成功的話,會在指令行中看到以下輸出,并且也可以在

http://localhost:8081

通路到 Flink Web UI。

Apache Flink 零基礎入門(八): SQL 程式設計實踐

運作 Flink SQL CLI 用戶端

運作下面指令進入 Flink SQL CLI 。

docker-compose exec sql-client ./sql-client.sh           

該指令會在容器中啟動 Flink SQL CLI 用戶端。然後你會看到如下的歡迎界面。

Apache Flink 零基礎入門(八): SQL 程式設計實踐

資料介紹

Docker Compose 中已經預先注冊了一些表和資料,可以運作

SHOW TABLES;

來檢視。本文會用到的資料是

Rides

表,這是一張計程車的行車記錄資料流,包含了時間和位置資訊,運作

DESCRIBE Rides;

可以檢視表結構。

Flink SQL> DESCRIBE Rides;
root
 |-- rideId: Long           // 行為ID (包含兩條記錄,一條入一條出)
 |-- taxiId: Long           // 計程車ID 
 |-- isStart: Boolean       // 開始 or 結束
 |-- lon: Float             // 經度
 |-- lat: Float             // 緯度
 |-- rideTime: TimeIndicatorTypeInfo(rowtime)     // 時間
 |-- psgCnt: Integer        // 乘客數           

Rides 表的詳細定義見

training-config.yaml

執行個體1:過濾

例如我們現在隻想檢視發生在紐約的行車記錄。

注:Docker 環境中已經預定義了一些内置函數,如

isInNYC(lon, lat)

可以确定一個經緯度是否在紐約,

toAreaId(lon, lat)

可以将經緯度轉換成區塊。

是以,此處我們可以使用

isInNYC

來快速過濾出紐約的行車記錄。在 SQL CLI 中運作如下 Query:

SELECT * FROM Rides WHERE isInNYC(lon, lat);           

SQL CLI 便會送出一個 SQL 任務到 Docker 叢集中,從資料源(Rides 流存儲在Kafka中)不斷拉取資料,并通過

isInNYC

過濾出所需的資料。SQL CLI 也會進入可視化模式,并不斷重新整理展示過濾後的結果:

Apache Flink 零基礎入門(八): SQL 程式設計實踐

也可以到

檢視 Flink 作業的運作情況。

執行個體2:Group Aggregate

我們的另一個需求是計算搭載每種乘客數量的行車事件數。也就是搭載1個乘客的行車數、搭載2個乘客的行車... 當然,我們仍然隻關心紐約的行車事件。

是以,我們可以按照乘客數

psgCnt

做分組,使用

COUNT(*)

計算出每個分組的事件數,注意在分組前需要先過濾出

isInNYC

的資料。在 SQL CLI 中運作如下 Query:

SELECT psgCnt, COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;           

SQL CLI 的可視化結果如下所示,結果每秒都在發生變化。不過最大的乘客數不會超過 6 人。

Apache Flink 零基礎入門(八): SQL 程式設計實踐

執行個體3:Window Aggregate

為了持續地監測紐約的交通流量,需要計算出每個區塊每5分鐘的進入的車輛數。我們隻關心至少有5輛車子進入的區塊。

此處需要涉及到視窗計算(每5分鐘),是以需要用到 Tumbling Window 的文法。“每個區塊” 是以還要按照

toAreaId

進行分組計算。“進入的車輛數” 是以在分組前需要根據

isStart

字段過濾出進入的行車記錄,并使用

COUNT(*)

統計車輛數。最後還有一個 “至少有5輛車子的區塊” 的條件,這是一個基于統計值的過濾條件,是以可以用 SQL HAVING 子句來完成。

最後的 Query 如下所示:

SELECT 
  toAreaId(lon, lat) AS area, 
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end, 
  COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat) and isStart
GROUP BY 
  toAreaId(lon, lat), 
  TUMBLE(rideTime, INTERVAL '5' MINUTE) 
HAVING COUNT(*) >= 5;           

在 SQL CLI 中運作後,其可視化結果如下所示,每個 area + window_end 的結果輸出後就不會再發生變化,但是會每隔 5 分鐘會輸出一批新視窗的結果。因為 Docker 環境中的source我們做了10倍的加速讀取(相對于原始速度),是以示範的時候,大概每隔30秒就會輸出一批新視窗。

Apache Flink 零基礎入門(八): SQL 程式設計實踐

Window Aggregate 與 Group Aggregate 的差別

從執行個體2和執行個體3的結果顯示上,可以體驗出來 Window Aggregate 與 Group Aggregate 是有一些明顯的差別的。其主要的差別是,Window Aggregate 是當window結束時才輸出,其輸出的結果是最終值,不會再進行修改,其輸出流是一個 Append 流。而 Group Aggregate 是每處理一條資料,就輸出最新的結果,其結果是在不斷更新的,就好像資料庫中的資料一樣,其輸出流是一個 Update 流。

另外一個差別是,window 由于有 watermark ,可以精确知道哪些視窗已經過期了,是以可以及時清理過期狀态,保證狀态維持在穩定的大小。而 Group Aggregate 因為不知道哪些資料是過期的,是以狀态會無限增長,這對于生産作業來說不是很穩定,是以建議對 Group Aggregate 的作業配上 State TTL 的配置。

Apache Flink 零基礎入門(八): SQL 程式設計實踐

例如統計每個店鋪每天的實時PV,那麼就可以将 TTL 配置成 24+ 小時,因為一天前的狀态一般來說就用不到了。

SELECT  DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) as pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id           

當然,如果 TTL 配置地太小,可能會清除掉一些有用的狀态和資料,進而導緻資料精确性地問題。這也是使用者需要權衡地一個參數。

執行個體4:将 Append 流寫入 Kafka

上一小節介紹了 Window Aggregate 和 Group Aggregate 的差別,以及 Append 流和 Update 流的差別。在 Flink 中,目前 Update 流隻能寫入支援更新的外部存儲,如 MySQL, HBase, ElasticSearch。Append 流可以寫入任意地存儲,不過一般寫入日志類型的系統,如 Kafka。

這裡我們希望将“每10分鐘的搭乘的乘客數”寫入Kafka。

我們已經預定義了一張 Kafka 的結果表

Sink_TenMinPsgCnts

中有完整的表定義)。

在執行 Query 前,我們先運作如下指令,來監控寫入到

TenMinPsgCnts

topic 中的資料:

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning           

每10分鐘的搭乘的乘客數可以使用 Tumbling Window 來描述,我們使用

INSERT INTO Sink_TenMinPsgCnts

來直接将 Query 結果寫入到結果表。

INSERT INTO Sink_TenMinPsgCnts 
SELECT 
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,  
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  CAST(SUM(psgCnt) AS BIGINT) AS cnt 
FROM Rides 
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);           

我們可以監控到

TenMinPsgCnts

topic 的資料以 JSON 的形式寫入到了 Kafka 中:

Apache Flink 零基礎入門(八): SQL 程式設計實踐

執行個體5:将 Update 流寫入 ElasticSearch

最後我們實踐一下将一個持續更新的 Update 流寫入 ElasticSearch 中。我們希望将“每個區域出發的行車數”,寫入到 ES 中。

我們也已經預定義好了一張

Sink_AreaCnts

的 ElasticSearch 結果表(

中有完整的表定義)。該表中隻有兩個字段

areaId

cnt

同樣的,我們也使用

INSERT INTO

将 Query 結果直接寫入到

Sink_AreaCnts

表中。

INSERT INTO Sink_AreaCnts 
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt 
FROM Rides 
WHERE isStart
GROUP BY toAreaId(lon, lat);           

在 SQL CLI 中執行上述 Query 後,Elasticsearch 會自動地建立

area-cnts

索引。Elasticsearch 提供了一個 REST API 。我們可以通路

随着 Query 的一直運作,你也可以觀察到一些統計值(

_all.primaries.docs.count

,

_all.primaries.docs.deleted

)在不斷的增長:

總結

本文帶大家使用 Docker Compose 快速上手 Flink SQL 的程式設計,并對比 Window Aggregate 和 Group Aggregate 的差別,以及這兩種類型的作業如何寫入到 外部系統中。感興趣的同學,可以基于這個 Docker 環境更加深入地去實踐,例如運作自己寫的 UDF , UDTF, UDAF。查詢内置地其他源表等等。

▼ Apache Flink 社群推薦 ▼

Apache Flink 及大資料領域頂級盛會 Flink Forward Asia 2019 重磅開啟,目前正在征集議題,限量早鳥票優惠ing。了解 Flink Forward Asia 2019 的更多資訊,請檢視:

https://developer.aliyun.com/special/ffa2019

首屆 Apache Flink 極客挑戰賽重磅開啟,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點選:

https://tianchi.aliyun.com/markets/tianchi/flink2019