天天看點

如何基于 Flink 生成線上機器學習的樣本?

作者:曹富強(微網誌)

線上機器學習與離線相比,在模型更新的時效性,模型的疊代周期,業務實驗效果等方面有更好的表現。是以将機器學習從離線遷移到線上已經成為提升業務名額的一個有效的手段。

線上機器學習中,樣本是關鍵的一環。本文将給大家詳細的介紹微網誌是如何用 Flink 來實作線上樣本生成的。

為何選擇 Flink 來做線上的樣本生成?

線上樣本生成對樣本的時效性和準确性都有極高的要求。同樣對作業的穩定性及是否容災也都有嚴格的名額要求。基于這個前提,我們對目前較為流行的幾種實時計算架構(Storm 0.10, Spark 2.11, Flink 1.10)進行了分析比較,結論如下:

如何基于 Flink 生成線上機器學習的樣本?

是以,我們決定使用 Flink 來作為線上樣本生成的實時流計算架構。

如何實作?

線上樣本生成,簡單描述一個業務場景:對使用者的曝光資料和點選資料實時的做關聯,關聯後将資料輸出到 Kafka 中,給下遊的線上訓練作業用。

首先我們要确定兩個資料流關聯的時間視窗。這一步一般建議先離線對兩個資料流的日志做關聯,通過離線的方式對兩份資料在不同的時間範圍内做 join,來判斷線上需要的時間視窗。比如業務接受的最低關聯比例是 85%,并且通過離線測試确認 20 分鐘内兩個資料流可以關聯 85%的資料,那麼就可以采用 20 分鐘作為時間視窗。這裡的關聯比例和視窗時間實際上是在準确性和實時性之間的一個 trade-off。

确定時間視窗後,我們并沒有使用 Flink 的 time window 來實作多個資料流的 join,而是選擇采用 union + timer 方式來實作。這裡主要考慮兩點:第一、Flink 自帶的 join 操作不支援多個資料流。第二、使用 timer+state 來實作,自定義程度更高,限制更少,也更友善。

接下來,我們把樣本生成過程細分為:

① 輸入資料流

一般我們的資料源包括 Kafka,Trigger,MQ 等。Flink 需要從資料源中實時的讀取日志。

② 輸入資料流的格式化和過濾

讀取日志後,對資料做格式化,并且過濾掉不需要的字段和資料。

指定樣本 join 的 key。例如:使用者 id 和 内容 id 作 key。

輸出的資料格式一般為 tuple2(K,V),K:參與 join 的 key。V:樣本用到的字段。

③ 輸入資料流的 union

使用 Flink 的 union 操作,将多個輸入流疊加到一起,形成一個 DataStream。

為每個輸入流指定一個可以區分的别名或者增加一個可以區分的字段。

④ 輸入資料流的聚合:keyby 操作

對 join 的 key 做 keyby 操作。接上例,表示按照使用者 id 和内容 id 對多個資料流做 join。

如果 key 存在資料傾斜的情況,建議對 key 加随機數後先聚合,去掉随機數後再次聚合。

⑤ 資料存儲 state + timer

  1. 定義一個Value State。
  2. keyby後的process方法中,我們會重寫processElement方法,在processElement方法中判斷,如果value state為空,則new 一個新的state,并将資料寫到value state中,并且為這條資料注冊一個timer(timer會由Flink按key+timestamp自動去重),另外此處我們使用的是ProcessingTime(表示onTimer()在系統時間戳達到Timer設定的時間戳時觸發)。如果不為空則按照拼接的政策,更新已經存在的結果。比如:時間視窗内 使用者id1,内容id1的第一條日志資料沒有點選行為,則這個字段為0,第二條點選資料進入後,将這個字段更新為1。當然除了更新操作,還有計數、累加、均值等各種操作。如何在process裡區分資料是來自曝光還是點選呢,使用上面步驟③定義的别名。
  3. 重寫onTimer方法,在onTimer方法中主要是定義定時器觸發時執行的邏輯:從value state裡擷取到存入的資料,并将資料輸出。然後執行state.clear。
  4. 樣本從視窗輸出的條件有2個:第一,timer到期。第二,業務需要的樣本都拼接上了。

此處參考僞代碼:

public class StateSampleFunction extends KeyedProcessFunction<String, Tuple2, ReturnSample> {
    /**
     * 這個狀态是通過過程函數來維護,使用ValueState
     */
    private ValueState state;

    private Long timer = null;

    public StateSampleFunction (String time){
        timer = Long.valueOf(time);
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 擷取state
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", TypeInformation.of(new TypeHint< ReturnSample >() {})));
    }

    @Override
    public void processElement(Tuple2value, Context context, Collector< ReturnSample > collector) throws Exception {
        if (value.f0 == null){
            return;
        }

        Object sampleValue = value.f1;
        Long time = context.timerService().currentProcessingTime();
        ReturnSample returnSample = state.value();
        if (returnSample == null) {
            returnSample = new ReturnSample();
            returnSample.setKey(value.f0);
            returnSample.setTime(time);
            context.timerService().registerProcessingTimeTimer(time +timer);
        }

        // 更新點選資料到state裡
        if (sampleValue instanceof ClickLog){
            ClickLog clickLog = (ClickLog)values;
            returnSample =(ReturnSample) clickLog.setSample(returnSample);
        }
        state.update(returnSample);
    }

