GitHub 位址 https://github.com/apache/flink 歡迎大家給 Flink 點贊送 star~
1. 背景資訊
在使用者行為分析和圈人場景中,經常需要從億級甚至幾十億級使用者中快速篩選出符合特定标簽的名額結果。UV 便是行為分析中最常見的名額,代表通路網頁的自然人,可以引申為某段時間内某個名額精确去重後的量。例如大促時,電商商家需要實時計算店鋪的實時 UV,并根據 UV 情況及時調整營運政策,進而達成銷售目标。
在計算使用者 UV 時,由于業務需求不同,計算的次元和資料量也不同,通常來講,會有以下幾點訴求:
- 使用者資料量大,每天幾億條,次元多(10+ 以上),需要支援各次元間任意組合查詢;
- 查詢時間需要更靈活,不僅局限于天、周、月、年等,還需要支援更細粒度實時更新查詢;
- 需要對使用者數精确去重。
面對上訴高複雜度 UV 計算場景,業界常見的手段包括使用 Apche Kylin 等預計算系統或者 Flink + MySQL 的固定次元組合方案,但也會遇見以下幾個痛點:
- 需求次元過多時,會帶來存儲爆炸,預計算時間長;
- 精确去重需要消耗大量資源,容易 OOM;
- 實時更新難,無法支援更加靈活開放的時間周期處理。
Hologres 是基于分析服務一體化理念(Hybrid Serving & Analytical Processing)設計的實時數倉産品,它采用分布式架構,支援資料實時寫入,高并發、低延時的分析處理 PB 級資料,相容 PostgreSQL 協定,使用最熟悉的工具就能進行開發。
Hologres 與
實時計算Flink版有着強大的融合優化,支援 Flink 資料高通量實時寫入,寫入即可見;支援 Flink SQL 維表關聯,以及作為 CDC Source 事件驅動開發;同時,Hologres 也支援
RoaringBitmap,利用其超高算法能力和高效 Bitmap 壓縮能力,對使用者标簽篩選,對去重等場景有着高性能的支援。
在上訴 UV 計算場景中,可以使用
+ Hologres 方式,并基于 RoaringBitmap,實時對使用者标簽去重。這樣的方式,可以較細粒度的實時得到使用者 UV、PV 資料,同時便于根據需求調整最小統計視窗(如最近 5 分鐘的 UV),實作類似實時監控的效果,更好的在大屏等 BI 展示。相較于以天、周、月等機關的去重,更适合在活動日期進行更細粒度的統計,并且通過簡單的聚合,也可以得到較大時間機關的統計結果。
該方案資料鍊路簡單,可以任意次元靈活計算,隻需要一份 Bitmap 存儲,也沒有存儲爆炸的問題,還能保證明時更新,進而實作更實時、開發更靈活、功能更完善的多元分析數倉。
下面将會就 UV 計算場景講解具體操作步驟。
2. 業務架構圖
- 實時訂閱實時采集的資料,資料源可以來源于日志資料,如 Kafka 等;
- 對資料做進一步加工處理;
- Hologres 對 實時寫入的資料實時查詢;
- 最終查詢的資料對接可視化工具,如 DataV 等,用作大屏展示。

3. 詳細業務流程
- 将流式資料轉化為表與維表進行 JOIN 操作,再轉化為流式資料。此舉可以利用 Hologres 維表的 insertIfNotExists 特性結合 自增字段 實作高效的 uid 映射;
- 把關聯的結果資料按照時間視窗進行處理,根據查詢次元使用 RoaringBitmap 進行聚合,并将查詢次元以及聚合的 uid 存放在聚合結果表,其中聚合出的 uid 結果放入 Hologres 的 RoaringBitmap 類型的字段中;
- 查詢時,與離線方式相似,直接按照查詢條件查詢聚合結果表,并對其中關鍵的 RoaringBitmap 字段做 or 運算後并統計基數,即可得出對應使用者數;
- 具體資料處理流程如下圖所示:
實時計算Flink版 + Hologres,億級使用者實時 UV 精确去重最佳實踐
4. 方案最佳實踐
4.1 前提條件
- 開通 Hologres 并連接配接開發工具,示例使用 holoweb,詳情見 holoweb 快速入門
- 準備并搭建好 Flink 叢集環境,您可以使用 阿裡雲Flink全托管 或者 開源 Flink
4.2 操作步驟
4.2.1 Hologres 建立相關基礎表
1)在 Hologres 建立表 uid_mapping 為 uid 映射表,用于映射 uid 到 32位 int 類型。如果原始 uid 已經是 int32 類型,此步驟可忽略。
- 常見的業務系統或者埋點中的使用者 ID 很多是字元串類型或 Long 類型,是以需要使用 uid_mapping 類型建構一張映射表。 RoaringBitmap 類型要求使用者 ID 必須是 32位 int 類型,而且越稠密越好(即使用者 ID 最好連續)。映射表利用 Hologres 的 SERIAL 類型(自增的 32位 int)來實作使用者映射的自動管理和穩定映射。
- 由于是實時資料, 在 Hologres 中該表為行存表,以提高 Flink 維表實時 JOIN 的 QPS。
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
--将uid設為clustering_key和distribution_key便于快速查找其對應的int32值
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;
2)建立表 dws_app 為基礎聚合表,用于存放在基礎次元上聚合後的結果。
- 使用 RoaringBitmap 前需要建立 RoaringBitmap extention,同時也需要 Hologres 執行個體為 0.10 及以上版本。
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
- 為了更好的性能,建議根據基礎聚合表資料量合理的設定 Shard 數,但建議基礎聚合表的 Shard 數設定不超過計算資源總 Core 數的 60%。推薦使用以下哨兵表方式,設定 Table Group 的 Shard 數。
--建立shard數為16的Table Group,
--本次測試資料量百萬級,總計算資源為128core,設定shard數為16
BEGIN;
CREATE TABLE tg16 (a int); --Table Group哨兵表
CALL set_table_property('tg16', 'shard_count', '16');
COMMIT;
- 相比離線結果表,此結果表增加了時間戳字段,用于實作以 Flink 視窗周期為機關的統計。結果表 DDL 如下:
BEGIN;
CREATE TABLE dws_app(
country text,
prov text,
city text,
ymd text NOT NULL, --日期字段
timetz TIMESTAMPTZ, --統計時間戳,可以實作以Flink視窗周期為機關的統計
uid32_bitmap roaringbitmap, -- 使用roaringbitmap記錄uv
PRIMARY KEY (country, prov, city, ymd, timetz)--查詢次元和時間作為主鍵,防止重複插入資料
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
--日期字段設為clustering_key和event_time_column,便于過濾
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
--等價于将表放在shard數為16的table group
call set_table_property('public.dws_app', 'colocate_with', 'tg16');
--group by字段設為distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;
4.2.2 實時讀取資料并更新 dws_app 基礎聚合表
在
中的完整示例源碼請見
alibabacloud-hologres-connectors examples,下面是在
中的具體操作步驟:
-
Flink 流式讀取資料源(DataStream),并轉化為源表(Table)。
在 Flink 中使用流式讀取資料源,資料源可以是 CSV 檔案,也可以來源于 Kafka、Redis 等,可以根據業務場景準備,此次不再具體展開講述。在 Flink 中轉化為源表的代碼示例如下:
//此處使用csv檔案作為資料源,也可以是kafka/redis等
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// 與維表join需要添加proctime字段,詳見https://help.aliyun.com/document_detail/62506.html
Table odsTable =
tableEnv.fromDataStream(
odsStream,
$("uid"),
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("proctime").proctime());
// 注冊到catalog環境
tableEnv.createTemporaryView("odsTable", odsTable);
- 在 Flink 中将源表與 Hologres 維表(uid_mapping)進行關聯。
- 在 Flink 中建立 Hologres 維表,需要使用 insertIfNotExists 參數,即查詢不到資料時自行插入,uid_int32 字段便可以利用 Hologres 的 Serial 類型自增建立。
- 将 Flink 源表與 Hologres 維表進行關聯(JOIN)
// 建立Hologres維表,其中insertIfNotExists表示查詢不到則自行插入
String createUidMappingTable =
String.format(
"create table uid_mapping_dim("
+ " uid string,"
+ " uid_int32 INT"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s'," //Hologres DB名
+ " 'tablename' = '%s',"//Hologres 表名
+ " 'username' = '%s'," //目前賬号access id
+ " 'password' = '%s'," //目前賬号access key
+ " 'endpoint' = '%s'," //Hologres endpoint
+ " 'insertifnotexists'='true'"
+ ")",
database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表與維表join
String odsJoinDim =
"SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
+ " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
+ " ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);
- 将關聯結果轉化為 DataStream,通過 Flink 時間視窗處理,結合 RoaringBitmap 進行對名額進行去重處理。
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
source
// 篩選需要統計的次元(country, prov, city, ymd)
.keyBy(0, 1, 2, 3)
// 滾動時間視窗;此處由于使用讀取csv模拟輸入流,采用ProcessingTime,實際使用中可使用EventTime
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
// 觸發器,可以在視窗未結束時擷取聚合結果
.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
.aggregate(
// 聚合函數,根據key By篩選的次元,進行聚合
new AggregateFunction<
Tuple5<String, String, String, String, Integer>,
RoaringBitmap,
RoaringBitmap>() {
@Override
public RoaringBitmap createAccumulator() {
return new RoaringBitmap();
}
@Override
public RoaringBitmap add(
Tuple5<String, String, String, String, Integer> in,
RoaringBitmap acc) {
// 将32位的uid添加到RoaringBitmap進行去重
acc.add(in.f4);
return acc;
}
@Override
public RoaringBitmap getResult(RoaringBitmap acc) {
return acc;
}
@Override
public RoaringBitmap merge(
RoaringBitmap acc1, RoaringBitmap acc2) {
return RoaringBitmap.or(acc1, acc2);
}
},
//視窗函數,輸出聚合結果
new WindowFunction<
RoaringBitmap,
Tuple6<String, String, String, String, Timestamp, byte[]>,
Tuple,
TimeWindow>() {
@Override
public void apply(
Tuple keys,
TimeWindow timeWindow,
Iterable<RoaringBitmap> iterable,
Collector<
Tuple6<String, String, String, String, Timestamp, byte[]>> out)
throws Exception {
RoaringBitmap result = iterable.iterator().next();
// 優化RoaringBitmap
result.runOptimize();
// 将RoaringBitmap轉化為位元組數組以存入Holo中
byte[] byteArray = new byte[result.serializedSizeInBytes()];
result.serialize(ByteBuffer.wrap(byteArray));
// 其中 Tuple6.f4(Timestamp) 字段表示以視窗長度為周期進行統計,以秒為機關
out.collect(
new Tuple6<>(
keys.getField(0),
keys.getField(1),
keys.getField(2),
keys.getField(3),
new Timestamp(
timeWindow.getEnd() / 1000 * 1000),
byteArray));
}
});
- 寫入 Hologres 結果表。
經過 Flink 去重處理的資料寫入至 Hologres 結果表 dws_app,但需要注意的是,Hologres 中 RoaringBitmap 類型在 Flink 中對應 Byte 數組類型。Flink 中代碼如下:
// 計算結果轉換為表
Table resTable =
tableEnv.fromDataStream(
processedSource,
$("country"),
$("prov"),
$("city"),
$("ymd"),
$("timest"),
$("uid32_bitmap"));
// 建立Hologres結果表, 其中Hologres的RoaringBitmap類型通過Byte數組存入
String createHologresTable =
String.format(
"create table sink("
+ " country string,"
+ " prov string,"
+ " city string,"
+ " ymd string,"
+ " timetz timestamp,"
+ " uid32_bitmap BYTES"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s',"
+ " 'connectionSize' = '%s',"
+ " 'mutatetype' = 'insertOrReplace'"
+ ")",
database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// 寫入計算結果到dws表
tableEnv.executeSql("insert into sink select * from " + resTable);
4.2.3 資料查詢
在 Hologres 中對結果表(dws_app)進行 UV 計算。按照查詢次元做聚合計算,查詢 Bitmap 基數,得出 Group by 條件下的使用者數
- 示例查詢 1:查詢某天内各個城市的 uv。
--運作下面RB_AGG運算查詢,可執行參數先關閉三階段聚合開關(預設關閉),性能更好,此步驟可選
set hg_experimental_enable_force_three_stage_agg=off
SELECT country
,prov
,city
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM dws_app
WHERE ymd = '20210329'
GROUP BY country
,prov
,city
;
- 示例查詢 2:查詢某段時間内各個省份的 UV、PV。
--運作下面RB_AGG運算查詢,可執行參數先關閉三階段聚合開關(預設關閉),性能更好,此步驟可選
set hg_experimental_enable_force_three_stage_agg=off
SELECT country
,prov
,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
,SUM(1) AS pv
FROM dws_app
WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
GROUP BY country
,prov
;
4.2.4 可視化展示
計算出 UV、PV 和大多數情況需要用 BI 工具以更直覺的方式可視化展示,由于需要使用 RB_CARDINALITY 和 RB_OR_AGG 進行聚合計算,需要使用 BI 的自定義聚合函數的能力,常見的具備該能力的 BI 包括 Apache Superset 和 Tableau,下面将會講述這兩個 BI 工具的最佳實踐。
使用 Apache Superset
Apache Superset 對接 Hologres 的方式,請參考
産品手冊。在 Superset 中可以直接使用 dws_app 表作為 Dataset 使用
并且在資料集中,建立一個單獨 Metrics,名為 UV,表達式如下:
RB_CARDINALITY(RB_OR_AGG(uid32_bitmap))。
然後就可以開始探索資料了。
當然也可以建立 Dashborad。
使用 Tableau
Tableau 對接 Hologres 的方式,請參考
。可以使用 Tableau 的直通函數直接實作自定義函數的能力,詳細介紹請參照
Tableau 的手冊。在 Tableau 對接 Hologres 後,可以建立一個計算字段,表達式如下:
RAWSQLAGG_INT("RB_CARDINALITY(RB_OR_AGG(%1))", [Uid32 Bitmap])。
5. 總結
能支撐億級使用者的實時 UV 計算,是充分利用了
和 Hologres 各自的優秀能力:
- 的實時事件驅動能力、豐富的視窗定義能力、可擴充的程式設計接口,支援了 RoaringBitmap 累加行為的定義;
- Hologres 實時更新能力、實時寫入能力,保證了資料的高吞吐寫入和生效;
- Hologres 内置 bitmap 資料類型,原生支援高效率的 RoaringBitmap 各類操作函數,保證了計算的高效率。
通過
+ Hologres 實時更新微批次的統計結果,同時基于 bitmap 豐富的表達能力和極緻的壓縮效率,實作了存儲不膨脹,計算高效率和全實時。讓億級使用者的 UV 計算,不再是門檻高、消耗資源的複雜操作。基于 bitmap 資料結構也可以實時還原原始的資料狀态,確定了資料的高品質、可回溯。更靈活的資料結構,給了使用者更靈活的分析體驗,讓自助式多元分析成為每個營運同學的必備工具。
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99元試用
(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包3個月及以上還有85折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc