天天看點

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼

背景

交易資料的實時統計是電商網站一個核心功能,可以幫助使用者實時統計網站的整體銷售情況,快速驗證“新銷售政策”的效果。我們今天介紹一個基于表格存儲(Tablestore)實作交易資料的實時計算,給大家提供一個新使用方式。

Tablestore作為線上的結構化資料庫,提供了毫秒級的通路延時和豐富的查詢方式,能高效的支撐交易資料的存儲和查詢,同時Tablestore已經原生支援阿裡雲的流計算架構Flink/Blink,可以實作資料的實時計算。

架構

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼

示例設計

基本場景

注意:示例是模拟一個電商網站的交易資料的存儲和實時計算,目的是為了展示Tablestore + Blink的使用流程。

使用者通過SDK将網站交易資料實時的存儲(PutRow/BatchWrite/TableStoreWriter)到Tablestore的source_order表中,Tablestore通過Tunnel服務,将實時增量的資料流入到Flink/Blink中,每5秒進行一次聚合計算,并将計算的結果重新寫回Tablestore的sink_order表中。最後提供給“大屏”實時讀取(GetRange)展示。

Source表(源表)- source_order

source表是原始資料表,存儲了所有交易記錄。

字段 類型 注釋
metering(PrimaryKey) string 計量類型,樣例中預設是web
orderid(PrimaryKey) 訂單号ID
ts integer 交易時間(Unix時間戳,毫秒精度)
price double 交易金額
buyerid 買家ID
sellerid 賣家ID
productid 商品ID

Sink表(結果表)- sink_order

ts(PrimaryKey)
ordercount 交易次數

Flink SQL

DDL參考
注意:目前Blink在支援Tablestore source上還有些限制,隻能運作ProcessingTime的方式,未來會支援EventTime模式,按照使用者資料參數的時間進行計算。
-- Source 源表建立
CREATE TABLE ots_input (
    metering VARCHAR,
    orderid VARCHAR,
    price DOUBLE,
    byerid BIGINT,
    sellerid BIGINT,
    productid BIGINT,
    primary key(metering,orderid),
    ts AS PROCTIME()
) WITH (
    type = 'ots',
    instanceName = 'ordertest',
    tableName = 'source_order',
    accessId = '******************',
    accessKey = '******************',
    endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',
    tunnelName = 'blink_agg'
);
-- Sink 結果表建立
CREATE TABLE ots_output (
    metering VARCHAR,
    ts BIGINT,
    price DOUBLE,
    ordercount BIGINT,
    primary key(metering,ts)
) WITH (
    type = 'ots',
    instanceName = 'ordertest',
    tableName = 'sink_order',
    accessId = '******************',
    accessKey = '******************',
    endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',
    valueColumns = 'price,ordercount'
);

-- 計算
INSERT INTO ots_output
SELECT 
    DISTINCT metering as metering,
    CAST(TUMBLE_START(ots_input.ts, INTERVAL '5' SECOND) AS BIGINT) AS ts,
    SUM(price) as price,
    COUNT(orderid) as ordercount
FROM ots_input
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND),metering;           

實戰

第一步:準備賬戶與開服

準備表格存儲執行個體,可以參考

《表格存儲執行個體建立》

準備Flink/Blink項目,可以參考

《Blink如何購買》

第二步:建立資源

表格存儲資源

表格存儲控制台入口

,建立表格存儲執行個體ordertest (使用者自定義即可,下面對于參數位置更換為自定義的執行個體名),并記錄執行個體的VPC位址

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼

同時在控制台建立Source表和Sink表, 并為Source表(source_order)開啟一個Tunnel服務blink_agg

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼

圖三 Source表(source_order)

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼

圖四 Sink表(sink_order)

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼

圖五 源表和目标表

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼

圖六 建立通道

Blink資源

Blink控制台入口

,建立一個Blink項目(獨享模式要建立叢集之後才能建立項目),分别建立一個作業,agg_order,并将上面的Flink SQL粘貼到視窗中,上線服務

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼
Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼
Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼

在運維視窗中啟動該任務

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼

第三步:壓入資料 并 實時擷取結算結果

1 準備配置檔案

程式預設會從'~/tablestoreConf.json'擷取配置

vim ~/tablestoreConf.json

# 内容
{
    "endpoint":"http://ordertest.cn-zhangjiakou.ots.aliyuncs.com",
    "accessId":"************",
    "accessKey":"************",
    "instanceName":"ordertest"
}           

2 建構源碼

mvn install
cd target
tar xzvf stream-compute-1.0-SNAPSHOT-release.tar.gz           

3 啟動壓力器和模拟大屏

可以直接下載下傳工具包:

stream-compute-1.0-SNAPSHOT-release.tar.gz
# 視窗1
./bin/mock_order_generator
# 視窗2
./bin/data_show_screen           

4 效果

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼

源碼

源碼:

https://github.com/aliyun/tablestore-examples/tree/master/demos/StreamCompute

歡迎加入

如果您對表格存儲使用有疑問、想探讨,歡迎加入【表格存儲公開交流群】,群号:11789671。

Tablestore + Blink實戰:交易資料的實時統計背景架構示例設計實戰源碼