    /**
     * @param timestamp
     * @param ctx
     * @param out
     * @throws Exception
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector< ReturnSample > out) throws Exception {
        ReturnSample value = state.value();
        state.clear();
        out.collect(value);
    }
}           

⑥ 拼接後的日志格式化和過濾

拼接後的資料需要按照線上訓練作業的要求對資料做格式化,比如 json、CSV 等格式。

過濾:決定什麼樣的資料是合格的樣本。例如:有真正閱讀的内容才算是可用的樣本。

⑦ 輸出

樣本最終輸出到實時的資料隊列中。下面是實際的作業拓撲和運作時狀态:

如何基于 Flink 生成線上機器學習的樣本?
如何基于 Flink 生成線上機器學習的樣本?

整個樣本拼接過程的流程圖:

如何基于 Flink 生成線上機器學習的樣本?

StateBackend 的選取

使用 RocksDB/Gemini 作為 state 的 Backend 的優勢和建議:

我們用大資料對 memory 和 RocksDB,Gemini 做了實驗對比,結果顯示 RocksDB 和 Gemin 在資料處理,作業穩定性和資源使用等方面比 memory 更合理。其中 Gemini 的優勢最為明顯。

此外,如果是大資料量的 state,建議使用 Gemini + SSD 固态硬碟。

樣本的監控

1. Flink 作業的異常監控

  • 作業失敗監控
  • Failover 監控
  • Checkpoint 失敗的監控
  • RocksDB 使用情況的監控
  • 作業消費 Kafka 的 Comsumer Lag 的監控
  • 作業反壓的監控

2. 樣本輸入端 Kafka 的消費延遲監控

3. 樣本輸出端 Kafka 的寫入量的監控

4. 樣本監控

  • 拼接率監控
  • 正樣本監控
  • 輸出樣本格式的監控
  • 輸出标簽對應的值是否在正常範圍
  • 輸入标簽對應的值是否為 null
  • 輸出标簽對應的值是否為空

樣本的校驗

樣本生成後,如何驗證資料是否準确

  1. 線上和離線的互相校驗
    将線上樣本從輸出的 Kafka 中接入到 HDFS 上離線存儲。并按照線上 join 的時間視窗來分區。           
  2. 用同等條件下生成的離線樣本和線上樣本做對比
  3. 白名單使用者的全流程校驗
    将白名單使用者的日志和樣本結果存入 ES 等實時數倉中,來做校驗。
               

故障的處理

樣本異常對線上模型訓練的影響非常大。當發現異常報警時,首先要做的是向線上模型訓練作業發送樣本異常的報警。收到報警資訊後,模型停止更新。進而避免影響模型線上效果。

普通意義的業務故障解決後,丢棄原來的資料,所有輸入日志流從最新的時間點開始消費并生成新的樣本即可。重要業務需要重置輸入日志流的 Kafka offset 從故障時間點開始重新生成樣本資料。

平台化

通過平台化對樣本生成的流程做出嚴格的規範非常重要。在平台化的過程中,需要提供簡單通用的開發模闆以提高作業開發效率;提供平台化的作業監控和樣本名額監控架構,避免重複造車;提供通用的樣本輸出落地政策,和線上/離線校驗政策,更便捷的為業務方服務。

微網誌基于 Flink 搭建的線上樣本生成平台架構,如圖:

如何基于 Flink 生成線上機器學習的樣本?

UI 頁面,如圖:

如何基于 Flink 生成線上機器學習的樣本?
如何基于 Flink 生成線上機器學習的樣本?

基于平台化開發,使用者隻需要關心業務邏輯部分即可。需要使用者開發的有:

  1. 對應輸入資料的資料清洗邏輯
  2. 樣本輸出前的資料清洗邏輯

其餘的在 UI 上配置即可實作,具體有:

  1. 輸入 Kafka 的配置資訊及對應資料清洗的 UDF 類
  2. 樣本拼接的時間視窗
  3. 視窗内對字段的聚合操作
  4. 樣本輸出的 Kafka 配置資訊及輸出前資料清洗和格式化的 UDF 類

資源情況由平台方稽核并配置。完成後,自動生成并送出作業。

如何基于 Flink 生成線上機器學習的樣本?

作業送出後:

1. 平台會提供如前所述的作業相關監控,如下:

■ Flink 作業的異常監控

■ 樣本監控

2. 平台會自動将資料落盤,存儲到HDFS上。友善離線驗證或者離線訓練。

3. 使用者隻需将精力放到樣本的驗證上即可,由平台方保證作業的穩定性。

作者介紹:

曹富強,微網誌機器學習研發中心-進階系統工程師。現負責微網誌機器學習平台資料計算/資料存儲子產品,主要涉及實時計算 Flink、Storm、Spark Streaming,資料存儲Kafka、Redis,離線計算 Hive、Spark 等。目前專注于 Flink/Kafka/Redis 在微網誌機器學習場景的應用,為機器學習提供架構,技術,應用層面的支援。

繼續閱讀