天天看點

快手基于 Apache Flink 的優化實踐

本次由快手劉建剛老師分享,内容主要分為三部分。首先介紹流式計算的基本概念, 然後介紹 Flink 的關鍵技術,最後講講 Flink 在快手生産實踐中的一些應用,包括實時名額計算和快速 failover。

一、流式計算的介紹

流式計算主要針對 unbounded data(無界資料流)進行實時的計算,将計算結果快速的輸出或者修正。

這部分将分為三個小節來介紹。第一,介紹大資料系統發展史,包括初始的批處理到現在比較成熟的流計算;第二,為大家簡單對比下批處理和流處理的差別;第三,介紹流式計算裡面的關鍵問題,這是每個優秀的流式計算引擎所必須面臨的問題。

1、大資料系統發展史

快手基于 Apache Flink 的優化實踐

上圖是 2003 年到 2018 年大資料系統的發展史,看看是怎麼一步步走到流式計算的。

2003 年,Google 的 MapReduce 橫空出世,通過經典的 Map&Reduce 定義和系統容錯等保障來友善處理各種大資料。很快就到了 Hadoop,被認為是開源版的  MapReduce, 帶動了整個apache開源社群的繁榮。再往後是谷歌的 Flume,通過算子連接配接等 pipeline 的方式解決了多個 MapReduce 作業連接配接處理低效的問題。

流式系統的開始以 Storm 來介紹。Storm 在2011年出現, 具備延時短、性能高等特性, 在當時頗受喜愛。但是 Storm 沒有提供系統級别的 failover 機制,無法保障資料一緻性。那時的流式計算引擎是不精确的,lamda 架構組裝了流處理的實時性和批處理的準确性,曾經風靡一時,後來因為難以維護也逐漸沒落。

接下來出現的是 Spark Streaming,可以說是第一個生産級别的流式計算引擎。Spark Streaming 早期的實作基于成熟的批處理,通過 mini batch 來實作流計算,在 failover 時能夠保障資料的一緻性。

Google 在流式計算方面有很多探索,包括 MillWheel、Cloud Dataflow、Beam,提出了很多流式計算的理念,對其他的流式計算引擎影響很大。

再來看 Kafka。Kafka 并非流式計算引擎,但是對流式計算影響特别大。Kafka 基于log 機制、通過 partition 來儲存實時資料,同時也能存儲很長時間的曆史資料。流式計算引擎可以無縫地與kafka進行對接,一旦出現 Failover,可以利用 Kafka 進行資料回溯,保證資料不丢失。另外,Kafka 對 table 和 stream 的探索特别多,對流式計算影響巨大。

Flink 的出現也比較久,一直到 2016 年左右才火起來的。Flink 借鑒了很多 Google 的流式計算概念,使得它在市場上特别具有競争力。後面我會詳細介紹 Flink 的一些特點。

2、批處理與流計算的差別

批處理和流計算有什麼樣的差別,這是很多同學有疑問的地方。我們知道 MapReduce 是一個批處理引擎,Flink 是一個流處理引擎。我們從四個方面來進行一下對比:

1)使用場景

MapReduce 是大批量檔案處理,這些檔案都是 bounded data,也就是說你知道這個檔案什麼時候會結束。相比而言,Flink 處理的是實時的 unbounded data,資料源源不斷,可能永遠都不會結束,這就給資料完備性和 failover 帶來了很大的挑戰。

快手基于 Apache Flink 的優化實踐

2)容錯

MapReduce 的容錯手段包括資料落盤、重複讀取、最終結果可見等。檔案落盤可以有效儲存中間結果,一旦 task 挂掉重新開機就可以直接讀取磁盤資料,隻有作業成功運作完了,最終結果才對使用者可見。這種設計的哲理就是你可以通過重複讀取同一份資料來産生同樣的結果,可以很好的處理 failover。

Flink 的容錯主要通過定期快照和資料回溯。每隔一段時間,Flink就會插入一些 barrier,barrier 從 source 流動到 sink,通過 barrier 流動來控制快照的生成。快照制作完就可以儲存在共享引擎裡。一旦作業出現問題,就可以從上次快照進行恢複,通過資料回溯來重新消費。

3)性能

MapReduce 主要特點是高吞吐、高延時。高吞吐說明處理的資料量非常大;高延時就是前面說到的容錯問題,它必須把整個作業處理完才對使用者可見。

Flink 主要特點是高吞吐、低延時。在流式系統裡,Flink 的吞吐是很高的。同時,它也可以做到實時處理和輸出,讓使用者快速看到結果。

4)計算過程

MapReduce 主要通過 Map 和 reduce 來計算。Map 負責讀取資料并作基本的處理, reduce 負責資料的聚合。使用者可以根據這兩種基本算子,組合出各種各樣的計算邏輯。

Flink 為使用者提供了 pipeline 的 API 和批流統一的 SQL。通過 pipeline  的 API, 使用者可以友善地組合各種算子建構複雜的應用;Flink SQL 是一個更高層的 API 抽象,極大地降低了使用者的使用門檻。 

3、流式計算的關鍵問題

這部分主要通過四個問題給大家解答流式計算的關鍵問題,也是很多計算引擎需要考慮的問題。

1)What

What 是指通過什麼樣的算子來進行計算。主要包含三個方面的類型,element-wise 表示一對一的計算,aggregating 表示聚合操作,composite 表示多對多的計算。

2)Where

aggregating 會進行一些聚合的計算, 主要是在各種 window 裡進行計算。視窗包含滑動視窗、滾動視窗、會話視窗。視窗會把無界的資料切分成有界的一個個資料塊進行處理,後面我們會詳細介紹這點。

3)When

When 就是什麼時候觸發計算。視窗裡面有資料,由于輸入資料是無窮無盡的,很難知道一個視窗的資料是否全部到達了。流式計算主要通過 watermark 來保障資料的完備性,通過 trigger 來決定何時觸發。當接收到數值為 X 的 Watermark 時,可以認為所有時間戳小于等于X的事件全部到達了。一旦 watermark 跨過視窗結束時間,就可以通過 trigger 來觸發計算并輸出結果。

4)How

How 主要指我們如何重新定義同一視窗的多次觸發結果。前面也說了 trigger 是用來觸發視窗的, 一個視窗可能會被觸發多次,比如1分鐘的視窗每 10 秒觸發計算一次。處理方式主要包含三種:

  • Discarding,丢棄之前的狀态重新計算。這種方式每次的觸發結果都是互不關聯的,多次觸發結果的組合反映了全部的視窗内容,下遊一般會再次聚合;
  • Accumulating,這個就是一個聚合的狀态,比如說第二次觸發的時候是在第一次的結果上進行計算的,下遊隻需要儲存最新的結果即可;
  • Accumulating 和 retracting,這個主要在 Accumulating 的基礎上加了一個 retracting,retracting 的意思就是撤銷。視窗再次觸發時,會告訴下遊撤銷上一次的計算結果,并告知最新的結果。Flink SQL 的聚合就使用了這種 retract的模式。

二、Flink 關鍵技術

1、Flink 簡介

Flink 是一款分布式計算引擎, 既可以進行流式計算,也可以進行批處理。下圖是官網對 Flink 的介紹:

快手基于 Apache Flink 的優化實踐

Flink 可以運作在 k8s、yarn、mesos 等資源排程平台上,依賴 hdfs 等檔案系統,輸入包含事件和各種其他資料,經過 Flink 引擎計算後再輸出到其他中間件或者資料庫等。

Flink 有兩個核心概念:

  • State:Flink 可以處理有狀态的資料,通過自身的 state 機制來保障作業failover時資料不丢失;
  • Event Time:允許使用者按照事件時間來處理資料,通過 watermark 來推動時間前進,這個後面還會詳細介紹。主要是系統的時間和事件的時間。

Flink 主要通過上面兩個核心技術來保證 exactly-once, 比如說作業 Failover 的時候狀态不丢失,就好像沒發生故障一樣。

2、快照機制

Flink 的快照機制主要是為了保障作業 failover 時不丢失狀态。Flink 提供了一種輕量級的快照機制,不需要停止作業就可以幫助使用者持久化記憶體中的狀态資料。 

快手基于 Apache Flink 的優化實踐

上圖中的 markers(與 barrier 語義相同)通過流動來觸發快照的制作,每一個編号都代表了一次快照,比如編号為 n 的 markers 從最上遊流動到最下遊就代表了一次快照的制作過程。簡述如下:

  • 系統發送編号為 n 的 markers 到最上遊的算子,markers 随着資料往下遊流動;
  • 當下遊算子收到 marker 後,就開始将自身的狀态儲存到共享存儲中;
  • 當所有最下遊的算子接收到 marker 并完成算子快照後,本次作業的快照制作完成。

一旦作業失敗,重新開機時就可以從快照恢複。

下面為一個簡單的 demo 說明(barrier 等同于 marker)。

快手基于 Apache Flink 的優化實踐
  • barrier 到達 Source,将狀态 offset=7 存儲到共享存儲;
  • barrier 到達 Task,将狀态 sum=21 存儲到共享存儲;
  • barrier 到達 Sink,commit 本次快照,标志着快照的成功制作。
快手基于 Apache Flink 的優化實踐

這時候突然間作業也挂掉, 重新開機時 Flink 會通過快照恢複各個狀态。Source 會将自身的 offset 置為 7,Task 會将自身的 sum 置為 21。現在我們可以認為 1、2、3、4、5、6 這 6 個數字的加和結果并沒有丢失。這個時候,offset 從 7 開始消費,跟作業失敗前完全對接了起來,確定了 exactly-once。 

3、事件時間

時間類型分為兩種:

  • Event time(事件時間),指事件發生的時間,比如采集資料時的時間;
  • Processing time(系統時間),指系統的時間,比如處理資料時的時間。

如果你對資料的準确性要求比較高的話,采用 Event time 能保障 exactly-once。Processing Time 一般用于實時消費、精準性要求略低的場景,主要是因為時間生成不是 deterministic。

我們可以看下面的關系圖, X 軸是 Event time,Y 軸是 Processing time。理想情況下 Event time 和 Processing time 是相同的,就是說隻要有一個事件發生,就可以立刻處理。但是實際場景中,事件發生後往往會經過一定延時才會被處理,這樣就會導緻我們系統的時間往往會滞後于事件時間。這裡它們兩個的差 Processing-time lag 表示我們處理事件的延時。

快手基于 Apache Flink 的優化實踐

事件時間常用在視窗中,使用 watermark 來確定資料完備性,比如說 watermarker 值大于 window 末尾時間時,我們就可以認為 window 視窗所有資料都已經到達了,就可以觸發計算了。

快手基于 Apache Flink 的優化實踐

比如上面 [0-10] 的視窗,現在 watermark 走到了 10,已經到達了視窗的結束,觸發計算 SUM=21。如果要是想對遲到的資料再進行觸發,可以再定義一下後面 late data 的觸發,比如說後面來了個 9,我們的 SUM 就等于 30。

4、視窗機制

視窗機制就是把無界的資料分成資料塊來進行計算,主要有三種視窗。

  • 滾動視窗:固定大小的視窗,相鄰視窗沒有交集;
  • 滑動視窗:每個視窗的大小是一樣的,但是兩個視窗之間會有重合;
  • 會話視窗:根據活躍時間聚合而成的視窗, 比如活躍時間超過3分鐘新起一個視窗。視窗之間留有一定的間隔。
快手基于 Apache Flink 的優化實踐

視窗會自動管理狀态和觸發計算,Flink 提供了豐富的視窗函數來進行計算。主要包括以下兩種:

  • ProcessWindowFunction,全量計算會把所有資料緩存到狀态裡,一直到視窗結束時統一計算。相對來說,狀态會比較大,計算效率也會低一些;
  • AggregateFunction,增量計算就是來一條資料就算一條,可能我們的狀态就會特别的小,計算效率也會比 ProcessWindowFunction 高很多,但是如果狀态存儲在磁盤頻繁通路狀态可能會影響性能。
快手基于 Apache Flink 的優化實踐

三、快手 Flink 實踐

1、應用概括

快手應用概括主要是分為資料接入、Flink 實時計算、資料應用、資料展示四個部分。各層各司其職、銜接流暢,為使用者提供一體化的資料服務流程。

快手基于 Apache Flink 的優化實踐

2、實時名額計算

常見的實時名額計算包括 uv、pv 和 sum。這其中 uv 的計算最為複雜也最為經典。下面我将重點介紹 uv。

uv 指的是不同使用者的個數,我們這邊計算的就是不同 deviceld 的個數,主要的挑戰來自三方面:

  • 使用者數多,資料量大。活動期間的 QPS 經常在千萬級别,實際計算起來特别複雜;
  • 實時性要求高,通常為幾秒到分鐘結果的輸出;
  • 穩定性要求高,比如說我們在做春晚活動時候要求故障時間需要低于2%或更少。

針對各種各樣的 uv 計算,我們提供了一套成熟的計算流程。主要包含了三方面:

  • 字典方案:将 string 類型的 deviceld 轉成 long 類型,友善後續的 uv 計算;
  • 傾斜處理:比如某些大 V 會導緻資料嚴重傾斜,這時候就需要打散處理;
  • 增量計算:比如計算 1 天的 uv,每分鐘輸出一次結果。

字典方案需要確定任何兩個不同的 deviceId 不能映射到相同的 long 類型數字上。快手内部主要使用過以下三種方案:

快手基于 Apache Flink 的優化實踐
  • HBase, 基于 partition 分區建立 deviceld 到 id 的映射, 通過緩存和批量通路來加速;
  • Redis, 這種方案嚴格來說不屬于字典,主要通過 key-value 來判斷資料是否首次出現,基于首次資料來計算 uv,這樣就會把 pv 和 uv 的計算進行統一;
  • 最後就是一個 Flink 内部自建的全局字典實作 deviceld 到 id 的轉換,之後計算UV。

這三種方案裡面,前兩種屬于外部存儲的字典方案,優點是可以做到多個作業共享 1 份資料, 缺點是外部通路慢而且不太穩定。最後一種 Flink 字典方案基于 state,不依賴外部存儲, 性能高但是無法多作業共享。

接下來我們重點介紹基于Flink自身的字典方案,下圖主要是建立一個 deviceld 到 id 的映射:

快手基于 Apache Flink 的優化實踐

主要分成三步走:

1)建立 Partition 分區, 指定一個比較大的 Partition 分區個數,該個數比較大并且不會變,根據 deviceld 的哈希值将其映射到指定 partition。

2)建立 id 映射。每個 Partition 都有自己負責的 id 區間,確定 Partition 之間的long 類型的 id 不重複, partition 内部通過自增 id 來確定每個 deviceId 對應一個 id。

3)使用 keyed state 儲存 id 映射。這樣我們的作業出現并發的大改變時,可以友善的 rescale,不需要做其他的操作。

除了 id 轉換,後面就是一個實時名額計算的常見問題,就是資料傾斜。業界常見的解決資料傾斜處理方案主要是兩種:

  • 打散再聚合:先将傾斜的資料打散計算,然後再聚合計算結果;
  • Local-aggregate:先在本地計算預聚合,這樣會大大減少下遊的資料壓力。

二者的本質是一樣的,都是先預聚合再彙總,進而避免單點性能問題。

快手基于 Apache Flink 的優化實踐

上圖為計算最小值的熱點問題,紅色資料為熱點資料。如果直接将它們打到同一個分區,會出現性能問題。為了解決傾斜問題,我們通過hash政策将資料分成小的 partition 來計算,如上圖的預計算,最後再将中間結果彙總計算。

當一切就緒後,我們來做增量的 UV 計算,比如計算 1 天 uv,每分鐘輸出 1 次結果。計算方式既可以采用 API,也可以采用 SQL。

針對 API,我們選擇了 global state+bitmap 的組合,既嚴格遵循了 Event Time 又減少了 state 大小:

快手基于 Apache Flink 的優化實踐

下面為計算流程(需要注意時區問題):

  • 定義跟觸發間隔一樣大小的 window(比如 1 分鐘);
  • Global state 用來儲存跨視窗的狀态,我們采用 bitmap 來存儲狀态;
  • 每隔一個 window 觸發一次,輸出起始至今的 UV;
  • 目前作用域(比如 1 天)結束,清空狀态重新開始。

針對 SQL,增量計算支援的還不是那麼完善,但是可以利用 early-fire 的參數來提前觸發視窗。

配置如下:

table.exec.emit.early-fire.enabled:
truetable.exec.emit.early-fire.delay:60 s      

early-fire.delay 就是每分鐘輸出一次結果的意思。

SQL 如下:

SELECT TUMBLE_ROWTIME(eventTime, interval ‘1’ day) AS rowtime, dimension, count(distinct id) as uv
FROM person
GROUP BY TUMBLE(eventTime, interval '1' day), dimension      

如果遇到傾斜,可以參考上一步來處理。

3、快速 failover

最後看下我們部門最近發力的一個方向,如何快速 failover。

Flink 作業都是 long-running 的線上作業,很多對可用性的要求特别高,尤其是跟公司核心業務相關的作業,SLA 要求 4 個 9 甚至更高。當作業遇到故障時,如何快速恢複對我們來說是一個巨大的挑戰。

下面分三個方面來展開:

  • Flink 目前已有的快速恢複方案;
  • 基于 container 宕掉的快速恢複;
  • 基于機器宕掉的快速恢複。

1)Flink 目前已有的快速恢複方案

Flink 目前已有的快速恢複方案主要包括以下兩種:

  • region failover。如果流式作業的 DAG 包含多個子圖或者 pipeline,那麼 task 失敗時隻會影響其所屬的子圖或者 pipeline ,而不用整個 DAG 都重新啟動;
  • local recovery。在 Flink 将快照同步到共享存儲的同時,在本地磁盤也儲存一份快照。作業失敗恢複時,可以排程到上次部署的位置,并從 local disk 進行快照恢複。

2)基于 container 宕掉的快速恢複

實際環境中, container 宕掉再申請有時會長達幾十秒,比如因為 hdfs 慢、yarn 慢等原因,嚴重影響恢複速度。為此,我們做了如下優化:

  • 備援資源。維持固定個數的備援 container,一旦 container 宕掉,備援 container 立刻候補上來,省去了繁雜的資源申請流程;
  • 提前申請。一旦發現作業因為 container 宕掉而失敗,立刻申請新的 container 。
快手基于 Apache Flink 的優化實踐

以上優化覆寫了很大一部分場景,恢複時間從 30s-60s 降到 20s 以内。

3)基于機器宕掉的快速恢複

機器宕掉時,flink on yarn 的恢複時間超過 3 分鐘,這對重要作業顯然是無法容忍的!為了做到快速恢複,我們需要做到快速感覺和恢複:

  • 備援資源并打散配置設定,確定兩個備援資源不在一個 container,redundantContainerNum=max(containerNumOfHost) + 1;
  • 作業當機,Hawk 監測系統 5 秒内發現;
  • 備援資源快速候補,免去申請資源的流程。
快手基于 Apache Flink 的優化實踐

通過這種方案,我們可以容忍任意一台機器的當機,并将當機恢複時間由原先的 3 分鐘降低到 30 秒以内。

四、總結

本文從大資料系統的發展入手,進而延伸出流式系統的關鍵概念,之後介紹了 Flink的關鍵特性,最後講解了快手内部的實時名額計算和快速 failover,希望對大家有所幫助。

五、Q&A

Q1:打算做實時計算,可以跳過 Storm、Spark 直接上手 Flink 嗎?

A:可以直接使用 Flink。Storm 在 failover 時會丢失資料,無法做到 exactly-once;spark streaming 是 Flink 的競争者,是在批處理的基礎上實作流計算,相比而言,Flink 的底層是流處理,更加适合流計算。

Q2:一般怎麼處理 taskmanager heartbeat timeout?

A:預設 10 秒彙報一次心跳,心跳逾時為 50 秒,這個時候作業會失敗,如果配置了高可用那麼會重新開機。

Q3:如何保證 2 天大時間跨度延遲消息的視窗計算?

A:這裡主要的挑戰在于時間長、狀态大,建議 stateBakend 使用 Rocksdb(可以利用磁盤存儲大狀态),視窗計算建議使用增量計算來減少狀态的大小。

Q4:Flink on Yarn,Yarn 重新開機會自動拉起 Flink 任務嗎,說不能拉起怎麼處理,手動啟動嗎?

A:如果配置了高可用(依賴 zookeeper),作業失敗了就可以自動拉起。

Q5:Kafka 目前多用作資料中轉平台,Flink 相當于替代了 Kafka Stream 嗎?

  A:Kafka的核心功能是消息中間件,kafka stream 可以跟 kafka 很好的內建,但并不是一個專業的計算引擎。相比而言,flink 是一個分布式的流式計算引擎,功能上更加強大。

Q6:你們怎麼看待 Apache Beam?

A:Apache Beam 在上層進行了抽象,可以類比 SQL,隻定義規範,底層可以接入各種計算引擎。    

快手基于 Apache Flink 的優化實踐