天天看點

【實時數倉】DWM層設計模式、獨立訪客(UV)的計算一 DWS層與DWM層的設計二 DWM層-UV計算

文章目錄

  • 一 DWS層與DWM層的設計
    • 1 設計思路
    • 2 DWS層需求分析
  • 二 DWM層-UV計算
    • 1 需求分析與思路
    • 2 從kafka中讀取資料
      • (1)代碼實作
      • (2)測試
      • (3)總結
    • 3 UV過濾 -- 獨立訪客計算
      • (1)實作思路
      • (2)代碼實作
    • 4 寫入kafka
    • 5 測試

一 DWS層與DWM層的設計

1 設計思路

之前通過分流等手段,把資料分拆成了獨立的kafka topic。那麼接下來如何處理資料,就要思考一下到底要通過實時計算出哪些名額項。

因為實時計算與離線不同,實時計算的開發和運維成本都是非常高的,要結合實際情況考慮是否有必要像離線數倉一樣,建一個大而全的中間層。

如果沒有必要大而全,這時候就需要大體規劃一下要實時計算出的名額需求了。把這些名額以主題寬表的形式輸出,就是DWS層。

2 DWS層需求分析

統計主題 需求名額 輸出方式 計算來源 來源層級
訪客 pv 可視化大屏 page_log直接可求 dwd
uv 可視化大屏 需要用page_log過濾去重 dwm
跳出明細 可視化大屏 需要通過page_log行為判斷 dwm
進入頁面數 可視化大屏 需要識别開始通路辨別 dwd
連續通路時長 可視化大屏 page_log直接可求 dwd
商品 點選 多元分析 page_log直接可求 dwd
收藏 多元分析 收藏表 dwd
加入購物車 多元分析 購物車表 dwd
下單 可視化大屏 訂單寬表 dwm
支付 多元分析 支付寬表 dwm
退款 多元分析 退款表 dwd
評論 多元分析 評論表 dwd
地區 pv 多元分析 page_log直接可求 dwd
uv 多元分析 需要用page_log過濾去重 dwm
下單 可視化大屏 訂單寬表 dwm
關鍵詞 搜尋關鍵詞 可視化大屏 頁面通路日志 直接可求 dwd
點選商品關鍵詞 可視化大屏 商品主題下單再次聚合 dws
下單商品關鍵詞 可視化大屏 商品主題下單再次聚合 dws

當然實際需求還會有更多,這裡主要以為可視化大屏為目的進行實時計算的處理。

DWM層的定位是主要服務于DWS,因為部分需求直接從DWD層到DWS層中間會有一定的計算量,而且這部分計算的結果很有可能被多個DWS層主題複用,是以部分DWD層會形成一層DWM,這裡涉及業務主要包括:通路UV計算、 跳出明細計算、訂單寬表、支付寬表。

二 DWM層-UV計算

1 需求分析與思路

UV,全稱是Unique Visitor,即獨立訪客,對于實時計算中,也可以稱為DAU(Daily Active User),即每日活躍使用者,因為實時計算中的uv通常是指當日的訪客數。

那麼如何從使用者行為日志中識别出當日的訪客,有以下兩點:

  • 其一,是識别出該訪客打開的第一個頁面,表示這個訪客開始進入應用。
  • 其二,由于訪客可以在一天中多次進入應用,是以要在一天的範圍内進行去重。
【實時數倉】DWM層設計模式、獨立訪客(UV)的計算一 DWS層與DWM層的設計二 DWM層-UV計算

2 從kafka中讀取資料

工作流程如下:

【實時數倉】DWM層設計模式、獨立訪客(UV)的計算一 DWS層與DWM層的設計二 DWM層-UV計算

(1)代碼實作

public class UnionVistorApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 基本環境準備
        //1.1 流處理環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 設定并行度
        env.setParallelism(4);

        //TODO 2 檢查點設定
//        //2.1 開啟檢查點
//        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
//        //2.2 設定檢查點逾時時間
//        env.getCheckpointConfig().setCheckpointTimeout(60000L);
//        //2.3 設定重新開機政策
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
//        //2.4 設定job取消後,檢查點是否保留
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        //2.5 設定狀态後端 -- 基于記憶體 or 檔案系統 or RocksDB
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
//        //2.6 指定操作HDFS的使用者
//        System.setProperty("HADOOP_USER_NAME","hzy");

        //TODO 3 從kafka中讀取資料
        //3.1 聲明消費主題以及消費者組
        String topic = "dwd_page_log";
        String groupId = "union_visitor_app_group";
        //3.2 擷取kafka消費者對象
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        //3.3 讀取資料封裝流
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 4 對讀取的資料進行類型轉換 String -> JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        jsonObjDS.print(">>>");
        env.execute();
    }
}
           

(2)測試

需要啟動的程序:zookeeper、kafka、模拟生成日志jar包,logger.sh、UnionVistorApp、BaseLogApp。

  • 啟動logger.sh、zk、kafka
  • 運作Idea中的BaseLogApp
  • 運作Idea中的UniqueVisitApp
  • 檢視控制台輸出
  • 執行流程

模拟生成資料->日志處理伺服器->寫到kafka的ODS層(ods_base_log)->BaseLogApp分流->dwd_page_log->UniqueVisitApp讀取輸出

輸出資訊如下:

BaseLogApp

啟動流::3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"start":{"entry":"install","open_ad_skip_ms":0,"open_ad_ms":5918,"loading_time":1480,"open_ad_id":11},"ts":1670158358000}
曝光流::1> {"display_type":"query","page_id":"good_detail","item":"10","item_type":"sku_id","pos_id":4,"order":4,"ts":1670158358000}
曝光流::3> {"display_type":"query","page_id":"good_detail","item":"1","item_type":"sku_id","pos_id":5,"order":6,"ts":1670158358000}
曝光流::1> {"display_type":"query","page_id":"good_detail","item":"5","item_type":"sku_id","pos_id":1,"order":5,"ts":1670158358000}
主流::3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"cart","during_time":15330,"last_page_id":"good_detail"},"ts":1670158358000}
           

UnionVistorApp

>>>:2> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"good_detail","item":"3","during_time":9775,"item_type":"sku_id","last_page_id":"good_list","source_type":"query"},"displays":[{"display_type":"recommend","item":"10","item_type":"sku_id","pos_id":4,"order":1},{"display_type":"recommend","item":"3","item_type":"sku_id","pos_id":1,"order":2},{"display_type":"promotion","item":"2","item_type":"sku_id","pos_id":4,"order":3},{"display_type":"query","item":"8","item_type":"sku_id","pos_id":1,"order":4},{"display_type":"query","item":"10","item_type":"sku_id","pos_id":5,"order":5},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":5,"order":6}],"actions":[{"item":"3","action_id":"favor_add","item_type":"sku_id","ts":1670158362887}],"ts":1670158358000}
>>>:4> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"trade","item":"4,6,10","during_time":5294,"item_type":"sku_ids","last_page_id":"cart"},"ts":1670158358000}
>>>:3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"cart","during_time":15330,"last_page_id":"good_detail"},"ts":1670158358000}
           

(3)總結

執行流程:

  • 模拟生成日志jar包
  • 将模拟生成的日志資料發送給Nginx進行負載均衡
  • Nginx将請求轉發給三台日志采集服務
  • 三台日志采集服務接收到日志資料,将日志資料發送給kafka的ods_base_log主題中
  • BaseLogApp應用程式從ods_base_log中讀取資料,進行分流
    • 啟動日志:dwd_start_log
    • 曝光日志:dwd_display_log
    • 頁面日志:dwd_page_log
  • UnionVistorApp從dwd_page_log主題中讀取資料

3 UV過濾 – 獨立訪客計算

(1)實作思路

  • 首先用keyby按照mid進行分組,每組表示目前裝置的通路情況
  • 分組後使用keystate狀态,記錄使用者進入時間,實作RichFilterFunction完成過濾
  • 重寫open 方法用來初始化狀态
  • 重寫filter方法進行過濾
    • 可以直接篩掉last_page_id不為空的字段,因為隻要有上一頁,說明這條不是這個使用者進入的首個頁面。
    • 狀态用來記錄使用者的進入時間,隻要這個lastVisitDate是今天,就說明使用者今天已經通路過了是以篩除掉。如果為空或者不是今天,說明今天還沒通路過,則保留。
    • 因為狀态值主要用于篩選是否今天來過,是以這個記錄過了今天基本上沒有用了,這裡enableTimeToLive 設定了1天的過期時間,避免狀态過大。

(2)代碼實作

//TODO 5 按照裝置id對資料進行分組
        KeyedStream<JSONObject, String> keyedDS = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));

        //TODO 6 實作過濾
        //實作目的:如有一個使用者在6月通路一次,11月通路一次,6-11月共通路兩次,
        // 如果一直保留其6月的通路狀态,直到11月才去更新,會消耗很多資源,
        // 是以需要将其通路時間放入狀态中,定時進行更新。
        SingleOutputStreamOperator<JSONObject> filterDS = keyedDS.filter(
                new RichFilterFunction<JSONObject>() {
                    // 聲明狀态變量,用于存放上次通路日期
                    private ValueState<String> lastVistDateState;
                    // 聲明日期格式工具類
                    private SimpleDateFormat sdf;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        sdf = new SimpleDateFormat("yyyyMMdd");
                        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("lastVistDateState", String.class);
                        // 注意:UV可以延伸為日活統計,其狀态值主要用于篩選當天是否通路過
                        // 那麼狀态超過今天就沒有存在的意義
                        // 是以設定狀态的失效時間為1天
                        // 粒度為天,不記錄時分秒
                        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
                                // 預設值,當狀态建立或者寫入的時候會更新狀态失效時間
//                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                // 預設值,狀态過期後,如果還沒有被清理,是否傳回給狀态調用者
//                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                .build();
                        valueStateDescriptor.enableTimeToLive(ttlConfig);
                        lastVistDateState = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public boolean filter(JSONObject jsonObj) throws Exception {
                        // 如果從其他頁面跳轉過來,直接過濾掉
                        String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                        if (lastPageId != null && lastPageId.length() > 0) {
                            return false;
                        }
                        // 擷取狀态中的上次通路日期
                        String lastVisitDate = lastVistDateState.value();
                        String curVisitDate = sdf.format(jsonObj.getLong("ts"));
                        if (lastVisitDate != null && lastVisitDate.length() > 1 && lastVisitDate.equals(curVisitDate)) {
                            // 今天已經通路過
                            return false;
                        } else {
                            // 今天還沒通路過
                            lastVistDateState.update(curVisitDate);
                            return true;
                        }
                    }
                }
        );


        filterDS.print(">>>");
        env.execute();
           

4 寫入kafka

将過濾處理後的UV寫入到Kafka的dwm_unique_visitor。

//TODO 7 将過濾後的uv資料,寫回到kafka的dwm層
filterDS.map(jsonObj -> jsonObj.toJSONString()).addSink(
        MyKafkaUtil.getKafkaSink("dwm_unique_visitor")
);
           

5 測試

# 啟動logger.sh、zk、kafka
# 運作Idea中的BaseLogApp
# 運作Idea中的UniqueVisitApp
# 檢視控制台輸出以及kafka的dwm_unique_visit主題
# 執行流程 
模拟生成資料->日志處理伺服器->寫到kafka的ODS層(ods_base_log)->BaseLogApp分流->dwd_page_log->UniqueVisitApp讀取并處理->寫回到kafka的dwm層
           

程式運作整體流程如下:

【實時數倉】DWM層設計模式、獨立訪客(UV)的計算一 DWS層與DWM層的設計二 DWM層-UV計算