文章目錄
- 一 DWS層與DWM層的設計
-
- 1 設計思路
- 2 DWS層需求分析
- 二 DWM層-UV計算
-
- 1 需求分析與思路
- 2 從kafka中讀取資料
-
- (1)代碼實作
- (2)測試
- (3)總結
- 3 UV過濾 -- 獨立訪客計算
-
- (1)實作思路
- (2)代碼實作
- 4 寫入kafka
- 5 測試
一 DWS層與DWM層的設計
1 設計思路
之前通過分流等手段,把資料分拆成了獨立的kafka topic。那麼接下來如何處理資料,就要思考一下到底要通過實時計算出哪些名額項。
因為實時計算與離線不同,實時計算的開發和運維成本都是非常高的,要結合實際情況考慮是否有必要像離線數倉一樣,建一個大而全的中間層。
如果沒有必要大而全,這時候就需要大體規劃一下要實時計算出的名額需求了。把這些名額以主題寬表的形式輸出,就是DWS層。
2 DWS層需求分析
統計主題 | 需求名額 | 輸出方式 | 計算來源 | 來源層級 |
---|---|---|---|---|
訪客 | pv | 可視化大屏 | page_log直接可求 | dwd |
uv | 可視化大屏 | 需要用page_log過濾去重 | dwm | |
跳出明細 | 可視化大屏 | 需要通過page_log行為判斷 | dwm | |
進入頁面數 | 可視化大屏 | 需要識别開始通路辨別 | dwd | |
連續通路時長 | 可視化大屏 | page_log直接可求 | dwd | |
商品 | 點選 | 多元分析 | page_log直接可求 | dwd |
收藏 | 多元分析 | 收藏表 | dwd | |
加入購物車 | 多元分析 | 購物車表 | dwd | |
下單 | 可視化大屏 | 訂單寬表 | dwm | |
支付 | 多元分析 | 支付寬表 | dwm | |
退款 | 多元分析 | 退款表 | dwd | |
評論 | 多元分析 | 評論表 | dwd | |
地區 | pv | 多元分析 | page_log直接可求 | dwd |
uv | 多元分析 | 需要用page_log過濾去重 | dwm | |
下單 | 可視化大屏 | 訂單寬表 | dwm | |
關鍵詞 | 搜尋關鍵詞 | 可視化大屏 | 頁面通路日志 直接可求 | dwd |
點選商品關鍵詞 | 可視化大屏 | 商品主題下單再次聚合 | dws | |
下單商品關鍵詞 | 可視化大屏 | 商品主題下單再次聚合 | dws |
當然實際需求還會有更多,這裡主要以為可視化大屏為目的進行實時計算的處理。
DWM層的定位是主要服務于DWS,因為部分需求直接從DWD層到DWS層中間會有一定的計算量,而且這部分計算的結果很有可能被多個DWS層主題複用,是以部分DWD層會形成一層DWM,這裡涉及業務主要包括:通路UV計算、 跳出明細計算、訂單寬表、支付寬表。
二 DWM層-UV計算
1 需求分析與思路
UV,全稱是Unique Visitor,即獨立訪客,對于實時計算中,也可以稱為DAU(Daily Active User),即每日活躍使用者,因為實時計算中的uv通常是指當日的訪客數。
那麼如何從使用者行為日志中識别出當日的訪客,有以下兩點:
- 其一,是識别出該訪客打開的第一個頁面,表示這個訪客開始進入應用。
- 其二,由于訪客可以在一天中多次進入應用,是以要在一天的範圍内進行去重。
2 從kafka中讀取資料
工作流程如下:
(1)代碼實作
public class UnionVistorApp {
public static void main(String[] args) throws Exception {
//TODO 1 基本環境準備
//1.1 流處理環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.2 設定并行度
env.setParallelism(4);
//TODO 2 檢查點設定
// //2.1 開啟檢查點
// env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
// //2.2 設定檢查點逾時時間
// env.getCheckpointConfig().setCheckpointTimeout(60000L);
// //2.3 設定重新開機政策
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
// //2.4 設定job取消後,檢查點是否保留
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// //2.5 設定狀态後端 -- 基于記憶體 or 檔案系統 or RocksDB
// env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
// //2.6 指定操作HDFS的使用者
// System.setProperty("HADOOP_USER_NAME","hzy");
//TODO 3 從kafka中讀取資料
//3.1 聲明消費主題以及消費者組
String topic = "dwd_page_log";
String groupId = "union_visitor_app_group";
//3.2 擷取kafka消費者對象
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
//3.3 讀取資料封裝流
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
//TODO 4 對讀取的資料進行類型轉換 String -> JSONObject
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);
jsonObjDS.print(">>>");
env.execute();
}
}
(2)測試
需要啟動的程序:zookeeper、kafka、模拟生成日志jar包,logger.sh、UnionVistorApp、BaseLogApp。
- 啟動logger.sh、zk、kafka
- 運作Idea中的BaseLogApp
- 運作Idea中的UniqueVisitApp
- 檢視控制台輸出
- 執行流程
模拟生成資料->日志處理伺服器->寫到kafka的ODS層(ods_base_log)->BaseLogApp分流->dwd_page_log->UniqueVisitApp讀取輸出
輸出資訊如下:
BaseLogApp
啟動流::3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"start":{"entry":"install","open_ad_skip_ms":0,"open_ad_ms":5918,"loading_time":1480,"open_ad_id":11},"ts":1670158358000}
曝光流::1> {"display_type":"query","page_id":"good_detail","item":"10","item_type":"sku_id","pos_id":4,"order":4,"ts":1670158358000}
曝光流::3> {"display_type":"query","page_id":"good_detail","item":"1","item_type":"sku_id","pos_id":5,"order":6,"ts":1670158358000}
曝光流::1> {"display_type":"query","page_id":"good_detail","item":"5","item_type":"sku_id","pos_id":1,"order":5,"ts":1670158358000}
主流::3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"cart","during_time":15330,"last_page_id":"good_detail"},"ts":1670158358000}
UnionVistorApp
>>>:2> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"good_detail","item":"3","during_time":9775,"item_type":"sku_id","last_page_id":"good_list","source_type":"query"},"displays":[{"display_type":"recommend","item":"10","item_type":"sku_id","pos_id":4,"order":1},{"display_type":"recommend","item":"3","item_type":"sku_id","pos_id":1,"order":2},{"display_type":"promotion","item":"2","item_type":"sku_id","pos_id":4,"order":3},{"display_type":"query","item":"8","item_type":"sku_id","pos_id":1,"order":4},{"display_type":"query","item":"10","item_type":"sku_id","pos_id":5,"order":5},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":5,"order":6}],"actions":[{"item":"3","action_id":"favor_add","item_type":"sku_id","ts":1670158362887}],"ts":1670158358000}
>>>:4> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"trade","item":"4,6,10","during_time":5294,"item_type":"sku_ids","last_page_id":"cart"},"ts":1670158358000}
>>>:3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"cart","during_time":15330,"last_page_id":"good_detail"},"ts":1670158358000}
(3)總結
執行流程:
- 模拟生成日志jar包
- 将模拟生成的日志資料發送給Nginx進行負載均衡
- Nginx将請求轉發給三台日志采集服務
- 三台日志采集服務接收到日志資料,将日志資料發送給kafka的ods_base_log主題中
- BaseLogApp應用程式從ods_base_log中讀取資料,進行分流
- 啟動日志:dwd_start_log
- 曝光日志:dwd_display_log
- 頁面日志:dwd_page_log
- UnionVistorApp從dwd_page_log主題中讀取資料
3 UV過濾 – 獨立訪客計算
(1)實作思路
- 首先用keyby按照mid進行分組,每組表示目前裝置的通路情況
- 分組後使用keystate狀态,記錄使用者進入時間,實作RichFilterFunction完成過濾
- 重寫open 方法用來初始化狀态
- 重寫filter方法進行過濾
- 可以直接篩掉last_page_id不為空的字段,因為隻要有上一頁,說明這條不是這個使用者進入的首個頁面。
- 狀态用來記錄使用者的進入時間,隻要這個lastVisitDate是今天,就說明使用者今天已經通路過了是以篩除掉。如果為空或者不是今天,說明今天還沒通路過,則保留。
- 因為狀态值主要用于篩選是否今天來過,是以這個記錄過了今天基本上沒有用了,這裡enableTimeToLive 設定了1天的過期時間,避免狀态過大。
(2)代碼實作
//TODO 5 按照裝置id對資料進行分組
KeyedStream<JSONObject, String> keyedDS = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
//TODO 6 實作過濾
//實作目的:如有一個使用者在6月通路一次,11月通路一次,6-11月共通路兩次,
// 如果一直保留其6月的通路狀态,直到11月才去更新,會消耗很多資源,
// 是以需要将其通路時間放入狀态中,定時進行更新。
SingleOutputStreamOperator<JSONObject> filterDS = keyedDS.filter(
new RichFilterFunction<JSONObject>() {
// 聲明狀态變量,用于存放上次通路日期
private ValueState<String> lastVistDateState;
// 聲明日期格式工具類
private SimpleDateFormat sdf;
@Override
public void open(Configuration parameters) throws Exception {
sdf = new SimpleDateFormat("yyyyMMdd");
ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("lastVistDateState", String.class);
// 注意:UV可以延伸為日活統計,其狀态值主要用于篩選當天是否通路過
// 那麼狀态超過今天就沒有存在的意義
// 是以設定狀态的失效時間為1天
// 粒度為天,不記錄時分秒
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
// 預設值,當狀态建立或者寫入的時候會更新狀态失效時間
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// 預設值,狀态過期後,如果還沒有被清理,是否傳回給狀态調用者
// .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
valueStateDescriptor.enableTimeToLive(ttlConfig);
lastVistDateState = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
// 如果從其他頁面跳轉過來,直接過濾掉
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
if (lastPageId != null && lastPageId.length() > 0) {
return false;
}
// 擷取狀态中的上次通路日期
String lastVisitDate = lastVistDateState.value();
String curVisitDate = sdf.format(jsonObj.getLong("ts"));
if (lastVisitDate != null && lastVisitDate.length() > 1 && lastVisitDate.equals(curVisitDate)) {
// 今天已經通路過
return false;
} else {
// 今天還沒通路過
lastVistDateState.update(curVisitDate);
return true;
}
}
}
);
filterDS.print(">>>");
env.execute();
4 寫入kafka
将過濾處理後的UV寫入到Kafka的dwm_unique_visitor。
//TODO 7 将過濾後的uv資料,寫回到kafka的dwm層
filterDS.map(jsonObj -> jsonObj.toJSONString()).addSink(
MyKafkaUtil.getKafkaSink("dwm_unique_visitor")
);
5 測試
# 啟動logger.sh、zk、kafka
# 運作Idea中的BaseLogApp
# 運作Idea中的UniqueVisitApp
# 檢視控制台輸出以及kafka的dwm_unique_visit主題
# 執行流程
模拟生成資料->日志處理伺服器->寫到kafka的ODS層(ods_base_log)->BaseLogApp分流->dwd_page_log->UniqueVisitApp讀取并處理->寫回到kafka的dwm層
程式運作整體流程如下: