背景
Spark 中國社群聯合阿裡雲 EMR 技術交流群,Tablestore 技術交流群舉辦了一場聯合技術直播。直播的話題是“海量結構化資料的實時計算和處理”,主要介紹基于 Tablestore 的資料變更實時捕獲訂閱能力,實作雲上Lambda 架構的輕量化實作。在直播中有一個demo環節,本篇文章會提供demo環節的簡單操作步驟,友善大家後續在阿裡雲上搭建和demo場景類似的一整套架構,實作資料的實時和離線處理。
示範場景介紹
示範模拟了一個電商訂單場景,通過流計算實作訂單大屏的場景,做到海量訂單實時注入的同時,進行10s的訂單統計聚合以及交易金額統計并做實時的大螢幕展示。整個訂單的大螢幕樣例如下:

大屏我們使用阿裡雲的 DATAV 對接 Tablestore資料源來實作,那麼下面我們就具體看看從訂單的原始資料到結果大屏資料的産生過程以及操作步驟。
整套背景的架構大體如下:
- 在ecs,或者本地模拟一個訂單生成器,實時的注入訂單資料到 Tablestore 中。
- 在 Tablestore 控制台建立通道
- 在 EMR 控制台購買 Spark 叢集
- 下載下傳最新的 EMR SDK
- 執行下面提供的建表語句和SQL指令實作實時計算,結果表會寫回 Tablestore中。
- 通過 DATAV 進行實時大屏展示結果表資料
操作步驟一:登陸阿裡雲官網 Tablestore 控制台 進行執行個體和表建立
建立執行個體後,可以建立一張表,表主鍵schema如下:
啟動用戶端注入程式随機寫入資料,樣例資料如下:
Tablestore 産品是 Serverless的形态,使用者使用無需購買大小或者規格,産品回根據業務做自動水準擴充。
操作步驟二:登陸阿裡雲官網 EMR 控制台購買Spark叢集
Spark的叢集規模可以根據業務需求靈活選取,我們實測三節點,可以輕松的實時消費100w/s的資料做聚合計算喲!
操作步驟三:登陸EMR叢集執行作業腳本
登陸EMR的master節點,執行下面指令啟動流任務:
1.啟動stream sql互動
在EMR 官網擷取最新版本EMR sdk(1.8)
streaming-sql --driver-class-path emr-datasources_shaded_2.11-1.8.0.jar --jars emr-datasources_shaded_2.11-1.8.0.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2
2.建立streaming source 表
DROP TABLE IF EXISTS ots_order_test;
CREATE TABLE ots_order_test
USING tablestore
OPTIONS(
endpoint="填寫Tablestore VPC的位址",
access.key.id="",
access.key.secret="",
instance.name="",
table.name="",
tunnel.id="在Tablestore控制台查找對應想消費通道ID",
catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"cols": "price", "type": "long"}, "timestamp": {"cols": "timestamp", "type": "long"}}}'
);
3.建立streaming sink表
DROP TABLE IF EXISTS ots_order_sink_test;
CREATE TABLE ots_order_sink_test
endpoint="",
tunnel.id="",
catalog='{"columns": {"begin": {"col": "begin", "type": "string"},"end": {"col": "end", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "long"}}}'
4.建立Streaming作業
CREATE SCAN ots_table_stream on ots_order_test USING STREAM OPTIONS ("maxoffsetsperchannel"="10000");
CREATE STREAM job1
options(
checkpointLocation='/tmp/spark/cp/test1',
outputMode='update'
)
insert into ots_order_sink_test
SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, sum(price) AS totalPrice FROM ots_table_stream GROUP BY window(to_timestamp(timestamp / 1000000000), "10 seconds");
最後實驗有任何問題,或者希望做技術交流的同學歡迎加入我們的技術交流群(釘釘:23307953 或者11789671),來與我們一起探讨。