本文整理自快手資料技術專家李天朔在 5 月 22 日北京站 Flink Meetup 分享的議題《快手基于 Flink 建構實時數倉場景化實踐》,内容包括:
- 快手實時計算場景
- 快手實時數倉架構及保障措施
- 快手場景問題及解決方案
- 未來規劃
GitHub 位址
https://github.com/apache/flink歡迎大家給 Flink 點贊送 star~
一、快手實時計算場景

快手業務中的實時計算場景主要分為四塊:
- 公司級别的核心資料:包括公司經營大盤,實時核心日報,以及移動版資料。相當于團隊會有公司的大盤名額,以及各個業務線,比如視訊相關、直播相關,都會有一個核心的實時看闆;
- 大型活動實時名額:其中最核心的内容是實時大屏。例如快手的春晚活動,我們會有一個總體的大屏去看總體活動現狀。一個大型的活動會分為 N 個不同的子產品,我們對每一個子產品不同的玩法會有不同的實時資料看闆;
-
營運部分的資料:營運資料主要包括兩方面,一個是創作者,另一個是内容。對于創作者和内容,在營運側,比如上線一個大 V 的活動,我們想看到一些資訊如直播間的實時現狀,以及直播間對于大盤的牽引情況。基于這個場景,我們會做各種實時大屏的多元資料,以及大盤的一些資料。
此外,這塊還包括營運政策的支撐,比如我們可能會實時發掘一些熱點内容和熱點創作者,以及目前的一些熱點情況。我們基于這些熱點情況輸出政策,這個也是我們需要提供的一些支撐能力;
最後還包括 C 端資料展示,比如現在快手裡有創作者中心和主播中心,這裡會有一些如主播關播的關播頁,關播頁的實時資料有一部分也是我們做的。
- 實時特征:包含搜尋推薦特征和廣告實時特征。
二、快手實時數倉架構及保障措施
1. 目标及難點
1.1 目标
- 首先由于我們是做數倉的,是以希望所有的實時名額都有離線名額去對應,要求實時名額和離線名額整體的資料差異在 1% 以内,這是最低标準。
- 其次是資料延遲,其 SLA 标準是活動期間所有核心報表場景的資料延遲不能超過 5 分鐘,這 5 分鐘包括作業挂掉之後和恢複時間,如果超過則意味着 SLA 不達标。
- 最後是穩定性,針對一些場景,比如作業重新開機後,我們的曲線是正常的,不會因為作業重新開機導緻名額産出一些明顯的異常。
1.2 難點
- 第一個難點是資料量大。每天整體的入口流量資料量級大概在萬億級。在活動如春晚的場景,QPS 峰值能達到億 / 秒。
- 第二個難點是元件依賴比較複雜。可能這條鍊路裡有的依賴于 Kafka,有的依賴 Flink,還有一些依賴 KV 存儲、RPC 接口、OLAP 引擎等,我們需要思考在這條鍊路裡如何分布,才能讓這些元件都能正常工作。
- 第三個難點是鍊路複雜。目前我們有 200+ 核心業務作業,50+ 核心資料源,整體作業超過 1000。
2. 實時數倉 - 分層模型
基于上面三個難點,來看一下數倉架構:
如上所示:
- 最下層有三個不同的資料源,分别是用戶端日志、服務端日志以及 Binlog 日志;
- 在公共基礎層分為兩個不同的層次,一個是 DWD 層,做明細資料,另一個是 DWS 層,做公共聚合資料,DIM 是我們常說的次元。我們有一個基于離線數倉的主題預分層,這個主題預分層可能包括流量、使用者、裝置、視訊的生産消費、風控、社交等。
- DWD 層的核心工作是标準化的清洗;
- DWS 層是把次元的資料和 DWD 層進行關聯,關聯之後生成一些通用粒度的聚合層次。
- 再往上是應用層,包括一些大盤的資料,多元分析的模型以及業務專題資料;
- 最上面是場景。
整體過程可以分為三步:
- 第一步是做業務資料化,相當于把業務的資料接進來;
- 第二步是資料資産化,意思是對資料做很多的清洗,然後形成一些規則有序的資料;
- 第三步是資料業務化,可以了解資料在實時資料層面可以反哺業務,為業務資料價值建設提供一些賦能。
3. 實時數倉 - 保障措施
基于上面的分層模型,來看一下整體的保障措施:
保障層面分為三個不同的部分,分别是品質保障,時效保障以及穩定保障。
- 我們先看藍色部分的品質保障。針對品質保障,可以看到在資料源階段,做了如資料源的亂序監控,這是我們基于自己的 SDK 的采集做的,以及資料源和離線的一緻性校準。研發階段的計算過程有三個階段,分别是研發階段、上線階段和服務階段。
- 研發階段可能會提供一個标準化的模型,基于這個模型會有一些 Benchmark,并且做離線的比對驗證,保證品質是一緻的;
- 上線階段更多的是服務監控和名額監控;
- 在服務階段,如果出現一些異常情況,先做 Flink 狀态拉起,如果出現了一些不符合預期的場景,我們會做離線的整體資料修複。
- 第二個是時效性保障。針對資料源,我們把資料源的延遲情況也納入監控。在研發階段其實還有兩個事情:
- 首先是壓測,正常的任務會拿最近 7 天或者最近 14 天的峰值流量去看它是否存在任務延遲的情況;
- 通過壓測之後,會有一些任務上線和重新開機性能評估,相當于按照 CP 恢複之後,重新開機的性能是什麼樣子。
- 最後一個是穩定保障,這在大型活動中會做得比較多,比如切換演練和分級保障。我們會基于之前的壓測結果做限流,目的是保障作業在超過極限的情況下,仍然是穩定的,不會出現很多的不穩定或者 CP 失敗的情況。之後我們會有兩種不同的标準,一種是冷備雙機房,另外一種是熱備雙機房。
- 冷備雙機房是:當一個單機房挂掉,我們會從另一個機房去拉起;
- 熱備雙機房:相當于同樣一份邏輯在兩個機房各部署一次。
以上就是我們整體的保障措施。
三、快手場景問題及解決方案
1. PV/UV 标準化
1.1 場景
第一個問題是 PV/UV 标準化,這裡有三個截圖:
第一張圖是春晚活動的預熱場景,相當于是一種玩法,第二和第三張圖是春晚當天的發紅包活動和直播間截圖。
在活動進行過程中,我們發現 60~70% 的需求是計算頁面裡的資訊,如:
- 這個頁面來了多少人,或者有多少人點選進入這個頁面;
- 活動一共來了多少人;
- 頁面裡的某一個挂件,獲得了多少點選、産生了多少曝光。
1.2 方案
抽象一下這個場景就是下面這種 SQL:
簡單來說,就是從一張表做篩選條件,然後按照次元層面做聚合,接着産生一些 Count 或者 Sum 操作。
基于這種場景,我們最開始的解決方案如上圖右邊所示。
我們用到了 Flink SQL 的 Early Fire 機制,從 Source 資料源取資料,之後做了 DID 的分桶。比如最開始紫色的部分按這個做分桶,先做分桶的原因是防止某一個 DID 存在熱點的問題。分桶之後會有一個叫做 Local Window Agg 的東西,相當于資料分完桶之後把相同類型的資料相加。Local Window Agg 之後再按照次元進行 Global Window Agg 的合桶,合桶的概念相當于按照次元計算出最終的結果。Early Fire 機制相當于在 Local Window Agg 開一個天級的視窗,然後每分鐘去對外輸出一次。
這個過程中我們遇到了一些問題,如上圖左下角所示。
在代碼正常運作的情況下是沒有問題的,但如果整體資料存在延遲或者追溯曆史資料的情況,比如一分鐘 Early Fire 一次,因為追溯曆史的時候資料量會比較大,是以可能導緻 14:00 追溯曆史,直接讀到了 14:02 的資料,而 14:01 的那個點就被丢掉了,丢掉了以後會發生什麼?
在這種場景下,圖中上方的曲線為 Early Fire 回溯曆史資料的結果。橫坐标是分鐘,縱坐标是截止到目前時刻的頁面 UV,我們發現有些點是橫着的,意味着沒有資料結果,然後一個陡增,然後又橫着的,接着又一個陡增,而這個曲線的預期結果其實是圖中下方那種平滑的曲線。
為了解決這個問題,我們用到了 Cumulate Window 的解決方案,這個解決方案在 Flink 1.13 版本裡也有涉及,其原理是一樣的。
資料開一個大的天級視窗,大視窗下又開了一個小的分鐘級視窗,資料按資料本身的 Row Time 落到分鐘級視窗。
- Watermark 推進過了視窗的 event_time,它會進行一次下發的觸發,通過這種方式可以解決回溯的問題,資料本身落在真實的視窗, Watermark 推進,在視窗結束後觸發。
- 此外,這種方式在一定程度上能夠解決亂序的問題。比如它的亂序資料本身是一個不丢棄的狀态,會記錄到最新的累計資料。
- 最後是語義一緻性,它會基于事件時間,在亂序不嚴重的情況下,和離線計算出來的結果一緻性是相當高的。
以上是 PV/UV 一個标準化的解決方案。
2. DAU 計算
2.1 背景介紹
下面介紹一下 DAU 計算:
我們對于整個大盤的活躍裝置、新增裝置和回流裝置有比較多的監控。
- 活躍裝置指的是當天來過的裝置;
- 新增裝置指的是當天來過且曆史沒有來過的裝置;
- 回流裝置指的是當天來過且 N 天内沒有來過的裝置。
但是我們計算過程之中可能需要 5~8 個這樣不同的 Topic 去計算這幾個名額。
我們看一下離線過程中,邏輯應該怎麼算。
首先我們先算活躍裝置,把這些合并到一起,然後做一個次元下的天級别去重,接着再去關聯次元表,這個次元表包括裝置的首末次時間,就是截止到昨天裝置首次通路和末次通路的時間。
得到這個資訊之後,我們就可以進行邏輯計算,然後我們會發現新增和回流的裝置其實是活躍裝置裡打的一個子标簽。新增裝置就是做了一個邏輯處理,回流裝置是做了 30 天的邏輯處理,基于這樣的解決方案,我們能否簡單地寫一個 SQL 去解決這個問題?
其實我們最開始是這麼做的,但遇到了一些問題:
- 第一個問題是:資料源是 6~8 個,而且我們大盤的口徑經常會做微調,如果是單作業的話,每次微調的過程之中都要改,單作業的穩定性會非常差;
- 第二個問題是:資料量是萬億級,這會導緻兩個情況,首先是這個量級的單作業穩定性非常差,其次是實時關聯維表的時候用的 KV 存儲,任何一個這樣的 RPC 服務接口,都不可能在萬億級資料量的場景下保證服務穩定性;
- 第三個問題是:我們對于時延要求比較高,要求時延小于一分鐘。整個鍊路要避免批處理,如果出現了一些任務性能的單點問題,我們還要保證高性能和可擴容。
2.2 技術方案
針對以上問題,介紹一下我們是怎麼做的:
如上圖的例子,第一步是對 A B C 這三個資料源,先按照次元和 DID 做分鐘級别去重,分别去重之後得到三個分鐘級别去重的資料源,接着把它們 Union 到一起,然後再進行同樣的邏輯操作。
這相當于我們資料源的入口從萬億變到了百億的級别,分鐘級别去重之後再進行一個天級别的去重,産生的資料源就可以從百億變成了幾十億的級别。
在幾十億級别資料量的情況下,我們再去關聯資料服務化,這就是一種比較可行的狀态,相當于去關聯使用者畫像的 RPC 接口,得到 RPC 接口之後,最終寫入到了目标 Topic。這個目标 Topic 會導入到 OLAP 引擎,供給多個不同的服務,包括移動版服務,大屏服務,名額看闆服務等。
這個方案有三個方面的優勢,分别是穩定性、時效性和準确性。
- 首先是穩定性。松耦合可以簡單了解為當資料源 A 的邏輯和資料源 B 的邏輯需要修改時,可以單獨修改。第二是任務可擴容,因為我們把所有邏輯拆分得非常細粒度,當一些地方出現了如流量問題,不會影響後面的部分,是以它擴容比較簡單,除此之外還有服務化後置和狀态可控。
- 其次是時效性,我們做到毫秒延遲,并且次元豐富,整體上有 20+ 的次元做多元聚合。
- 最後是準确性,我們支援資料驗證、實時監控、模型出口統一等。
此時我們遇到了另外一個問題 - 亂序。對于上方三個不同的作業,每一個作業重新開機至少會有兩分鐘左右的延遲,延遲會導緻下遊的資料源 Union 到一起就會有亂序。
2.3 延遲計算方案
遇到上面這種有亂序的情況下,我們要怎麼處理?
我們總共有三種處理方案:
-
第一種解決方案是用 “did + 次元 + 分鐘” 進行去重,Value 設為 “是否來過”。比如同一個 did,04:01 來了一條,它會進行結果輸出。同樣的,04:02 和 04:04 也會進行結果輸出。但如果 04:01 再來,它就會丢棄,但如果 04:00 來,依舊會進行結果輸出。
這個解決方案存在一些問題,因為我們按分鐘存,存 20 分鐘的狀态大小是存 10 分鐘的兩倍,到後面這個狀态大小有點不太可控,是以我們又換了解決方案 2。
-
第二種解決方案,我們的做法會涉及到一個假設前提,就是假設不存在資料源亂序的情況。在這種情況下,key 存的是 “did + 次元”,Value 為 “時間戳”,它的更新方式如上圖所示。
04:01 來了一條資料,進行結果輸出。04:02 來了一條資料,如果是同一個 did,那麼它會更新時間戳,然後仍然做結果輸出。04:04 也是同樣的邏輯,然後将時間戳更新到 04:04,如果後面來了一條 04:01 的資料,它發現時間戳已經更新到 04:04,它會丢棄這條資料。
這樣的做法大幅度減少了本身所需要的一些狀态,但是對亂序是零容忍,不允許發生任何亂序的情況,由于我們不好解決這個問題,是以我們又想出了解決方案 3。
-
方案 3 是在方案 2 時間戳的基礎之上,加了一個類似于環形緩沖區,在緩沖區之内允許亂序。
比如 04:01 來了一條資料,進行結果輸出;04:02 來了一條資料,它會把時間戳更新到 04:02,并且會記錄同一個裝置在 04:01 也來過。如果 04:04 再來了一條資料,就按照相應的時間差做一個位移,最後通過這樣的邏輯去保障它能夠容忍一定的亂序。
綜合來看這三個方案:
- 方案 1 在容忍 16 分鐘亂序的情況下,單作業的狀态大小在 480G 左右。這種情況雖然保證了準确性,但是作業的恢複和穩定性是完全不可控的狀态,是以我們還是放棄了這個方案;
- 方案 2 是 30G 左右的狀态大小,對于亂序 0 容忍,但是資料不準确,由于我們對準确性的要求非常高,是以也放棄了這個方案;
- 方案 3 的狀态跟方案 1 相比,它的狀态雖然變化了但是增加的不多,而且整體能達到跟方案 1 同樣的效果。方案 3 容忍亂序的時間是 16 分鐘,我們正常更新一個作業的話,10 分鐘完全足夠重新開機,是以最終選擇了方案 3。
3. 營運場景
3.1 背景介紹
營運場景可分為四個部分:
- 第一個是資料大屏支援,包括單直播間的分析資料和大盤的分析資料,需要做到分鐘級延遲,更新要求比較高;
- 第二個是直播看闆支援,直播看闆的資料會有特定次元的分析,特定人群支援,對次元豐富性要求比較高;
- 第三個是資料政策榜單,這個榜單主要是預測熱門作品、爆款,要求的是小時級别的資料,更新要求比較低;
- 第四個是 C 端實時名額展示,查詢量比較大,但是查詢模式比較固定。
下面進行分析這 4 種不同的狀态産生的一些不同的場景。
前 3 種基本沒有什麼差别,隻是在查詢模式上,有的是特定業務場景,有的是通用業務場景。
針對第 3 種和第 4 種,它對于更新的要求比較低,對于吞吐的要求比較高,過程之中的曲線也不要求有一緻性。第 4 種查詢模式更多的是單實體的一些查詢,比如去查詢内容,會有哪些名額,而且對 QPS 要求比較高。
3.2 技術方案
針對上方 4 種不同的場景,我們是如何去做的?
-
首先看一下基礎明細層 (圖中左方),資料源有兩條鍊路,其中一條鍊路是消費的流,比如直播的消費資訊,還有觀看 / 點贊 / 評論。經過一輪基礎清洗,然後做次元管理。上遊的這些次元資訊來源于 Kafka,Kafka 寫入了一些内容的次元,放到了 KV 存儲裡邊,包括一些使用者的次元。
這些次元關聯了之後,最終寫入 Kafka 的 DWD 事實層,這裡為了做性能的提升,我們做了二級緩存的操作。
- 如圖中上方,我們讀取 DWD 層的資料然後做基礎彙總,核心是視窗次元聚合生成 4 種不同粒度的資料,分别是大盤多元彙總 topic、直播間多元彙總 topic、作者多元彙總 topic、使用者多元彙總 topic,這些都是通用次元的資料。
-
如圖中下方,基于這些通用次元資料,我們再去加工個性化次元的資料,也就是 ADS 層。拿到了這些資料之後會有次元擴充,包括内容擴充和營運次元的拓展,然後再去做聚合,比如會有電商實時 topic,機構服務實時 topic 和大 V 直播實時 topic。
分成這樣的兩個鍊路會有一個好處:一個地方處理的是通用次元,另一個地方處理的是個性化的次元。通用次元保障的要求會比較高一些,個性化次元則會做很多個性化的邏輯。如果這兩個耦合在一起的話,會發現任務經常出問題,并且分不清楚哪個任務的職責是什麼,建構不出這樣的一個穩定層。
- 如圖中右方,最終我們用到了三種不同的引擎。簡單來說就是 Redis 查詢用到了 C 端的場景,OLAP 查詢用到了大屏、業務看闆的場景。
四、未來規劃
上文一共講了三個場景,第一個場景是标準化 PU/UV 的計算,第二個場景是 DAU 整體的解決方案,第三個場景是營運側如何解決。基于這些内容,我們有一些未來規劃,分為 4 個部分。
- 第一部分是實時保障體系完善:
- 一方面做一些大型的活動,包括春晚活動以及後續常态化的活動。針對這些活動如何去保障,我們有一套規範去做平台化的建設;
- 第二個是分級保障标準制定,哪些作業是什麼樣的保障級别 / 标準,會有一個标準化的說明;
- 第三個是引擎平台能力推動解決,包括 Flink 任務的一些引擎,在這上面我們會有一個平台,基于這個平台去做規範、标準化的推動。
- 第二部分是實時數倉内容建構:
- 一方面是場景化方案的輸出,比如針對活動會有一些通用化的方案,而不是每次活動都開發一套新的解決方案;
- 另一方面是内容資料層次沉澱,比如現在的資料内容建設,在厚度方面有一些場景的缺失,包括内容如何更好地服務于上遊的場景。
- 第三部分是 Flink SQL 場景化建構,包括 SQL 持續推廣、SQL 任務穩定性和 SQL 任務資源使用率。我們在預估資源的過程中,會考慮比如在什麼樣 QPS 的場景下, SQL 用什麼樣的解決方案,能支撐到什麼情況。Flink SQL 可以大幅減少人效,但是在這個過程中,我們想讓業務操作更加簡單。
- 第四部分是批流一體探索。實時數倉的場景其實就是做離線 ETL 計算加速,我們會有很多小時級别的任務,針對這些任務,每次批處理的時候有一些邏輯可以放到流處理去解決,這對于離線數倉 SLA 體系的提升十分巨大。
更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群
第一時間擷取最新技術文章和社群動态,請關注公衆号~
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99 元試用
實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包 3 個月及以上還有 85 折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc