天天看點

日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐

作者 | 楊毅,穆超峰,賀小兵,胡夕

導讀:當今生活節奏日益加快,企業面對不斷增加的海量資訊,其資訊篩選和處理效率低下的困擾與日俱增。由于使用者營銷不夠細化,企業 App 中許多不合時宜或不合偏好的消息推送很大程度上影響了使用者體驗,甚至引發了使用者流失。在此背景下,友信金服公司推行全域的資料體系戰略,通過打通和整合集團各個業務線資料,利用大資料、人工智能等技術建構統一的資料資産,如 ID-Mapping、使用者标簽等。友信金服使用者畫像項目正是以此為背景成立,旨在實作“資料驅動業務與營運”的集團戰略。目前該系統支援日處理資料量超 10 億,接入上百種合規資料源。

一、技術選型

傳統基于 Hadoop 生态的離線資料存儲計算方案已在業界大規模應用,但受制于離線計算的高時延性,越來越多的資料應用場景已從離線轉為實時。這裡引用一張表格對目前主流的實時計算架構做個對比。

日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐

Apache Storm 的容錯機制需要對每條資料進行應答(ACK),是以其吞吐量備受影響,在資料大吞吐量的場景下會有問題,是以不适用此項目的需求。

Apache Spark 總體生态更為完善,且在機器學習的內建和應用性暫時領先,但 Spark 底層還是采用微批(Micro Batching)處理的形式。

Apache Flink 在流式計算上有明顯優勢:首先其流式計算屬于真正意義上的單條處理,即每一條資料都會觸發計算。在這一點上明顯與 Spark 的微批流式處理方式不同。其次,Flink 的容錯機制較為輕量,對吞吐量影響較小,使得 Flink 可以達到很高的吞吐量。最後 Flink 還擁有易用性高,部署簡單等優勢。相比之下我們最終決定采用基于 Flink 的架構方案。

二、使用者畫像業務架構

使用者畫像系統目前為集團線上業務提供使用者實時标簽資料服務。為此我們的服務需要打通多種資料源,對海量的數字資訊進行實時不間斷的資料清洗、聚類、分析,進而将它們抽象成标簽,并最終為應用方提供高品質的标簽服務。在此背景下,我們設計使用者畫像系統的整體架構如下圖所示:

日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐

整體架構分為五層:

  1. 接入層:接入原始資料并對其進行處理,如 Kafka、Hive、檔案等。
  2. 計算層:選用 Flink 作為實時計算架構,對實時資料進行清洗,關聯等操作。
  3. 存儲層:對清洗完成的資料進行資料存儲,我們對此進行了實時使用者畫像的模型分層與建構,将不同應用場景的資料分别存儲在如 Phoenix,HBase,HDFS,Kafka 等。
  4. 服務層:對外提供統一的資料查詢服務,支援從底層明細資料到聚合層資料的多元計算服務。
  5. 應用層:以統一查詢服務對各個業務線資料場景進行支撐。目前業務主要包含使用者興趣分、使用者品質分、使用者的事實資訊等資料。

三、使用者畫像資料處理流程

在整體架構設計方案設計完成之後,我們針對資料也設計了詳盡的處理方案。在資料處理階段,鑒于 Kafka 高吞吐量、高穩定性的特點,我們的使用者畫像系統統一采用 Kafka 作為分布式釋出訂閱消息系統。資料清洗階段利用 Flink 來實作使用者唯一性識别、行為資料的清洗等,去除備援資料。這一過程支援互動計算和多種複雜算法,并支援資料實時 / 離線計算。目前我們資料處理流程疊代了兩版,具體方案如下:

1.0 版資料處理流程

資料接入、計算、存儲三層處理流程

整體資料來源包含兩種:

  1. 曆史資料:從外部資料源接入的海量曆史業務資料。接入後經過 ETL 處理,進入使用者畫像底層資料表。
  2. 實時資料:從外部資料源接入的實時業務資料,如使用者行為埋點資料,風控資料等。

根據不同業務的名額需求我們直接從集團資料倉庫抽取資料并落入 Kafka,或者直接從業務端以 CDC(Capture Data Change)的方式寫入 Kafka。在計算層,資料被導入到 Flink 中,通過 DataStream 生成 ID-Mapping、使用者标簽碎片等資料,然後将生成資料存入 JanusGraph(JanusGraph 是以 HBase 作為後端存儲的圖資料庫媒體)與 Kafka,并由 Flink 消費落入 Kafka 的使用者标簽碎片資料,進行聚合生成最新的使用者标簽碎片(使用者标簽碎片是由使用者畫像系統擷取來自多種管道的碎片化資料塊處理後生成的)。

日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐

資料服務層處理流程

服務層将存儲層存儲的使用者标簽碎片資料,通過 JanusGraph Spark On Yarn 模式,執行 TinkerPop OLAP 計算生成全量使用者 Yids 清單檔案。Yid 是使用者畫像系統中定義的集團級使用者 ID 辨別。結合 Yids 清單檔案,在 Flink 中批量讀取 HBase 聚合成完整使用者畫像資料,生成 HDFS 檔案,再通過 Flink 批量操作新生成的資料生成使用者評分預測标簽,将使用者評分預測标簽落入 Phoenix,之後資料便可通過統一資料服務接口進行擷取。下圖完整地展示了這一流程。

日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐

ID-Mapping 資料結構

為了實作使用者标簽的整合,使用者 ID 之間的強打通,我們将使用者 ID 辨別看成圖的頂點、ID pair 關系看作圖的邊,比如已經識别浏覽器 Cookie 的使用者使用手機号登陸了公司網站就形成了對應關系。這樣所有使用者 ID 辨別就構成了一張大圖,其中每個小的連通子圖 / 連通分支就是一個使用者的全部辨別 ID 資訊。

ID-Mapping 資料由圖結構模型建構,圖節點包含 UserKey、Device、IdCard、Phone 等類型,分别表示使用者的業務 ID、裝置 ID、身份證以及電話等資訊。節點之間邊的生成規則是通過解析資料流中包含的節點資訊,以一定的優先級順序進行節點之間的連接配接,進而生成節點之間的邊。比如,識别了使用者手機系統的 Android_ID,之後使用者使用郵箱登陸了公司 App,在系統中找到了業務線 UID 就形成了和關系的 ID pair,然後系統根據節點類型進行優先級排序,生成 Android_ID、mail、UID 的關系圖。資料圖結構模型如下圖所示:

日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐

Gephi

1.0 版本資料處理流程性能瓶頸

1.0 版本資料處理流程在系統初期較好地滿足了我們的日常需求,但随着資料量的增長,該方案遇到了一些性能瓶頸:

  1. 首先,這版的資料處理使用了自研的 Java 程式來實作。随着資料量上漲,自研 JAVA 程式由于資料量暴增導緻 JVM 記憶體大小不可控,同時它的維護成本很高,是以我們決定在新版本中将處理邏輯全部遷移至 Flink 中。
  2. 其次,在生成使用者标簽過程中,ID-Mapping 出現很多大的連通子圖(如下圖所示)。這通常是因為使用者的行為資料比較随機離散,導緻部分節點間連接配接混亂。這不僅增加了資料的維護難度,也導緻部分資料被“污染”。另外這類異常大的子圖會嚴重降低 JanusGraph 與 HBase 的查詢性能。
日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐
  1. 最後,該版方案中資料經 Protocol Buffer(PB)序列化之後存入 HBase,這會導緻合并 / 更新使用者畫像标簽碎片的次數過多,使得一個标簽需要讀取多次 JanusGraph 與 HBase,這無疑會加重 HBase 讀取壓力。此外,由于資料經過了 PB 序列化,使得其原始存儲格式不可讀,增加了排查問題的難度。

鑒于這些問題,我們提出了 2.0 版本的解決方案。在 2.0 版本中,我們通過利用 HBase 列式存儲、修改圖資料結構等優化方案嘗試解決以上三個問題。

2.0 版資料處理流程

版本流程優化點

如下圖所示,2.0 版本資料處理流程大部分承襲了 1.0 版本。新版本資料處理流程在以下幾個方面做了優化:

日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐

2.0 版本資料處理流程

  1. 曆史資料的離線補錄方式由 JAVA 服務變更為使用 Flink 實作。
  2. 優化使用者畫像圖資料結構模型,主要是對邊的連接配接方式進行了修改。之前我們會判斷節點的類型并根據預設的優先級順序将多個節點進行連接配接,新方案則采用以 UserKey 為中心的連接配接方式。做此修改後,之前的大的連通子圖(圖 6)優化為下面的小的連通子圖(圖 8),同時解決了資料污染問題,保證了資料準确性。另外,1.0 版本中一條資料需要平均讀取十多次 HBase 的情況也得到極大緩解。采用新方案之後平均一條資料隻需讀取三次 HBase,進而降低 HBase 六七倍的讀取壓力(此處優化是資料計算層優化)。
日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐
  1. 舊版本是用 Protocol Buffer 作為使用者畫像資料的存儲對象,生成使用者畫像資料後作為一個列整體存入 HBase。新版本使用 Map 存儲使用者畫像标簽資料,Map 的每對 KV 都是單獨的标簽,KV 在存入 HBase 後也是單獨的列。新版本存儲模式利用 HBase 做列的擴充與合并,直接生成完整使用者畫像資料,去掉 Flink 合并 / 更新使用者畫像标簽過程,優化資料加工流程。使用此方案後,存入 HBase 的标簽資料具備了即席查詢功能。資料具備即席查詢是指在 HBase 中可用特定條件直接檢視指定标簽資料詳情的功能,它是資料治理可以實作校驗資料品質、資料生命周期、資料安全等功能的基礎條件。
  2. 在資料服務層,我們利用 Flink 批量讀取 HBase 的 Hive 外部表生成使用者品質分等資料,之後将其存入 Phoenix。相比于舊方案中 Spark 全量讀 HBase 導緻其讀壓力過大,進而會出現叢集節點當機的問題,新方案能夠有效地降低 HBase 的讀取壓力。經過我們線上驗證,新方案對 HBase 的讀負載下降了數十倍(此處優化與 2 優化不同,屬于服務層優化)。

四、問題

目前,線上部署的使用者畫像系統中的資料絕大部分是來自于 Kafka 的實時資料。随着資料量越來越多,系統的壓力也越來越大,以至于出現了 Flink 背壓與 Checkpoint 逾時等問題,導緻 Flink 送出 Kafka 位移失敗,進而影響了資料一緻性。這些線上出現的問題讓我們開始關注 Flink 的可靠性、穩定性以及性能。針對這些問題,我們進行了詳細的分析并結合自身的業務特點,探索并實踐出了一些相應的解決方案。

CheckPointing 流程分析與性能優化方案

CheckPointing 流程分析

下圖展示了 Flink 中 checkpointing 執行流程圖:

日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐

Flink 中 checkpointing 執行流程

  1. Coordinator 向所有 Source 節點發出 Barrier。
  2. Task 從輸入中收到所有 Barrier 後,将自己的狀态寫入持久化存儲中,并向自己的下遊繼續傳遞 Barrier。
  3. 當 Task 完成狀态持久化之後将存儲後的狀态位址通知到 Coordinator。
  4. 當 Coordinator 彙總所有 Task 的狀态,并将這些資料的存放路徑寫入持久化存儲中,完成 CheckPointing。

性能優化方案

通過以上流程分析,我們通過三種方式來提高 Checkpointing 性能。這些方案分别是:

  1. 選擇合适的 Checkpoint 存儲方式
  2. 合理增加算子(Task)并行度
  3. 縮短算子鍊(Operator Chains)長度

CheckPoint 存儲方式有 MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。由官方文檔可知,不同 StateBackend 之間的性能以及安全性是有很大差異的。通常情況下,MemoryStateBackend 适合應用于測試環境,線上環境則最好選擇 RocksDBStateBackend。

這有兩個原因:首先,RocksDBStateBackend 是外部存儲,其他兩種 Checkpoint 存儲方式都是 JVM 堆存儲。受限于 JVM 堆記憶體的大小,Checkpoint 狀态大小以及安全性可能會受到一定的制約;其次,RocksDBStateBackend 支援增量檢查點。增量檢查點機制(Incremental Checkpoints)僅僅記錄對先前完成的檢查點的更改,而不是生成完整的狀态。與完整檢查點相比,增量檢查點可以顯著縮短 checkpointing 時間,但代價是需要更長的恢複時間。

Checkpointing 需要對每個 Task 進行資料狀态采集。單個 Task 狀态資料越多則 Checkpointing 越慢。是以我們可以通過增加 Task 并行度,減少單個 Task 狀态資料的數量來達到縮短 CheckPointing 時間的效果。

Flink 算子鍊(Operator Chains)越長,Task 也會越多,相應的狀态資料也就更多,Checkpointing 也會越慢。通過縮短算子鍊長度,可以減少 Task 數量,進而減少系統中的狀态資料總量,間接的達到優化 Checkpointing 的目的。下面展示了 Flink 算子鍊的合并規則:

  1. 上下遊的并行度一緻
  2. 下遊節點的入度為 1
  3. 上下遊節點都在同一個 Slot Group 中
  4. 下遊節點的 Chain 政策為 ALWAYS
  5. 上遊節點的 Chain 政策為 ALWAYS 或 HEAD
  6. 兩個節點間資料分區方式是 Forward
  7. 使用者沒有禁用 Chain

基于以上這些規則,我們在代碼層面上合并了相關度較大的一些 Task,使得平均的操作算子鍊長度至少縮短了 60%~70%。

Flink 背壓産生過程分析及解決方案

背壓産生過程分析

在 Flink 運作過程中,每一個操作算子都會消費一個中間 / 過渡狀态的流,并對它們進行轉換,然後生産一個新的流。這種機制可以類比為:Flink 使用阻塞隊列作為有界的緩沖區。跟 Java 裡阻塞隊列一樣,一旦隊列達到容量上限,處理速度較慢的消費者會阻塞生産者向隊列發送新的消息或事件。下圖展示了 Flink 中兩個操作算子之間的資料傳輸以及如何感覺到背壓的:

日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐

首先,Source 中的事件進入 Flink 并被操作算子 1 處理且被序列化到 Buffer 中,然後操作算子 2 從這個 Buffer 中讀出該事件。當操作算子 2 處理能力不足的時候,操作算子 1 中的資料便無法放入 Buffer,進而形成背壓。背壓出現的原因可能有以下兩點:

  1. 下遊算子處理能力不足;
  2. 資料發生了傾斜。

背壓解決方案

實踐中我們通過以下方式解決背壓問題。首先,縮短算子鍊會合理的合并算子,節省出資源。其次縮短算子鍊也會減少 Task(線程)之間的切換、消息的序列化 / 反序列化以及資料在緩沖區的交換次數,進而提高系統的整體吞吐量。最後,根據資料特性将不需要或者暫不需要的資料進行過濾,然後根據業務需求将資料分别處理,比如有些資料源需要實時的處理,有些資料是可以延遲的,最後通過使用 keyBy 關鍵字,控制 Flink 時間視窗大小,在上遊算子處理邏輯中盡量合并更多資料來達到降低下遊算子的處理壓力。

優化結果

經過以上優化,在每天億級資料量下,使用者畫像可以做到實時資訊實時處理并無持續背壓,Checkpointing 平均時長穩定在 1 秒以内。

五、未來工作的思考和展望

端到端的實時流處理

目前使用者畫像部分資料都是從 Hive 資料倉庫拿到的,資料倉庫本身是 T+1 模式,資料延時性較大,是以為了提高資料實時性,端到端的實時流處理很有必要。

端到端是指一端采集原始資料,另一端以報表 / 标簽 / 接口的方式對這些對數進行呈現與應用,連接配接兩端的是中間實時流。在後續的工作中,我們計劃将現有的非實時資料源全部切換到實時資料源,統一經過 Kafka 和 Flink 處理後再導入到 Phoenix/JanusGraph/HBase。強制所有資料源資料進入 Kafka 的一個好處在于它能夠提高整體流程的穩定性與可用性:首先 Kafka 作為下遊系統的緩沖,可以避免下遊系統的異常影響實時流的計算,起到“削峰填谷”的作用;其次,Flink 自 1.4 版本開始正式支援與 Kafka 的端到端精确一次處理語義,在一緻性方面上更有保證。

日處理資料量超10億:友信金服基于Flink建構實時使用者畫像系統的實踐

作者介紹:

楊毅:友信金服計算平台部 JAVA 工程師

穆超峰:友信金服計算平台部資料開發進階工程師

賀小兵:友信金服計算平台部資料開發工程師

胡夕:友信金服計算平台部技術總監