天天看點

生産實踐 | 基于 Flink 的短視訊生産消費監控

本文詳細介紹了實時監控類名額的資料流轉鍊路以及技術方案,大多數的實時監控類名額都可按照本文中的幾種方案實作。

短視訊生産消費監控

短視訊帶來了全新的傳播場域和節目形态,小螢幕、快節奏成為行業潮流的同時,也催生了新的使用者消費習慣,為創作者和商戶帶來收益。而多元化的短視訊也可以為品牌方提供營銷機遇。

其中對于垂類生态短視訊的生産消費熱點的監控分析目前成為了實時資料處理很常見的一個應用場景,比如對某個圈定的垂類生态下的視訊生産或者視訊消費進行監控,對熱點視訊生成對應的優化推薦政策,促進熱點視訊的生産或者消費,建構整個生産消費資料鍊路的閉環,進而提高創作者收益以及消費者留存。

本文将完整分析垂類生态短視訊生産消費資料的整條鍊路流轉方式,并基于 Flink 提供幾種對于垂類視訊生産消費監控的方案設計。通過本文,你可以了解到:

垂類生态短視訊生産消費資料鍊路閉環

實時監控短視訊生産消費的方案設計

不同監控量級場景下的代碼實作

flink 學習資料

項目簡介

垂類生态短視訊生産消費資料鍊路流轉架構圖如下,此資料流轉圖也适用于其他場景:

生産實踐 | 基于 Flink 的短視訊生産消費監控
鍊路

在上述場景中,使用者生産和消費短視訊,進而用戶端、服務端以及資料庫會産生相應的行為記錄檔,這些日志會通過日志抽取中間件抽取到消息隊列中,我們目前的場景中是使用 Kafka 作為消息隊列;然後使用 flink 對垂類生态中的視訊進行生産或消費監控(内容生産通常是圈定垂類作者 id 池,内容消費通常是圈定垂類視訊 id 池),最後将實時聚合資料産出到下遊;下遊可以以資料服務,實時看闆的方式展現,營運同學或者自動化工具最終會幫助我們分析目前垂類下的生産或者消費熱點,進而生成推薦政策。

方案設計

生産實踐 | 基于 Flink 的短視訊生産消費監控
架構

其中資料源如下:

Kafka 為全量内容生産和内容消費的日志。

Rpc/Http/Mysql/配置中心/Redis/HBase 為需要監控的垂類生态内容 id 池(内容生産則為作者 id 池,内容消費則為視訊 id 池),其主要是提供給營運同學動态配置需要監控的 id 範圍,其可以在 flink 中進行實時查詢,解析營運同學想要的監控名額範圍,以及監控的名額和計算方式,然後加工資料産出,可以支援随時配置,實時資料随時計算産出。

其中資料彙為聚類好的内容生産或者消費熱點話題或者事件名額:

Redis/HBase 主要是以低延遲(Redis 5ms p99,HBase 100ms p99,不同公司的服務能力不同)并且高 QPS 提供資料服務,給 Server 端或者線上使用者提供低延遲的資料查詢。

Druid/Mysql 可以做為 OLAP 引擎為 BI 分析提供靈活的上卷下鑽聚合分析能力,供營運同學配置可視化圖表使用。

Kafka 可以以流式資料産出,進而提供給下遊繼續消費或者進行特征提取。

廢話不多說,我們直接上方案和代碼,下述幾種方案按照監控 id 範圍量級區分,不同的量級對應着不同的方案,其中的代碼示例為 ProcessWindowFunction,也可以使用 AggregateFunction 代替,其中主要監控邏輯都相同。

方案 1

适合監控 id 資料量小的場景(幾千 id),其實作方式是在 flink 任務初始化時将需要監控的 id 池或動态配置中心的 id 池加載到記憶體當中,之後隻需要在記憶體中判斷内容生産或者消費資料是否在這個監控池當中。

ProcessWindowFunction p = new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {
    
    // 配置中心動态 id 池
    private Config<Set<Long>> needMonitoredIdsConfig;

    @Override
    public void open(Configuration parameters) throws Exception {
        this.needMonitoredIdsConfig = ConfigBuilder
                .buildSet("needMonitoredIds", Long.class);
    }

    @Override
    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
        Set<Long> needMonitoredIds = needMonitoredIdsConfig.get();
        /**
         * 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
         */
    }
}           

監控的 id 池可以按照固定或者可配置進而分出兩種擷取方式:第一種是在 flink 任務開始時就全部加載進記憶體中,這種方式适合監控 id 池不變的情況;第二種是使用動态配置中心,每次都從配置中心通路到最新的監控 id 池,其可以滿足動态配置或者更改 id 池的需求,并且這種實作方式通常可以實時感覺到配置更改,幾乎無延遲。

方案 2

适合監控 id 資料量适中(幾十萬 id),監控資料範圍會不定時發生變動的場景。其實作方式是在 flink 算子中定時通路接口擷取最新的監控 id 池,以擷取最新監控資料範圍。

ProcessWindowFunction p = new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {

    private long lastRefreshTimestamp;

    private Set<Long> needMonitoredIds;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.refreshNeedMonitoredIds(System.currentTimeMillis());
    }

    @Override
    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
        long windowStart = context.window().getStart();
        this.refreshNeedMonitoredIds(windowStart);
        /**
         * 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
         */
    }

    public void refreshNeedMonitoredIds(long windowStart) {
        // 每隔 10 秒通路一次
        if (windowStart - this.lastRefreshTimestamp >= 10000L) {
            this.lastRefreshTimestamp = windowStart;
            this.needMonitoredIds = Rpc.get(...)
        }
    }
}           

根據上述代碼實作方式,按照時間間隔的方式重新整理 id 池,其缺點在于不能實時感覺監控 id 池的變化,是以重新整理時間可能會和需求場景強耦合(如果 id 池會頻繁更新,那麼就需要縮小重新整理時間間隔)。也可根據需求場景在每個視窗開始前重新整理 id 池,這樣可保證每個視窗中的 id 池中的資料一直保持更新。

方案 3

方案 3 對方案 2 的一個優化(幾十萬 id,我們生産環境中最常用的)。其實作方式是在 flink 中使用 broadcast 算子定時通路監控 id 池,并将 id 池以廣播的形式下發給下遊參與計算的各個算子。其優化點在于:比如任務的并行度為 500,每 1s 通路一次,采用方案 2 則通路監控 id 池接口的 QPS 為 500,在使用 broadcast 算子之後,其通路 QPS 可以減少到 1,可以大大減少對接口的通路量,減輕接口壓力。

public class Example {

    @Slf4j
    static class NeedMonitorIdsSource implements SourceFunction<Map<Long, Set<Long>>> {

        private volatile boolean isCancel;

        @Override
        public void run(SourceContext<Map<Long, Set<Long>>> sourceContext) throws Exception {
            while (!this.isCancel) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    Set<Long> needMonitorIds = Rpc.get(...);
                    // 可以和上一次通路的資料做比較檢視是否有變化,如果有變化,才發送出去
                    if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                        sourceContext.collect(new HashMap<Long, Set<Long>>() {{
                            put(0L, needMonitorIds);
                        }});
                    }
                } catch (Throwable e) {
                    // 防止接口通路失敗導緻的錯誤導緻 flink job 挂掉
                    log.error("need monitor ids error", e);
                }
            }
        }

        @Override
        public void cancel() {
            this.isCancel = true;
        }
    }

    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        InputParams inputParams = new InputParams(parameterTool);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        final MapStateDescriptor<Long, Set<Long>> broadcastMapStateDescriptor = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.LONG_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<Long>>() {
                }));

        /********************* kafka source *********************/
        BroadcastStream<Map<Long, Set<Long>>> broadcastStream = env
                .addSource(new NeedMonitorIdsSource()) // redis photoId 資料廣播
                .setParallelism(1)
                .broadcast(broadcastMapStateDescriptor);

        DataStream<CommonModel> logSourceDataStream = SourceFactory.getSourceDataStream(...);

        /********************* dag *********************/
        DataStream<CommonModel> resultDataStream = logSourceDataStream
                .keyBy(KeySelectorFactory.getStringKeySelector(CommonModel::getKeyField))
                .connect(broadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, CommonModel, Map<Long, Set<Long>>, CommonModel>() {

                    private Set<Long> needMonitoredIds;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        this.needMonitoredIds = Rpc.get(...)
                    }

                    @Override
                    public void processElement(CommonModel commonModel, ReadOnlyContext readOnlyContext, Collector<CommonModel> collector) throws Exception {
                        // 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
                    }

                    @Override
                    public void processBroadcastElement(Map<Long, Set<Long>> longSetMap, Context context, Collector<CommonModel> collector) throws Exception {
                        // 需要監控的字段
                        Set<Long> needMonitorIds = longSetMap.get(0L);
                        if (CollectionUtils.isNotEmpty(needMonitorIds)) {
                            this.needMonitoredIds = needMonitorIds;
                        }
                    }
                });

        /********************* kafka sink *********************/
        SinkFactory.setSinkDataStream(...);
        
        env.execute(inputParams.jobName);
    }

}           

方案 4

适合于超大監控範圍的資料(幾百萬,我們自己的生産實踐中使用擴量到 500 萬)。其原理是将監控範圍接口按照 id 按照一定規則分桶。flink 消費到日志資料後将 id 按照 監控範圍接口 id 相同的分桶方法進行分桶 keyBy,這樣在下遊算子中每個算子中就可以按照桶變量值,從接口中拿到對應桶的監控 id 資料,這樣 flink 中并行的每個算子隻需要擷取到自己對應的桶的資料,可以大大減少請求的壓力。

public class Example {

    public static void main(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        InputParams inputParams = new InputParams(parameterTool);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        final MapStateDescriptor<Long, Set<Long>> broadcastMapStateDescriptor = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.LONG_TYPE_INFO,
                TypeInformation.of(new TypeHint<Set<Long>>() {
                }));

        /********************* kafka source *********************/

        DataStream<CommonModel> logSourceDataStream = SourceFactory.getSourceDataStream(...);

        /********************* dag *********************/
        DataStream<CommonModel> resultDataStream = logSourceDataStream
                .keyBy(KeySelectorFactory.getLongKeySelector(CommonModel::getKeyField))
                .timeWindow(Time.seconds(inputParams.accTimeWindowSeconds))
                .process(new ProcessWindowFunction<CommonModel, CommonModel, Long, TimeWindow>() {

                    private long lastRefreshTimestamp;

                    private Set<Long> oneBucketNeedMonitoredIds;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                    }

                    @Override
                    public void process(Long bucket, Context context, Iterable<CommonModel> iterable, Collector<CommonModel> collector) throws Exception {
                        long windowStart = context.window().getStart();
                        this.refreshNeedMonitoredIds(windowStart, bucket);
                        /**
                         * 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
                         */
                    }

                    public void refreshNeedMonitoredIds(long windowStart, long bucket) {
                        // 每隔 10 秒通路一次
                        if (windowStart - this.lastRefreshTimestamp >= 10000L) {
                            this.lastRefreshTimestamp = windowStart;
                            this.oneBucketNeedMonitoredIds = Rpc.get(bucket, ...)
                        }
                    }
                });

        /********************* kafka sink *********************/
        SinkFactory.setSinkDataStream(...);

        env.execute(inputParams.jobName);
    }
}           

總結

本文首先介紹了,在短視訊領域中,短視訊生産消費資料鍊路的整個閉環,并且其資料鍊路閉環一般情況下也适用于其他場景;以及對應的實時監控方案的設計和不同場景下的代碼實作,包括:

垂類生态短視訊生産消費資料鍊路閉環:使用者操作行為日志的流轉,日志上傳,實時計算,以及流轉到 BI,資料服務,最後資料賦能的整個流程

實時監控方案設計:監控類實時計算流程中各類資料源,資料彙的選型

監控 id 池在不同量級場景下具體代碼實作

學習資料

https://github.com/flink-china/flink-training-course/blob/master/README.md https://ververica.cn/developers-resources/ https://space.bilibili.com/33807709

作者:YangYichao

來源:Flink 微信公衆号

原文連結:

https://mp.weixin.qq.com/s/t_hbmx_xHly9y0nBcZmtnQ

繼續閱讀