本文基于位元組跳動推薦系統基礎服務方向負責人郭文飛在 5 月 22 日 Apache Flink Meetup 分享的《Flink 在位元組跳動推薦特征體系中的落地實踐》整理,主要内容包括:
- 業務背景
- 新一代系統架構
- 後續規劃
GitHub 位址
https://github.com/apache/flink歡迎大家給 Flink 點贊送 star~
2021 年,位元組跳動旗下産品總 MAU 已超過 19 億。在以抖音、今日頭條、西瓜視訊等為代表的産品業務背景下,強大的推薦系統顯得尤為重要。Flink 提供了非常強大的 SQL 子產品和有狀态計算子產品。目前在位元組推薦場景,實時簡單計數特征、視窗計數特征、序列特征已經完全遷移到 Flink SQL 方案上。結合 Flink SQL 和 Flink 有狀态計算能力,我們正在建構下一代通用的基礎特征計算統一架構,期望可以高效支援常用有狀态、無狀态基礎特征的生産。
一、業務背景
對于今日頭條、抖音、西瓜視訊等位元組跳動旗下産品,基于 Feed 流和短時效的推薦是核心業務場景。而推薦系統最基礎的燃料是特征,高效生産基礎特征對業務推薦系統的疊代至關重要。
1. 主要業務場景
- 抖音、火山短視訊等為代表的短視訊應用推薦場景,例如 Feed 流推薦、關注、社交、同城等各個場景,整體在國内大概有 6 億 + 規模 DAU;
- 頭條、西瓜等為代表的 Feed 資訊流推薦場景,例如 Feed 流、關注、子頻道等各個場景,整體在國内有數億規模 DAU;
2. 業務痛點和挑戰
目前位元組跳動推薦場景基礎特征的生産現狀是“百花齊放”。離線特征計算的基本模式都是通過消費 Kafka、BMQ、Hive、HDFS、Abase、RPC 等資料源,基于 Spark、Flink 計算引擎實作特征的計算,而後把特征的結果寫入線上、離線存儲。各種不同類型的基礎特征計算散落在不同的服務中,缺乏業務抽象,帶來了較大的運維成本和穩定性問題。
而更重要的是,缺乏統一的基礎特征生産平台,使業務特征開發疊代速度和維護存在諸多不便。如業務方需自行維護大量離線任務、特征生産鍊路缺乏監控、無法滿足不斷發展的業務需求等。
在位元組的業務規模下,建構統一的實時特征生産系統面臨着較大挑戰,主要來自四個方面:
巨大的業務規模:抖音、頭條、西瓜、火山等産品的資料規模可達到日均 PB 級别。例如在抖音場景下,晚高峰 Feed 播放量達數百萬 QPS,用戶端上報使用者行為資料高達數千萬 IOPS。 業務方期望在任何時候,特征任務都可以做到不斷流、消費沒有 lag 等,這就要求特征生産具備非常高的穩定性。
較高的特征實時化要求:在以直播、電商、短視訊為代表的推薦場景下,為保證推薦效果,實時特征離線生産的時效性需實作常态穩定于分鐘級别。
更好的擴充性和靈活性:随着業務場景不斷複雜,特征需求更為靈活多變。從統計、序列、屬性類型的特征生産,到需要靈活支援視窗特征、多元特征等,業務方需要特征中台能夠支援逐漸衍生而來的新特征類型和需求。
業務疊代速度快:特征中台提供的面向業務的 DSL 需要足夠場景,特征生産鍊路盡量讓業務少寫代碼,底層的計算引擎、存儲引擎對業務完全透明,徹底釋放業務計算、存儲選型、調優的負擔,徹底實作實時基礎特征的規模化生産,不斷提升特征生産力;
3. 疊代演進過程
在位元組業務爆發式增長的過程中,為了滿足各式各樣的業務特征的需求,推薦場景衍生出了衆多特征服務。這些服務在特定的業務場景和曆史條件下較好支援了業務快速發展,大體的曆程如下:
在這其中 2020 年初是一個重要節點,我們開始在特征生産中引入 Flink SQL、Flink State 技術體系,逐漸在計數特征系統、模型訓練的樣本拼接、視窗特征等場景進行落地,探索出新一代特征生産方案的思路。
二、新一代系統架構
結合上述業務背景,我們基于 Flink SQL 和 Flink 有狀态計算能力重新設計了新一代實時特征計算方案。新方案的定位是:解決基礎特征的計算和線上 Serving,提供更加抽象的基礎特征業務層 DSL。
在計算層,我們基于 Flink SQL 靈活的資料處理表達能力,以及 Flink State 狀态存儲和計算能力等技術,支援各種複雜的視窗計算。極大地縮短業務基礎特征的生産周期,提升特征産對外連結路的穩定性。新的架構裡,我們将特征生産的鍊路分為資料源抽取 / 拼接、狀态存儲、計算三個階段。Flink SQL 完成特征資料的抽取和流式拼接,Flink State 完成特征計算的中間狀态存儲。
有狀态特征是非常重要的一類特征,其中最常用的就是帶有各種視窗的特征,例如統計最近 5 分鐘視訊的播放 VV 等。對于視窗類型的特征在位元組内部有一些基于存儲引擎的方案,整體思路是“輕離線重線上”,即把視窗狀态存儲、特征聚合計算全部放在存儲層和線上完成。離線資料流負責基本資料過濾和寫入,離線明細資料按照時間切分聚合存儲(類似于 micro batch),底層的存儲大部分是 KV 存儲、或者專門優化的存儲引擎,線上層完成複雜的視窗聚合計算邏輯,每個請求來了之後線上層拉取存儲層的明細資料做聚合計算。
我們新的解決思路是“輕線上重離線”,即把比較重的 時間切片明細資料 狀态存儲和視窗聚合計算全部放在離線層。視窗結果聚合通過 離線視窗觸發機制 完成,把特征結果 推到 線上 KV 存儲。線上子產品非常輕量級,隻負責簡單的線上 serving,極大地簡化了線上層的架構複雜度。在離線狀态存儲層。我們主要依賴 Flink 提供的 原生狀态存儲引擎 RocksDB,充分利用離線計算叢集本地的 SSD 磁盤資源,極大減輕線上 KV 存儲的資源壓力。
對于長視窗的特征(7 天以上視窗特征),由于涉及 Flink 狀态層明細資料的回溯過程,Flink Embedded 狀态存儲引擎沒有提供特别好的外部資料回灌機制(或者說不适合做)。是以對于這種“狀态冷啟動”場景,我們引入了中心化存儲作為底層狀态存儲層的存儲媒體,整體是 Hybrid 架構。例如 7 天以内的狀态存儲在本地 SSD,7~30 天狀态存儲到中心化的存儲引擎,離線資料回溯可以非常友善的寫入中心化存儲。
除視窗特征外,這套機制同樣适用于其他類型的有狀态特征(如序列類型的特征)。
1. 實時特征分類體系
2. 整體架構
帶有視窗的特征,例如抖音視訊最近 1h 的點贊量(滑動視窗)、直播間使用者最近一個 session 的看播時長(session 視窗)等;
2.1 資料源層
在新的一體化特征架構中,我們統一把各種類型資料源抽象為 Schema Table,這是因為底層依賴的 Flink SQL 計算引擎層對資料源提供了非常友好的 Table Format 抽象。在推薦場景,依賴的資料源非常多樣,每個特征上遊依賴一個或者多個資料源。資料源可以是 Kafka、RMQ、KV 存儲、RPC 服務。對于多個資料源,支援資料源流式、批式拼接,拼接類型包括 Window Join 和基于 key 粒度的 Window Union Join,維表 Join 支援 Abase、RPC、HIVE 等。具體每種類型的拼接邏輯如下:
三種類型的 Join 和 Union 可以組合使用,實作複雜的多資料流拼接。例如 (A union B) Window Join (C Lookup Join D)。
另外,Flink SQL 支援複雜字段的計算能力,也就是業務方可以基于資料源定義的 TableSchema 基礎字段實作擴充字段的計算。業務計算邏輯本質是一個 UDF,我們會提供 UDF API 接口給業務方,然後上傳 JAR 到特征背景加載。另外對于比較簡單的計算邏輯,背景也支援通過送出簡單的 Python 代碼實作多語言計算。
2.2 業務 DSL
從業務視角提供高度抽象的特征生産 DSL 語言,屏蔽底層計算、存儲引擎細節,讓業務方聚焦于業務特征定義。業務 DSL 層提供:資料來源、資料格式、資料抽取邏輯、資料生成特征類型、資料輸出方式等。
2.3 狀态存儲層
如上文所述,新的特征一體化方案解決的主要痛點是:如何應對各種類型(一般是滑動視窗)有狀态特征的計算問題。對于這類特征,在離線計算層架構裡會有一個狀态存儲層,把抽取層提取的 RawFeature 按照切片 Slot 存儲起來 (切片可以是時間切片、也可以是 Session 切片等)。切片類型在内部是一個接口類型,在架構上可以根據業務需求自行擴充。狀态裡面其實存儲的不是原始 RawFeature(存儲原始的行為資料太浪費存儲空間),而是轉化為 FeaturePayload 的一種 POJO 結構,這個結構裡面支援了常見的各種資料結構類型:
- Int:存儲簡單的計數值類型 (多元度 counter);
- HashMap<int, int>:存儲二維計數值,例如 Action Counter,key 為 target_id,value 為計數值;
- SortedMap<int, int>: 存儲 topk 二維計數 ;
-
LinkedList
:存儲 id_list 類型資料;
-
HashMap<int, List
\>:存儲二維 id_list;
- 自定義類型,業務可以根據需求 FeaturePayload 裡面自定義資料類型
狀态層更新的業務接口:輸入是 SQL 抽取 / 拼接層抽取出來的 RawFeature,業務方可以根據業務需求實作 updateFeatureInfo 接口對狀态層的更新。對于常用的特征類型内置實作了 update 接口,業務方自定義特征類型可以繼承 update 接口實作。
/**
* 特征狀态 update 接口
*/
public interface FeatureStateApi extends Serializable {
/**
* 特征更新接口, 上遊每條日志會提取必要字段轉換為 fields, 用來更新對應的特征狀态
*
* @param fields
* context: 儲存特征名稱、主鍵 和 一些配置參數 ;
* oldFeature: 特征之前的狀态
* fields: 平台 / 配置檔案 中的抽取字段
* @return
*/
FeaturePayLoad assign(Context context,FeaturePayLoad feature, Map<String, Object> rawFeature);
}
當然對于無狀态的 ETL 特征是不需要狀态存儲層的。
2.4 計算層
特征計算層完成特征計算聚合邏輯,有狀态特征計算輸入的資料是狀态存儲層存儲的帶有切片的 FeaturePayload 對象。簡單的 ETL 特征沒有狀态存儲層,輸入直接是 SQL 抽取層的資料 RawFeature 對象,具體的接口如下:
有狀态特征聚合接口:
/**
* 有狀态特征計算接口
*/
public interface FeatureStateApi extends Serializable {
/**
* 特征聚合接口,會根據配置的特征計算視窗, 讀取視窗内所有特征狀态,排序後傳入該接口
*
* @param featureInfos, 包含 2 個 field
* timeslot: 特征狀态對應的時間槽
* Feature: 該時間槽的特征狀态
* @return
*/
FeaturePayLoad aggregate(Context context, List<Tuple2<Slot, FeaturePayLoad>> slotStates);
}
無狀态特征計算接口:
/**
* 無狀态特征計算接口
*/
public interface FeatureConvertApi extends Serializable {
/**
* 轉換接口, 上遊每條日志會提取必要字段轉換為 fields, 無狀态計算時,轉換為内部的 feature 類型 ;
*
* @param fields
* fields: 平台 / 配置檔案 中的抽取字段
* @return
*/
FeaturePayLoad convert(Context context, FeaturePayLoad featureSnapshot, Map<String, Object> rawFeatures);
}
另外通過觸發機制來觸發特征計算層的執行,目前支援的觸發機制主要有:
3. 業務落地
目前在位元組推薦場景,新一代特征架構已經在抖音直播、電商、推送、抖音推薦等場景陸續上線了一些實時特征。主要是有狀态類型的特征,帶有視窗的一維統計類型、二維倒排拉鍊類型、二維 TOPK 類型、實時 CTR/CVR Rate 類型特征、序列類型特征等。
在業務核心名額達成方面成效顯著。在直播場景,依托新特征架構強大的表達能力上線了一批特征之後,業務看播核心名額、互動名額收益非常顯著。在電商場景,基于新特征架構上線了 400+ 實時特征。其中在直播電商方面,業務核心 GMV、下單率名額收益顯著。在抖音推送場景,基于新特征架構離線狀态的存儲能力,聚合使用者行為資料然後寫入下遊各路存儲,極大地緩解了業務下遊資料庫的壓力,在一些場景中 QPS 可以下降到之前的 10% 左右。此外,抖音推薦 Feed、評論等業務都在基于新特征架構重構原有的特征體系。
值得一提的是,在電商和抖音直播場景,Flink 流式任務狀态最大已經達到 60T,而且這個量級還在不斷增大。預計不久的将來,單任務的狀态有可能會突破 100T,這對架構的穩定性是一個不小的挑戰。
4. 性能優化
4.1 Flink State Cache
目前 Flink 提供兩類 StateBackend:基于 Heap 的 FileSystemStateBackend 和基于 RocksDB 的 RocksDBStateBackend。對于 FileSystemStateBackend,由于資料都在記憶體中,通路速率很快,沒有額外開銷。而 RocksDBStateBackend 存在查盤、序列化 / 反序列化等額外開銷,CPU 使用量會有明顯上升。在位元組内部有大量使用 State 的作業,對于大狀态作業,通常會使用 RocksDBStateBackend 來管理本地狀态資料。RocksDB 是一個 KV 資料庫,以 LSM 的形式組織資料,在實際使用的過程中,有以下特點:
- 應用層和 RocksDB 的資料互動是以 Bytes 數組的形式進行,應用層每次通路都需要序列化 / 反序列化;
- 資料以追加的形式不斷寫入 RocksDB 中,RocksDB 背景會不斷進行 compaction 來删除無效資料。
業務方使用 State 的場景多是 get-update,在使用 RocksDB 作為本地狀态存儲的過程中,出現過以下問題:
- 爬蟲資料導緻熱 key,狀态會不斷進行更新 (get-update),單 KV 資料達到 5MB,而 RocksDB 追加更新的特點導緻背景在不斷進行 flush 和 compaction,單 task 出現慢節點(抖音直播場景)。
- 電商場景作業多數為大狀态作業 (目前已上線作業狀态約 60TB),業務邏輯中會頻繁進行 State 操作。在融合 Flink State 過程中發現 CPU 的開銷和原有 的 基于記憶體或 abase 的實作有 40%~80% 的升高。經優化後,CPU 開銷主要集中在序列化 / 反序列化的過程中。
針對上述問題,可以通過在記憶體維護一個對象 Cache,達到優化熱點資料通路和降低 CPU 開銷的目的。通過上述背景介紹,我們希望能為 StateBackend 提供一個通用的 Cache 功能,通過 Flink StateBackend Cache 功能設計方案達成以下目标:
- 減少 CPU 開銷 : 通過對熱點資料進行緩存,減少和底層 StateBackend 的互動次數,達到減少序列化 / 反序列化開銷的目的。
- 提升 State 吞吐能力 : 通過增加 Cache 後,State 吞吐能力應比原有的 StateBackend 提供的吞吐能力更高。理論上在 Cache 足夠大的情況下,吞吐能力應和基于 Heap 的 StateBackend 近似。
- Cache 功能通用化 : 不同的 StateBackend 可以直接适配該 Cache 功能。目前我們主要支援 RocksDB,未來希望可以直接提供給别的 StateBackend 使用,例如 RemoteStateBackend。
經過和位元組基礎架構 Flink 團隊的合作,在實時特征生産更新 ,上線 Cache 大部分場景的 CPU 使用率大概會有高達 50% 左右的收益;
4.2 PB IDL 裁剪
在位元組内部的實時特征離線生成鍊路當中,我們主要依賴的資料流是 Kafka。這些 Kafka 都是通過 PB 定義的資料,字段繁多。公司級别的大 Topic 一般會有 100+ 的字段,但大部分的特征生産任務隻使用了其中的部分字段。對于 Protobuf 格式的資料源,我們可以完全通過裁剪資料流,mask 一些非必要的字段來節省反序列化的開銷。PB 類型的日志,可以直接裁剪 idl,保持必要字段的序号不變,在反序列化的時候會跳過 unknown field 的解析,這 對于 CPU 來說是更節省的,但是網絡帶寬不會有收益, 預計裁剪後能節省非常多的 CPU 資源。在上線了 PB IDL 裁剪之後,大部分任務的 CPU 收益在 30% 左右。
5. 遇到的問題
新架構特征生産任務本質就是一個有狀态的 Flink 任務,底層的狀态存儲 StateBackend 主要是本地的 RocksDB。主要面臨兩個比較難解的問題,一是任務 DAG 變化 Checkpoint 失效,二是本地存儲不能很好地支援特征狀态曆史資料回溯。
- 實時特征任務不能動态添加新的特征:對于一個線上的 Flink 實時特征生産任務,我們不能随意添加新的特征。這是由于引入新的特征會導緻 Flink 任務計算的 DAG 發生改變,進而導緻 Flink 任務的 Checkpoint 無法恢複,這對實時有狀态特征生産任務來說是不能接受的。目前我們的解法是禁止更改線上部署的特征任務配置,但這也就導緻了線上生成的特征是不能随便下線的。對于這個問題暫時沒有找到更好的解決辦法,後期仍需不斷探索。
- 特征狀态冷啟動問題:目前主要的狀态存儲引擎是 RocksDB,不能很好地支援狀态資料的回溯。
三、後續規劃
目前新一代架構還在位元組推薦場景中快速演進,目前已較好解決了實時視窗特征的生産問題。
出于實作統一推薦場景下特征生産的目的,我們後續會繼續基于 Flink SQL 流批一體能力,在批式特征生産發力。此外也會基于 Hudi 資料湖技術,完成特征的實時入湖,高效支援模型訓練場景離線特征回溯痛點。規則引擎方向,計劃繼續探索 CEP,推動在電商場景有更多落地實踐。在實時視窗計算方向,将繼續深入調研 Flink 原生視窗機制,以期解決目前方案面臨的視窗特征資料退場問題。
- 支援批式特征:這套特征生産方案主要是解決實時有狀态特征的問題,而目前位元組離線場景下還有大量批式特征是通過 Spark SQL 任務生産的。後續我們也會基于 Flink SQL 流批一體的計算能力,提供對批式場景特征的統一支援,目前也初步有了幾個場景的落地;
- 特征離線入湖:基于 Hudi On Flink 支援實時特征的離線數倉建設,主要是為了支援模型訓練樣本拼接場景離線特征回溯;
- Flink CEP 規則引擎支援:Flink SQL 本質上就是一種規則引擎,目前線上上我們把 Flink SQL 作為業務 DSL 過濾語義底層的執行引擎。但 Flink SQL 擅長表達的 ETL 類型的過濾規則,不能表達帶有時序類型的規則語義。在直播、電商場景的時序規則需要嘗試 Flink CEP 更加複雜的規則引擎。
- Flink Native Windowing 機制引入:對于視窗類型的有狀态特征,我們目前采用上文所述的抽象 SlotState 時間切片方案統一進行支援。另外 Flink 本身提供了非常完善的視窗機制,通過 Window Assigner、Window Trigger 等元件可以非常靈活地支援各種視窗語義。是以後續我們也會在視窗特征計算場景引入 Flink 原生的 Windowing 機制,更加靈活地支援視窗特征疊代。
- Flink HybridState Backend 架構:目前在位元組的線上場景中,Flink 底層的 StateBackend 預設都是使用 RocksDB 存儲引擎。這種内嵌的存儲引擎不能通過外部機制去提供狀态資料的回灌和多任務共享,是以我們需要支援 KV 中心化存儲方案,實作靈活的特征狀态回溯。
- 靜态屬性類型特征統一管理:通過特征平台提供統一的 DSL 語義,統一管理其他外部靜态類型的特征服務。例如一些其他業務團隊次元的使用者分類、标簽服務等。
12 月 4-5 日,北京國家會議中心,Flink Forward Asia 2021 重磅開啟,全球 40+ 多行業一線廠商,80+ 幹貨議題,帶來專屬于開發者的技術盛宴。
大會議程已正式上線,點選下方連結即可免費報名
https://flink-forward.org.cn/更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群
第一時間擷取最新技術文章和社群動态,請關注公衆号~
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99 元試用
實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定制衛衣;另包 3 個月及以上還有 85 折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc