本文由京東搜尋算法架構團隊分享,主要介紹 Apache Flink 在京東商品搜尋排序線上學習中的應用實踐。文章的主要大綱如下:
1、背景
2、京東搜尋線上學習架構
3、實時樣本生成
4、Flink Online Learning
5、監控系統
6、規劃總結
一、背景
在京東的商品搜尋排序中,經常會遇到搜尋結果多樣性不足導緻系統非最優解的問題。為了解決資料馬太效應帶來的模型商品排序多樣性的不足,我們利用基于二項式湯普森采樣模組化,但是該算法仍存在對所有使用者采用一緻的政策,未有效考慮使用者和商品的個性化資訊。基于該現狀,我們采取線上學習,使深度學習和湯普森采樣融合,實作個性化多樣性排序方案,實時更新模型的關參數。
在該方案中,Flink 主要應用于實時樣本的生成和 online learning 的實作。在線上學習過程中,樣本是模型訓練的基石,在超大規模樣本資料的處理上,我們對比了 Flink、Storm 和 Spark Streaming 之後,最終選擇用 Flink 作為實時樣本流資料的生産以及疊代 online learning 參數的架構。線上學習的整體鍊路特别長,涉及線上端特征日志、流式特征處理、流式特征與使用者行為标簽關聯、異常樣本處理、模型動态參數實時訓練與更新等環節,online learning 對樣本處理和參數狀态處理的準确性和穩定性要求較高,任何一個階段都有可能出現問題,為此我們接入京東的 observer 體系,擁有完整的全鍊路監控系統,保證各個階段資料的穩定性和完整性;下面我們首先介紹一下京東搜尋線上學習架構。
二、京東搜尋線上學習架構
京東搜尋的排序模型系統架構主要包括以下幾個部分:
1、Predictor 是模型預估服務,在 load 模型中分為 static 部分和 dynamic 部分,static 部分由離線資料訓練得到,主要學習 user 和 doc 的稠密特征表示,dynamic 部分主要包含 doc 粒度的權重向量,這部分由實時的 online learning 任務實時更新。
2、Rank 主要包括一些排序政策,在排序最終結果确定之後,會實時落特征日志,将 doc 的特征按順序寫入特征資料流,作為後續實時樣本的資料源(feature)。
3、Feature Collector 的任務是承接線上預估系統發出的特征資料,對下遊屏蔽緩存、去重、篩選等線上系統特有邏輯,産出 Query+Doc 粒度的特征流。
4、Sample join 的任務将上面的 feature 資料、曝光、點選、加購、下單等使用者行為标簽資料作為資料源,通過 Flink 的 union + timer 資料模型關聯成為符合業務要求的樣本資料,算法可根據目标需求選擇不同的标簽作為正負樣本标記。
5、Online learning 任務負責消費上遊生成的實時樣本做訓練,負責更新 model 的 dynamic 部分。

三、實時樣本生成
Online Learning 對于線上樣本生成的時效性和準确性都有很高的要求,同時也對作業的穩定性有很高的要求。在海量使用者日志資料實時湧入的情況下,我們不僅要保證作業的資料延時低、樣本關聯率高且任務穩定,而且作業的吞吐不受影響、資源使用率達到最高。
京東搜尋排序線上樣本的主要流程如下:
1、資料源大緻有曝光流、feature 流和使用者行為流等作為實時樣本的資料源,統一以 JDQ 管道流的形式,由京東實時計算平台提供平台支撐。
2、接到 feature 流和曝光流、label 流後,進行資料清洗,得到任務需要的資料格式。
3、拿到各個标準流後,對各個流進行 union 操作,之後進行 keyby。
4、我們在 process function 裡面添加 Flink timer 定時器,作為樣本生成的實時視窗。
5、将生成的樣本實時落入 jdq 和 HDFS,jdq 可以用作後面的 online learning 的 input,HDFS 持久存儲樣本資料,用于離線訓練、增量學習和資料分析。
線上樣本任務優化實踐:
京東搜尋樣本資料吞吐量每秒達到 GB 規模,對分布式處理分片、超大狀态和異常處理提出很高的優化要求。
1、資料傾斜
使用 keyby 的時候,難免會有資料傾斜的情況,這裡我們假設 key 設計合理、 shuffle 方式選擇正确、任務沒有反壓且資源足夠使用,由于任務 parallelism 設定導緻的資料傾斜的情況。我們先看 Flink 裡面 key 是如何被分發到 subtask 上面的。
keygroup = assignToKeyGroup(key, maxParallelism)
subtaskNum = computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroupId)
假設我們的并發設定的是 300,那麼 maxParallelism 就是 512,如此設計,必然導緻有的 subtask 分布 1 個 keygroup 有的配置設定兩個,同時也導緻了資料自然傾斜。針對上述問題,有兩個解決方案:
● 設定并行度為 2 的 n 次方;
● 設定最大并行度為 并行度的 n 倍。
如果使用方案 1 ,調整并發的話隻能調整 2 的幂次,建議使用方案 2,且假如 parallelism 為 300,maxParallelism 設定為 1200 的情況下假如資料還是有傾斜,可以再相應的把 maxParallelism 設定大一些保證每個 keygroup 的 key 少一些,如此也可以降低資料傾斜的發生。
2、large checkpoint
線上樣本用到了 Flink 的 state,我們之前預設将 state 放到了記憶體裡面,但是随着放量的增加,state 資料量激增,發現 GC 時間特别長,之後改變政策,将 state 放入了 RocksDB,GC 問題得以解決。我們針對 checkpoint 做了如下配置:
● 開啟增量 checkpoint;
● 合理設定 checkpoint 的逾時時間、間隔時間和最小暫停時間。
image.png● 讓 Flink 自己管理 RocksDB 占用的記憶體,對 RocksDB 的 blockcache、writebuffer 等進行調優。
● 優化 state 的資料使用,将 state 資料放入多個 state object 裡面使用,降低序列化/反序列化的代價。
在任務調優的時候我們發現我們的任務通路 RocksDB 的時間非常長,檢視 jstack 發現,很多線程都在等待資料的序列化和反序列化,随着算法特征的逐漸增多,樣本中的特征個數超過 500 個,使得每條資料的量級越來越大。但是在做樣本關聯的時候其實是不需要特征關聯的,隻需要相應的主鍵關聯就可以了,是以,我們用 ValueState 存儲主鍵,用 MapState/ListState 存儲特征等值。當然了還可以将這些特征值存儲到外部存儲裡面,這裡就需要對網絡 io 和 本地 io 之間的選擇做一個取舍。
● failure recovery 的時候開啟本地恢複。
由于我們的 checkpoint 資料達到了 TB 級别,一旦任務發生 failover,不管是針對 HDFS 還是針對任務本身,壓力都非常大,是以,我們優先使用本地進行 recovery,這樣,不僅可以降低 HDFS 的壓力還可以增加 recovery 的速度。
四、Flink Online Learning
對于 online learning,我們先介紹一下伯努利湯普森采樣算法,假設每個商品的 reward 機率服從 Beta 分布,是以給每個商品維護兩個參數成功次數 si 及失敗次數 fi,及所有商品的公共先驗參數成功次數 α 和失敗次數 β。
每次根據商品相應的 Beta 分布采樣為最優商品的期望 reward: Q(at) = θi,并選擇期望 reward 最大的商品展現給使用者。最後根據環境給出真實 reward,更新模型相應的參數達到 online learning 的效果。該參數代表一個商品特征,用一個 n 維向量表示,該向量由原始特征通過 MLP 網絡預測得到。原始特征經過 DNN 網絡得到一個 N 維向量作為該商品的個性化表征,采用 Logistic Regression 函數模組化似然函數,利用 Flink 建構該表征和實時回報所組成的實時樣本,用于不斷疊代近似更新參數分布。
1、資料有序性保證
從 jdq 接過實時樣本之後,由于之前并沒有保證資料的有序性,這裡采用 watermark 機制保證資料的有序性。
2、樣本資料處理
把隻曝光無行為的商品看做負樣本,有點選及後續行為的商品看做正樣本,當視窗将達到一定正負比例或資料量時進行一次 batch 訓練,疊代出新的參數向量,将商品 embedding 資料放到 Flink 的 state 裡面,之後作為 model 的 dynamic 部分更新參數。
3、 同步疊代、異步疊代
個性化 ee 參數線上學習采用異步更新方式的時候,存在參數更新順序錯亂問題,這會降低線上學習模型收斂速度,進而造成了流量的浪費,是以,參數異步更新方式更改為同步更新方式,避免參數讀寫錯亂問題。在同步更新的方式下,存儲在 status 中的參數向量需要在下一次訓練疊代時使用,若參數發生丢失會使該商品的疊代過程中斷,為防止系統風險造成參數丢失,設計了參數雙重保障。一般的任務異常或重新開機後參數可從 checkpoint 或 savepoint 中恢複,如果意外情況下參數無法恢複,從遠端線上服務中取回上一版參數并記錄到 state。
4、多試驗版本支援
線上學習任務使用同一個 Flink 任務來支援多個版本模型在不同實驗桶下進行 AB 實驗,通過版本号區分不同的 AB 流量桶,對應的實時樣本以 docid+version 作為 key 進行處理,疊代過程互不影響。
5、custom serialization
為了提高帶寬使用率以及性能的需求,我們内部采用 pb 格式傳輸資料,經過調研,pb 的傳輸格式優于 Flink 的兜底的 general class 的 kryo 序列化方式,是以我們采用了 Flink 的 custom serialization 解決方案,直接用 pb 格式在 op 之間傳輸資料。
五、監控系統
這裡我們區分業務全鍊路監控和任務穩定性相關監控,具體情況下面将詳細介紹。
1、全鍊路監控
整個系統使用京東内部的 observer 平台來實作業務全鍊路監控,主要包括 predictor 服務相關的監控、feature dump 的 QPS 監控、特征和标簽品質監控、關聯情況監控、train 相關的監控以及 AB 名額相關的一些監控,如下:
2、任務穩定性監控
任務穩定性監控這裡主要是指 Flink 的任務穩定性監控,鍊路吞吐量達 GB/s規模,特征消息 QPS 達 10W 規模,且 online learning 的不可間斷性,不管對于線上樣本任務還是 online learning 的任務,相關監控報警都是必不可少的。
■ 容器的記憶體、cpu 監控、thread 個數,gc 監控
■ 樣本相關業務監控
六、規劃總結
Flink 在實時資料處理方面有優秀的性能、容災、吞吐等表現、算子豐富易上手使用、自然支援批流一體化,且目前已有線上學習的架構開源,做線上學習是個不二的選擇,随着機器學習資料規模的擴大和對資料時效性、模型時效性要求的提升,線上學習不僅僅作為離線模型訓練的補充,更成為模型系統效率發展的趨勢。為此我們做的規劃如下:
作者緻謝:感謝實時計算研發部、搜尋排序算法團隊的支援。