天天看點

Demo:基于 Flink SQL 建構流式應用

上周四在 Flink 中文社群釘釘群中直播分享了《Demo:基于 Flink SQL 建構流式應用》,直播内容偏向實戰示範。這篇文章是對直播内容的一個總結,并且改善了部分内容,比如除 Flink 外其他元件全部采用 Docker Compose 安裝,簡化準備流程。讀者也可以結合視訊和本文一起學習。完整分享可以觀看視訊回顧: https://www.bilibili.com/video/av90560012

Flink 1.10.0 于近期剛釋出,釋放了許多令人激動的新特性。尤其是 Flink SQL 子產品,發展速度非常快,是以本文特意從實踐的角度出發,帶領大家一起探索使用 Flink SQL 如何快速建構流式應用。

本文将基于 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 建構一個電商使用者行為的實時分析應用。本文所有的實戰演練都将在 Flink SQL CLI 上執行,全程隻涉及 SQL 純文字,無需一行 Java/Scala 代碼,無需安裝 IDE。本實戰演練的最終效果圖:

Demo:基于 Flink SQL 建構流式應用

準備

一台裝有 Docker 和 Java8 的 Linux 或 MacOS 計算機。

使用 Docker Compose 啟動容器

本實戰示範所依賴的元件全都編排到了容器中,是以可以通過

docker-compose

一鍵啟動。你可以通過

wget

指令自動下載下傳該

docker-compose.yml

檔案,也可以手動下載下傳。

mkdir flink-demo; cd flink-demo;
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml           

該 Docker Compose 中包含的容器有:

  • DataGen: 資料生成器。容器啟動後會自動開始生成使用者行為資料,并發送到 Kafka 叢集中。預設每秒生成 1000 條資料,持續生成約 3 小時。也可以更改

    docker-compose.yml

    中 datagen 的

    speedup

    參數來調整生成速率(重新開機 docker compose 才能生效)。
  • MySQL: 內建了 MySQL 5.7 ,以及預先建立好了類目表(

    category

    ),預先填入了子類目與頂級類目的映射關系,後續作為維表使用。
  • Kafka: 主要用作資料源。DataGen 元件會自動将資料灌入這個容器中。
  • Zookeeper: Kafka 容器依賴。
  • Elasticsearch: 主要存儲 Flink SQL 産出的資料。
  • Kibana: 可視化 Elasticsearch 中的資料。

在啟動容器前,建議修改 Docker 的配置,将資源調整到 4GB 以及 4核。啟動所有的容器,隻需要在

docker-compose.yml

所在目錄下運作如下指令。

docker-compose up -d           

該指令會以 detached 模式自動啟動 Docker Compose 配置中定義的所有容器。你可以通過

docker ps

來觀察上述的五個容器是否正常啟動了。 也可以通路

http://localhost:5601/

來檢視 Kibana 是否運作正常。

另外可以通過如下指令停止所有的容器:

docker-compose down           

下載下傳安裝 Flink 本地叢集

我們推薦使用者手動下載下傳安裝 Flink,而不是通過 Docker 自動啟動 Flink。因為這樣可以更直覺地了解 Flink 的各個元件、依賴、和腳本。

  1. 下載下傳 Flink 1.10.0 安裝包并解壓(解壓目錄

    flink-1.10.0

    ): https://www.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
  2. 進入 flink-1.10.0 目錄:

    cd flink-1.10.0

  3. 通過如下指令下載下傳依賴 jar 包,并拷貝到

    lib/

    目錄下,也可手動下載下傳和拷貝。因為我們運作時需要依賴各個 connector 實作。
  4. -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar

    | \

    wget -P ./lib/

    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.0/flink-sql-connector-elasticsearch7_2.11-1.10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
  5. conf/flink-conf.yaml

    中的

    taskmanager.numberOfTaskSlots

    修改成 10,因為我們會同時運作多個任務。
  6. 執行

    ./bin/start-cluster.sh

    ,啟動叢集。

    運作成功的話,可以在

    http://localhost:8081 通路到 Flink Web UI。并且可以看到可用 Slots 數為 10 個。
Demo:基于 Flink SQL 建構流式應用
  1. bin/sql-client.sh embedded

    啟動 SQL CLI。便會看到如下的松鼠歡迎界面。
Demo:基于 Flink SQL 建構流式應用

使用 DDL 建立 Kafka 表

Datagen 容器在啟動後會往 Kafka 的

user_behavior

topic 中持續不斷地寫入資料。資料包含了2017年11月27日一天的使用者行為(行為包括點選、購買、加購、喜歡),每一行表示一條使用者行為,以 JSON 的格式由使用者ID、商品ID、商品類目ID、行為類型和時間組成。該原始資料集來自

阿裡雲天池公開資料集

,特此鳴謝。

我們可以在

docker-compose.yml

所在目錄下運作如下指令,檢視 Kafka 叢集中生成的前10條資料。

docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'           
{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
...           

有了資料源後,我們就可以用 DDL 去建立并連接配接這個 Kafka 中的 topic 了。在 Flink SQL CLI 中執行該 DDL。

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),   -- 通過計算列産生一個處理時間列
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定義watermark,ts成為事件時間列
) WITH (
    'connector.type' = 'kafka',  -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支援 0.11 以上的版本
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset',  -- 從起始 offset 開始讀取
    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper 位址
    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker 位址
    'format.type' = 'json'  -- 資料源格式為 json
);           

如上我們按照資料的格式聲明了 5 個字段,除此之外,我們還通過計算列文法和

PROCTIME()

内置函數聲明了一個産生處理時間的虛拟列。我們還通過 WATERMARK 文法,在 ts 字段上聲明了 watermark 政策(容忍5秒亂序), ts 字段是以也成了事件時間列。關于時間屬性以及 DDL 文法可以閱讀官方文檔了解更多:

在 SQL CLI 中成功建立 Kafka 表後,可以通過

show tables;

describe user_behavior;

來檢視目前已注冊的表,以及表的詳細資訊。我們也可以直接在 SQL CLI 中運作

SELECT * FROM user_behavior;

預覽下資料(按

q

退出)。

接下來,我們會通過三個實戰場景來更深入地了解 Flink SQL 。

統計每小時的成交量

使用 DDL 建立 Elasticsearch 表

我們先在 SQL CLI 中建立一個 ES 結果表,根據場景需求主要需要儲存兩個資料:小時、成交量。

CREATE TABLE buy_cnt_per_hour ( 
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', -- 使用 elasticsearch connector
    'connector.version' = '6',  -- elasticsearch 版本,6 能支援 es 6+ 以及 7+ 的版本
    'connector.hosts' = 'http://localhost:9200',  -- elasticsearch 位址
    'connector.index' = 'buy_cnt_per_hour',  -- elasticsearch 索引名,相當于資料庫的表名
    'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相當于資料庫的庫名
    'connector.bulk-flush.max-actions' = '1',  -- 每條資料都重新整理
    'format.type' = 'json',  -- 輸出資料格式 json
    'update-mode' = 'append'
);           

我們不需要在 Elasticsearch 中事先建立

buy_cnt_per_hour

索引,Flink Job 會自動建立該索引。

送出 Query

統計每小時的成交量就是每小時共有多少 "buy" 的使用者行為。是以會需要用到 TUMBLE 視窗函數,按照一小時切窗。然後每個視窗分别統計 "buy" 的個數,這可以通過先過濾出 "buy" 的資料,然後

COUNT(*)

實作。

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);           

這裡我們使用

HOUR

内置函數,從一個 TIMESTAMP 列中提取出一天中第幾個小時的值。使用了

INSERT INTO

将 query 的結果持續不斷地插入到上文定義的 es 結果表中(可以将 es 結果表了解成 query 的物化視圖)。另外可以閱讀該文檔了解更多關于視窗聚合的内容:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows

在 Flink SQL CLI 中運作上述查詢後,在 Flink Web UI 中就能看到送出的任務,該任務是一個流式任務,是以會一直運作。

Demo:基于 Flink SQL 建構流式應用

使用 Kibana 可視化結果

我們已經通過 Docker Compose 啟動了 Kibana 容器,可以通過

http://localhost:5601

通路 Kibana。首先我們需要先配置一個 index pattern。點選左側工具欄的 "Management",就能找到 "Index Patterns"。點選 "Create Index Pattern",然後通過輸入完整的索引名 "buy_cnt_per_hour" 建立 index pattern。建立完成後, Kibana 就知道了我們的索引,我們就可以開始探索資料了。

先點選左側工具欄的"Discovery"按鈕,Kibana 就會列出剛剛建立的索引中的内容。

Demo:基于 Flink SQL 建構流式應用

接下來,我們先建立一個 Dashboard 用來展示各個可視化的視圖。點選頁面左側的"Dashboard",建立一個名為 ”使用者行為日志分析“ 的Dashboard。然後點選 "Create New" 建立一個新的視圖,選擇 "Area" 面積圖,選擇 "buy_cnt_per_hour" 索引,按照如下截圖中的配置(左側)畫出成交量面積圖,并儲存為”每小時成交量“。

Demo:基于 Flink SQL 建構流式應用

可以看到淩晨是一天中成交量的低谷。

統計一天每10分鐘累計獨立使用者數

另一個有意思的可視化是統計一天中每一刻的累計獨立使用者數(uv),也就是每一刻的 uv 數都代表從0點到目前時刻為止的總計 uv 數,是以該曲線肯定是單調遞增的。

我們仍然先在 SQL CLI 中建立一個 Elasticsearch 表,用于存儲結果彙總資料。主要有兩個字段:時間和累積 uv 數。

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);           

為了實作該曲線,我們可以先通過 OVER WINDOW 計算出每條資料的目前分鐘,以及目前累計 uv(從0點開始到目前行為止的獨立使用者數)。 uv 的統計我們通過内置的

COUNT(DISTINCT user_id)

來完成,Flink SQL 内部對 COUNT DISTINCT 做了非常多的優化,是以可以放心使用。

CREATE VIEW uv_per_10min AS
SELECT 
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str, 
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);           

SUBSTR

DATE_FORMAT

還有

||

内置函數,将一個 TIMESTAMP 字段轉換成了 10分鐘機關的時間字元串,如:

12:10

,

12:20

。關于 OVER WINDOW 的更多内容可以參考文檔:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#aggregations

我們還使用了 CREATE VIEW 文法将 query 注冊成了一個邏輯視圖,可以友善地在後續查詢中對該 query 進行引用,這有利于拆解複雜 query。注意,建立邏輯視圖不會觸發作業的執行,視圖的結果也不會落地,是以使用起來非常輕量,沒有額外開銷。由于

uv_per_10min

每條輸入資料都産生一條輸出資料,是以對于存儲壓力較大。我們可以基于

uv_per_10min

再根據分鐘時間進行一次聚合,這樣每10分鐘隻有一個點會存儲在 Elasticsearch 中,對于 Elasticsearch 和 Kibana 可視化渲染的壓力會小很多。

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;           

送出上述查詢後,在 Kibana 中建立

cumulative_uv

的 index pattern,然後在 Dashboard 中建立一個"Line"折線圖,選擇

cumulative_uv

索引,按照如下截圖中的配置(左側)畫出累計獨立使用者數曲線,并儲存。

Demo:基于 Flink SQL 建構流式應用

頂級類目排行榜

最後一個有意思的可視化是類目排行榜,進而了解哪些類目是支柱類目。不過由于源資料中的類目分類太細(約5000個類目),對于排行榜意義不大,是以我們希望能将其歸約到頂級類目。是以筆者在 mysql 容器中預先準備了子類目與頂級類目的映射資料,用作維表。

在 SQL CLI 中建立 MySQL 表,後續用作維表查詢。

CREATE TABLE category_dim (
    sub_category_id BIGINT,  -- 子類目
    parent_category_id BIGINT -- 頂級類目
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);           

同時我們再建立一個 Elasticsearch 表,用于存儲類目統計結果。

CREATE TABLE top_category (
    category_name STRING,  -- 類目名稱
    buy_cnt BIGINT  -- 銷量
) WITH (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts' = 'http://localhost:9200',
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'format.type' = 'json',
    'update-mode' = 'upsert'
);           

第一步我們通過維表關聯,補全類目名稱。我們仍然使用 CREATE VIEW 将該查詢注冊成一個視圖,簡化邏輯。維表關聯使用 temporal join 文法,可以檢視文檔了解更多:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, 
  CASE C.parent_category_id
    WHEN 1 THEN '服飾鞋包'
    WHEN 2 THEN '家裝家飾'
    WHEN 3 THEN '家電'
    WHEN 4 THEN '美妝'
    WHEN 5 THEN '母嬰'
    WHEN 6 THEN '3C數位'
    WHEN 7 THEN '運動戶外'
    WHEN 8 THEN '食品'
    ELSE '其他'
  END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;           

最後根據 類目名稱分組,統計出

buy

的事件數,并寫入 Elasticsearch 中。

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;           

top_category

的 index pattern,然後在 Dashboard 中建立一個"Horizontal Bar"條形圖,選擇

top_category

索引,按照如下截圖中的配置(左側)畫出類目排行榜,并儲存。

Demo:基于 Flink SQL 建構流式應用

可以看到服飾鞋包的成交量遠遠領先其他類目。

到目前為止,我們已經完成了三個實戰案例及其可視化視圖。現在可以回到 Dashboard 頁面,對各個視圖進行拖拽編排,讓我們的 Dashboard 看上去更加正式、直覺(如本文開篇效果圖)。當然,Kibana 還提供了非常豐富的圖形和可視化選項,而使用者行為資料中也有很多有意思的資訊值得挖掘,感興趣的讀者可以用 Flink SQL 對資料進行更多元度的分析,并使用 Kibana 展示更多可視化圖,并觀測圖形資料的實時變化。

結尾

在本文中,我們展示了如何使用 Flink SQL 內建 Kafka, MySQL, Elasticsearch 以及 Kibana 來快速搭建一個實時分析應用。整個過程無需一行 Java/Scala 代碼,使用 SQL 純文字即可完成。期望通過本文,可以讓讀者了解到 Flink SQL 的易用和強大,包括輕松連接配接各種外部系統、對事件時間和亂序資料處理的原生支援、維表關聯、豐富的内置函數等等。希望你能喜歡我們的實戰演練,并從中獲得樂趣和知識!