天天看點

Apache Flink運作時在B站的穩定性優化與實踐

作者:閃念基因

本期作者

馬陽陽

基礎架構部實時平台Flink引擎資深開發工程師

丁國濤

基礎架構部實時平台Flink引擎資深開發工程師

01 背景

以Flink為基礎的實時計算在B站有着廣泛而深入的應用。目前B站的Flink作業主要運作在三種叢集環境下:純實體機部署的YARN叢集、為了提高Kafka叢集資源使用率而和Kafka混部的YARN叢集以及為了提高線上伺服器而和K8S混部的YARN叢集(這部分有計劃在不遠的将來使用Flink On K8S部署方式代替)。其中純實體機YARN叢集使用純SSD盤的統一機型的伺服器,包含1000+台伺服器;和Kafka混部的叢集目前為Flink提供了2000+ cores;和線上的K8S混部的叢集已經使用了6000+ cores,并且還在持續增加。在業務方向上,B站的Flink已經應用在了包括AI、廣告、數倉、資料傳輸和其它的很多業務上。目前B站Flink作業的最大并行度為2000。下圖展示了B站實時應用的整體架構及Flink Runtime的工作範圍。

Apache Flink運作時在B站的穩定性優化與實踐

正是由于在B站Flink應用廣泛,作業數量衆多,很多作業的流量和并行度也很大,我們在使用Flink的過程中遇到了一些社群版本的Flink無法滿足的功能。我們遇到的主要痛點如下:

Apache Flink運作時在B站的穩定性優化與實踐

為了解決上述痛點,我們對Flink Runtime進行了很多的定制開發和改進。下面将從以下幾個方面介紹一下B站在Flink Runtime上所做的改進:

  • Checkpoint相關的改進
  • 可用性的提升
  • 其它優化

02 Checkpoint相關的改進

Checkpoint作為Flink容錯機制的基礎,對Flink作業有着重要的意義。在解決實際生産問題和與使用者交流的過程中,Checkpoint相關的問題也占着極大的比重。為了更好地滿足平台和使用者的需求,我們在以下幾個方面對Checkpoint做了大量的改進:可恢複性、Checkpoint優化以及相關工具的開發。

2.1 可恢複性

2.1.1 改進的Operator ID生成算法

在Flink中作業中經常出現的場景是随着流量或者計算複雜度變化,使用者或者平台需要改變作業的并行度以增加處理能力。這種變化可能導緻Kafka Source算子和下遊算子的連接配接關系發生變化(例如Kafka Topic的partition數量為50,并行度從50變為100,這種情況下Kafka Source和下遊的連接配接關系從forward變為rebalance(社群原生)或者rescale(B站改進))。社群原生的Operator ID生成算法中,計算一個算子的Operator ID時,會将其下遊可以chain在一起的算子也考慮進去。在剛剛我們提到的場景中,由于Kafka Source可能從可以和下遊算子chain在一起變為不能chain,進而導緻計算出的算子ID發生變化,進而導緻作業無法從原來的Checkpoint恢複。

為了解決上述問題,我們擴充了社群的算子Operator ID生成算法StreamGraphHasherV2,引入了StreamGraphHasherV3。在StreamGraphHasherV3中,在計算Operator ID時,不考慮算子和下遊算子的連接配接關系,可以生成穩定的算子Operator ID,很大地提升了因為作業并行度變化情況下的Checkpoint可恢複率。

2.1.2 調整最大并行度的計算方法

在Flink的狀态中,最大并行度是一個重要的概念。為了從狀态恢複時,不必拉取所有的狀态檔案,Flink使用了類似一緻性哈希的做法,将狀态的鍵值做哈希後劃分到一個固定的份數裡,每個算子的一個并行度負責其中的一個範圍。而這個需要将鍵值的哈希劃分的份數就是由最大并行度決定的。Flink中采用如下算法決定一個算子的最大并行度:

//如果使用者手動設定了最大并行度,則使用使用者設定的最大并行度
//否則按照如下算子計算最大并行度
Math.min( 
   Math.max( 
       MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)), 
       128), 
   32768);           

在我們的生産場景中,經常遇到因為最大并行度變化導緻無法從Checkpoint恢複的問題。為了解決該問題,我們的工作從兩個方面進行。第一個方面調整最大并行度的計算方法,在本部分介紹;第二個方面對存量的狀态,從狀态中讀取出原始的鍵值,根據新的最大并行度重新計算鍵值的key group,并寫入新的狀态中,作業再從新的狀态恢複,這部分工作将在下面介紹。

在新的最大并行度計算算法中,我們将最大并行度的最小值調整為1024,并依據并行度的10倍按照原來的算法計算最大并行度。

Math.min( 
   Math.max( 
       MathUtils.roundUpToPowerOfTwo((operatorParallelism + (operatorParallelism / 2)) * 10), 
       1024), 
   32768);           

2.1.3 基于State Processor API

的Key Group重算方法

上面提到的方法可以解決新增作業在大多數情況下從Checkpoint恢複的需求,但是無法對存量作業起作用。對于存量作業,我們的思路是從State檔案中反序列化出原始的鍵值,并根據反序列化出的鍵值重新計算所屬的key group,然後将結果寫入新的狀态檔案,生成新的Checkpoint,然後作業可以從新的Checkpoint恢複。

基于上述思路,我們調研了社群的相關的工具,發現State Processor API可以部分地滿足我們的需求。關于State Processor API的介紹及使用方式,可以參考社群相關文檔,下面主要介紹一下我們為了實作自己的需求對State Processor API所做的擴充工作。

分析之後,我們發現原生的State Processor API在如下幾個方面不能滿足的我們的需求:

  1. State Processor API提供的使用者接口是基于單個算子的,而我們需要的是對Checkpoint的所有的狀态進行讀取并計算。
  2. State Processor API中需要使用者手動構造StateDescriptor并傳入相關方法中。
  3. State Processor API會同時反序列化Key和點查對應的Value,性能上無法滿足需求。

針對上述問題,我們的解決方案如下:

  1. 從Checkpoint的_metadata檔案中讀取所有的Operator ID,對每個Operator ID依次調用State Processor API提供的接口進行單個算子的計算。
  2. 在Checkpoint的中繼資料中加入必要的資訊,在使用State Processor API前從Checkpoint的中繼資料中讀取必要的資訊生成調用State Processor API使用者接口需要的StateDescriptor。
  3. 修改State Processor API的實作方式,添加一種隻會反序列化key,而無需反序列化value(将value作為位元組數組看待)的方式。

基于上述修改,我們實作了資料的key group的重算,系統的整體架構如下圖所示。經過重算的狀态,算子的最大并行度可以滿足恢複的需求。這樣我們就解決了由于最大并行度變化導緻的Checkpoint無法恢複的問題。

Apache Flink運作時在B站的穩定性優化與實踐

2.1.4 使用Operator Name輔助

從Checkpoint恢複

由于上面提到的Operator ID生成算法會導緻Checkpoint不相容,無法全量應用在所有的作業中(新增作業可以應用,存量作業無法應用)。我們需要找到新的辦法來解決算子間連接配接關系變化導緻的Checkpoint不相容問題。基于B站Flink作業的特點,SQL作業占了多數,且SQL作業中大多數情況下都不存在同名的Operator Name,我們設計了一種新的方法來輔助從Checkpoint恢複,即在Operator ID沖突導緻無法從Checkpoint恢複時,試着将Operator Name當做橋梁,将Checkpoint中的Operator ID和作業DAG中的Operator ID關聯起來,進而實作将Checkpoint中的狀态Assign給DAG中的算子。

2.2 Checkpoint優化

2.2.1 Full Checkpoint

Flink社群提供了兩種觸發檢查點的方式:Checkpoint和Savepoint。關于Checkpoint和Savepoint的差別,可以參考社群相關的文檔。簡單來說(對我們使用RockDB State Backend,且開啟了Incremental Checkpoint),Checkpoint和Savepoint的差別主要包括如下兩個方面:

  • Checkpoint為增量式的,且可以在單個Task内部利用多線程上傳檔案,而Savepoint把RocksDB的狀态作為一個Stream,單線程上傳至HDFS
  • 由于我們開啟了Retain Checkpoint On Cancellation,Checkpoint會依賴之前的運作執行個體生成的Checkpoint,且可能存在較長的依賴鍊,導緻之前的運作執行個體産生的Checkpoint必須保留在HDFS上,進而占用大量空間

基于上述Checkpoint和Savepoint的優缺點,我們提出了Full Checkpoint的概念。Full Checkpoint綜合了Incremental Checkpoint和Savepoint的優點:

  1. Full Checkpoint隻上傳增量的檔案,且跟Checkpoint一樣可以利用多線程減少Checkpoint完成需要的時間。
  2. 對依賴的之前的Checkpoint的檔案,Full Checkpoint會上傳,進而使得Full Checkpoint生成的Checkpoint對之前運作執行個體的Checkpoint不存在依賴關系,便于Checkpoint的清理、遷移等。

根據上述思想,Full Checkpoint涉及的主要元件和各元件的作用可以使用下圖描述。

Apache Flink運作時在B站的穩定性優化與實踐

Full Checkpoint可以由使用者或者平台通過Flink Rest API觸發,我們也在接下來要介紹的重新開機接口中使用了Full Checkpoint,并且計劃使用Full Checkpoint替換Savepoint作為停止作業前的預設檢查點實作方式。

2.2.2 Regional Checkpoint

在Flink作業中,有一類作業有着比較明顯的特點:作業的DAG中沒有或者幾乎沒有(關于這一點在下面的Regional Checkpoint的擴充中會詳細解釋)ALL-TO-ALL的連接配接、對資料的準确性要求較高(這類作業一般都有很多下遊作業)。在這類作業中,Checkpoint扮演着很重要的角色,在資料重放時,不僅影響作業的Exactly-Once特性,還決定着要從Kafka拉取多少資料。對于并行度比較大的作業,受環境的影響(網絡抖動、存儲抖動等),會導緻Checkpoint有比較大的機率失敗。

為了解決這類問題,我們根據這類作業的特點(幾乎沒有ALL-TO-ALL連接配接),借鑒了社群Region-Failover的思路,并參考了業界的實作,實作了稱為Regional-Checkpoint的Checkpoint優化。所謂Regional-Checkpoint,即将DAG劃分為一個個的Region,将Region作為相對獨立的單元看待,Region之間不産生互相影響。下圖展示了幾種典型場景下的Region劃分:

Apache Flink運作時在B站的穩定性優化與實踐

将DAG劃分為Region之後,按照下表所示的邏輯對Checkpoint進行處理:

Apache Flink運作時在B站的穩定性優化與實踐

在實際的開發中,為了友善引入Regional Checkpoint相關的處理邏輯及減小對原生Checkpoint處理邏輯(上表中的Global Checkpoint)的影響,我們抽象出了CheckpointHandler接口,将相關的公共邏輯放入了其抽象實作類AbstractCheckpointHandler中,并将Global Checkpoint和Regional Checkpoint的處理邏輯分别放在GlobalCheckpointHandler和RegionalCheckpointHandler中。使用者可以通過參數來控制是否使用Regional Checkpoint。

2.3 相關工具的開發

下面介紹一些我們為了友善Checkpoint相關運維和問題排查而開發的實用工具。

2.3.1 定期狀态檔案清理

上面介紹Full Checkpoint時提到過,我們開啟了Incremental Checkpoint和Checkpoint的Retain On Cancellation功能。這會造成Checkpoint之間存在依賴關系,對Checkpoint的清理造成一定的影響,進而造成Checkpoint占用的空間增加,HDFS存儲成本增加。為此,我們引入定期執行的Checkpoint清理程式。其基本邏輯為周遊Checkpoint的存儲目錄,過濾掉設定的時間之前的Checkpoint,找到剩下的每個Checkpoint的_metadata檔案,并從_metadata檔案中解析出對狀态檔案的引用,記錄下來,之後将所有不再被引用的狀态檔案做清理。下圖描述了清理流程。

Apache Flink運作時在B站的穩定性優化與實踐

2.3.2 Checkpoint中繼資料增強

在B站,我們增加了在作業啟動時将Operator ID和Operator Name的映射資訊列印到日志的功能來幫助排查Flink作業無法從Checkpoint恢複的問題,線上上問題排查中起到了很好的作用。但是由于B站的ES日志索引儲存時間為14天,我們也遇到了運作時間超過14天的作業在重新啟動後無法從Checkpoint恢複時,無法找到Operator ID和Operator Name的映射關系,進而導緻無法排查出原因的情況。為了解決這個問題,我們将Operator ID和Operator Name的映射關系存入了Checkpoint的中繼資料中,通過增加了極少量的中繼資料存儲,使得我們在任何時候都能獲得Operator ID和Operator Name的映射關系。下圖展示了加入額外的中繼資料後,_metadata檔案的存儲結構。

Apache Flink運作時在B站的穩定性優化與實踐

2.3.3 State中繼資料洞察

在排查Checkpoint相關的問題時,檢視Checkpoint的中繼資料(_metadata檔案的内容)是一個必要且十分有用的手段。為了友善地檢視Checkpoint的中繼資料資訊,我們開發了相關的工具來檢視給定Checkpoint(指向_metadata檔案)的中繼資料。該工具除了支援社群原生的中繼資料外,還支援上面提到的B站自定義的中繼資料。使用者可以提供參數來隻是是否需要列印出所有的Operator ID和Operator Name的映射資訊,也可以提供需要列印的Operator的ID或者Name。

03 可用性提升

在B站,有一類實時計算作業作為基礎元件為其它所有的實時或離線計算作業提供基礎,是以這類作業對可用性有很高的要求。最初這類作業使用Flume實作。随着公司實時計算的技術棧全面往Flink遷移,這類作業也有使用Flink的需求。為了滿足這類計算作業的可用性需求,我們做了很多的工作,下面介紹其中主要的幾點工作。

3.1 Hybrid HA

對于生産環境,社群版Flink提供了基于Zookeeper(用于YARN和Standalone)和config map(用于Kubernetes)的HA方案。B站的Flink作業部署和運作在Yarn環境中,基于Zookeeper的高可用方案是唯一可選的方案。但是根據B站過往使用基于Zookeeper的HA的經驗,在作業數量變大之後,Zookeeper本身的不穩定性反而會造成作業的失敗。為了提高作業的可用性,我們需要對社群基于Zookeeper的HA方案進行一定的改造。

考慮到在YARN部署環境下,實際上并沒有實際進行Leader選舉,而Leader監聽機制可以通過輪詢來實作。基于這個現實,我們考慮實作基于HDFS的HA機制,同時考慮到如果全量作業都基于輪詢機制請求HDFS,會對HDFS的namenode(或者NNProxy server)造成較大的壓力,我們提出了同時基于Zookeeper和HDFS的Hybrid HA機制。該機制的主要思想為在Zookeeper運作正常時,HA機制基于Zookeeper運作,在Zookeeper發生異常時,為了保障作業仍能穩定運作,HA機制切換到基于HDFS運作。

由于資料可能會存在兩個系統裡,如何保障資料的一緻性成為必須要考慮的問題。為此,我們對HA涉及到的資料進行了梳理,HA 要寫入的資料包括:

  • ResourceManager、Dispatcher、JobMaster 的 address:JobManager 和 TaskManager 都會讀取
  • Checkpoint、CheckpointIDCounter:僅 JobManager 讀取
  • Job 運作狀态:僅 JobManager 讀取
  • JobGraph:僅 JobManager 讀取

對于僅被JobManager讀取的資料,由于一個作業僅有一個,讀取頻率不高,僅寫入HDFS即可。

對于會被TaskManager讀取的資料,讀取頻率很高,會對HDFS 造成比較大的壓力,需要同時寫入HDFS 和ZK,ZK正常時從ZK讀取,隻在ZK異常時從HDFS讀取。

綜上,寫入資料時要保證:

  • 所有資料必須寫入HDFS
  • TaskManager用到的資料要同時寫入ZK
  • 若寫HDFS異常,任務直接挂掉

将上述過程用圖形表示如下:

Apache Flink運作時在B站的穩定性優化與實踐

當JM位址成功寫入 HDFS,但是寫入ZK失敗時,就會出現一緻性問題。此時,從HDFS讀取到的是最新的資料,而從Zookeeper讀取到的是過期的資料。為了保證 leader 發現時的可用性和一緻性,讀資料要保證:

  • 優先讀 ZK
  • ZK 異常降級讀 HDFS
  • ZK 不一緻降級讀 HDFS
  • 讀 HDFS 有異常任務挂掉

解決不一緻問題的常用手段就是加版本号資訊,我們給寫入 HDFS 和 ZK 的資料都加上版本号,隻要兩者的版本号一緻就可以認為資料一緻。Leader 選舉和 Leader 發現的資訊存儲在LeaderInformation類中,每一次新的Leader選舉都會生成一個新的 Leader Session ID,它是一個 UUID 随機字元串,我們選擇把它作為資料的版本号。

3.2 Job Manager恢複過程保持

Task正常運作 (Reconcilation)

社群版本的Flink在Job Manager失敗恢複(HA)的過程如下圖所示。

Apache Flink運作時在B站的穩定性優化與實踐

可以看到,Task Executor通過高可用服務感覺到Leader發生切換或Task Executor與Job Master之間的心跳逾時的時候,會主動斷開與Job Master的連接配接,在這個過程中會進行如下動作:

  • 将運作在該Task Executor上所有屬于該JobMaster的Tasks取消(cancel)
  • 将 Job 持有的 slots 狀态從 ACTIVE 改為 ALLOCATED(也就是已配置設定但尚未給到 Job 使用)
  • 與JobMaster斷開連接配接

根據我們對線上任務的統計,作業啟動過程中,根據作業并行度的大小和作業運作圖的複雜度,部署所有的Task可能花費從一百毫秒級到幾十秒鐘的時間,是以即使開啟了Job Manager HA,不考慮Task失敗的情況下,作業仍然可能幾十秒鐘的不可用時間。而實際上,在Task正常運作過程中,除了TaskExecutor與JobMaster的心跳,Task需要與JobMaster互動的資訊非常之少(彙報Checkpoint的資訊和Accumulator的資訊,而多數任務是沒有使用Accumulator的)。基于以上統計和分析,我們考慮在Job Manager HA的過程中保持Task正常運作。

為了達到上述目标,需要同時對JobMaster端和TaskExecutor端進行改造。經過分析相關代碼,需要進行如下改造。

  • Task Executor的改造
  1. 取消TaskManager由于JobManager的Leadership變動或者心跳逾時就Cancel Task的行為
  2. TaskManager與新的JobManager Leader建立連接配接後通過心跳上報Job對應的Slot資訊和Task的運作狀态,以便JobManager恢複ExecutionGraph和SlotPool
  • Job Master端的改造
  • 對 ExecutionGraph 核心資料進行快照,實作一個FileSystem Store遠端存儲快照,基于該快照初始化一個用于恢複的ExecutionGraph對象
  • ExecutionGraph内部實作恢複Job的邏輯(稱為 RECONCILING 狀态),主要是Job和Task相關狀态的恢複
  • 實作Job Master Failover之後基于快照和Task Executor上報的資訊來恢複Job的過程(RECONCILING 過程)

上述過程可以使用圖形表示如下:

Apache Flink運作時在B站的穩定性優化與實踐

除上述主流程的改造之外,我們還需要確定在TaskExecutor與老的JobMaster斷開連接配接到與新的JobMaster重建立立連接配接之間Task Executor到JobMaster的RPC能夠得到正确的處理。

3.3 Regional Checkpoint

适配HDFS Sink

上面介紹了我們引入Regional Checkpoint的背景,這個背景跟寫入HDFS的作業也很符合,但是HDFS Sink作業卻無法應用Regional Checkpoint,原因在于HDFS中存在一個單并行度的算子:StreamingFileCommitter。正式由于該算子的存在,導緻作業中會存在ALL-TO-ALL連接配接,進而導緻整個作業會被劃分為一個Region,進而導緻Regional Checkpoint沒有效果。考慮到寫HDFS的作業的數量及其重要性,我們将Regional Checkpoint對HDFS Sink進行了适配。

HDFS Sink場景中的一個典型的DAG如下圖所示(圖中還展示了Region劃分的方式):

為了在該場景下應用Regional Checkpoint,我們修改了該場景下的Region劃分方式,将Source到FileWriter的鍊路劃分為一個Region,并将FileCommitter劃入每一個Region中。

考慮到實作的通用性,我們為FileCommitter這種可以通過被不同Region重複包含而實作在含有單并行度算子中将DAG劃分為多個Region的算子引入了一個新的接口RegionSharable,在開啟了Regional Checkpoint的配置且系統識别到DAG中包含實作了RegionSharable算子的情況下,系統可以自動實作将該算子劃分到之前的每一個Region,并在适當的時機調用該算子通過RegionSharable提供的方法來通知該算子必要的資訊。RegionSharable接口及相關資料的結構的定義如下:

public interface RegionShareOperator {
    void notifyRegionalCkComplete(RegionCheckpointFailedDetail detail);
}
  
public class RegionCheckpointFailedDetail {
    long checkpointId;
    FailedScope scope;
    Set failedTasks;
  
  
    enum FailedScope {
        UP_STREAM,
        DOWN_STREAM,
        BOTH
    }
  
    enum FailedType {
        ERROR,
        EXPIRED
    }
  
    static class FailedTaskInfo {
        int subTaskId;
        String taskName;
        FailedType reason;
        String message;
    }
}           

運作時,CheckpointCoordinator在Source到FileWriter鍊路上發生Checkpoint失敗時,會找到FileCommitter所在Execution,進而找到FileCommitter所在的Task Manager,并通過Task Manager Gateway通知Checkpoint Region的失敗情況,FileCommitter會依據自身業務邏輯進行相應的處理,來保障資料處理的Exactly-Once語義。

3.4 為SQL作業應用

Rescale Partitioner

社群版本的Flink中提供了Region Failover的功能,在Region中的Task失敗後,可以隻重新開機對應的Region,而不用重新開機整個作業或者作業中的所有Tasks。在Kafka的Partition數量和作業預設并行度一緻的情況下,作業使用Forward模式,DAG劃分為和Kakfa Partition數量一緻的Region,可以充分利用Region Failover減少需要重新開機的Task數量,進而提高作業的可用性。在B站内部,為了防止使用者手動設定Source算子的并行度導緻的問題(算子空跑、性能問題等),我們禁用了使用者設定Source算子并行度的功能。最初的設計中,我們提供兩種連接配接方式。

在Source算子消費的Kafka topic的partition數量和作業預設并行度一緻時,使用Forward連接配接方式;在Source算子消費的Kafka topic的Partition數量和作業預設并行度不一緻時,使用Rebalance連接配接方式。實際生産環境中經常會出現Kafka topic的Partition數量和作業預設并行度不一緻的場景,會在Source(Source算子+Calc+可選的Watermark Assigner)和後續算子間使用Rebalance模式,進而整個作業的DAG被劃分到一個Region中,導緻任意一個Task失敗都會導緻所有的Tasks需要重新開機。為了解決該問題,我們在SQL作業中也引入了Rescale模式。Rescale模式的典型連接配接方式如下圖所示:

Apache Flink運作時在B站的穩定性優化與實踐

其中也标示了對應場景中的Region劃分方式,可以看到在一個Task失敗時,并不會重新開機所有的Sub-task,對可用性提升會有較大的提升作用。

注意到使用Rescale模式的前提條件是Kafka的Partition數量和全局并行度成倍數關系,這導緻雖然上述改動可以覆寫更多的場景,但是仍無法滿足所有的需要。為了覆寫更多的傳輸作業場景,我們引入了force-rescale參數,該參數會在Source并行度和全局并行度不是倍數關系時,強制使用Rescale模式。通過使用該參數,使用者可以根據業務需求在性能和可用性之間做取舍。

3.5 單點恢複(Approximate Local

Recovery)

在實時計算的應用場景中,有一類作業有如下特點:

  • 需要處理的資料量大,實時計算作業并行度大
  • 計算中主要的計算為雙流或多流join邏輯,算子間會産生ALL-TO-ALL連接配接
  • 由于作業并行度大,作業在失敗恢複時,需要花費的時間較長
  • 對資料一緻性敏感度較低,可以容忍部分資料丢失
  • 資料的實時性對資料應用的效果非常關鍵

在B站,滿足上述特點的作業包括商業化部門和AI部門的模型訓練作業和樣本拼接作業。

由于作業的算子間存在ALL-TO-ALL連接配接,即使開啟了region failover,在任意task失敗時也會發生全量failover,造成資料較長時間的不可用。基于業務對資料一緻性要求不高的特點,參考業界的一些分享,我們開發了單點恢複的功能。

一個典型的Flink task(不包括source task(沒有上遊task)和sink task(沒有下遊task))會包含上遊task和下遊task,一個task會和它的上下遊task産生資料交換。這種資料交換既可能發生在相同的task manager内部,也可能發生在不同的task manager之間。下圖顯示了Flink中的網絡模型,其中上側的圖為跨Task Manager進行資料交換的圖示,下側的圖則簡化展示了在同一個Task Manager内部的資料交換的圖示。

Apache Flink運作時在B站的穩定性優化與實踐
Apache Flink運作時在B站的穩定性優化與實踐

為了實作單點恢複,需要進行以下處理:

  • 在task失敗後,隻重新開機失敗的task,并在多個task發生重新開機時,保證task按照期望的順序啟動
  • 不完整記錄的處理
  • 上下遊task感覺到task的失敗,并在task重新開機前進行正确的處理
  • Task重新開機後,上下遊task能正确與重新開機後的task實作資料互動

其中還有一個細節值得注意,為了吞吐量考慮,Flink會将多個序列化後的Record放入同一個Buffer中進行發送,也導緻了一個序列化後的Record可能會存在于兩個或者多個Buffer中。在Task失敗和重新開機後,需要對這部分Buffer進行細緻的處理,否則可能導緻Record反序列化失敗,進而導緻Task重新開機後失敗。

下面我們分别介紹對應的工作。

3.5.1 Task的重新開機

Flink對task失敗後應該要進行哪些task的重新開機進行了良好的抽象,我們隻需要實作Flink提供的接口FailoverStrategy,在其方法getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause)的實作中隻傳回傳入的task,并實作對應的Factory即可。

由于可能同時有多個task失敗(例如由于task manager被kill),而Flink并不保證調用重新開機task的順序,這樣會可能會導緻下遊task先于上遊task被啟動,進而造成下遊task視圖找到消費的partition時發生失敗,導緻task啟動失敗。Flink作業在啟動時也可能發生類似的情況,其處理方法為在Task擷取消費的partition時,進行重試,并從Job Master拉取partition最新資訊。為了降低實作的複雜度,我們選擇在Job Master端啟動task時進行一些控制,進而保證下遊的task在啟動時,上遊的task已經啟動。具體的處理方法為,下遊task在啟動時,如果發現其上遊的task還未啟動,則先從啟動過程退出,并注冊對上遊task啟動狀态的監聽,

在其監聽的所有上遊task都啟動成功之後,再恢複啟動過程。

3.5.2 Task失敗的感覺及對應的處理

Task可能會因為以下的原因失敗:

  • 單個Task由于本身的運作邏輯或遇到髒資料未能正确進行容錯等原因失敗
  • TaskManager被kill,導緻運作在該task manager上的所有task失敗
  • 機器斷電當機,導緻該機器上的所有task manager上的所有task失敗

針對上述類型的任務失敗,其上下遊task感覺到其失敗的方式分别對應如下:

  • Task自身的清理邏輯會在task失敗後清理對應的網絡資源,會向上遊task發送channel close資訊,并向下遊task發送exception
  • Task manager被kill後,其持有的TCP連接配接會進行關閉,上遊的Netty Server和下遊Netty Client會感覺到,并通過回調函數通知對應的task
  • 機器當機的情況下,上下遊task無法通過Netty感覺到對應網絡連接配接的失敗,我們通過Job manager來實作對上下遊taskd的通知h4. 不完整記錄的處理

在上述背景中介紹了不完整記錄産生的背景和可能造成的問題。為了解決上述問題,我們需要在上遊task感覺到下遊task失敗和下遊task重新連接配接後進行必要的處理。同時,需要下遊task感覺到上遊task失敗後,也需要進行必要的處理。

對以不完整記錄開頭的Buffer的處理參考了社群的方案,将BufferConsumer和不完整記錄的長度封裝到BufferConsumerWithPartialLength中。在上遊task感覺到下遊task失敗重連之後,會将是否需要清楚partitial record的标志位設定為true,在将BufferConsumer的資料拷貝進入網絡棧前,依據該标志位進行不同的動作,如果該标志位為true,會将不完整記錄的長度跳過,如果跳過的長度為BufferConsumer的長度,則說明該記錄可能跨越多個BufferConsumer,仍然保持該标志位為true,以便在處理下一個BufferConsumer時仍然進行相同的判斷;如果跳過的長度小于BufferConsumer的長度,則會将标志位置為false。

下遊task通過網絡棧接收到資料之後,在StreamTaskNetworkInput中,會将資料緩存在RecordDeserializer中,拿到完整記錄的資料(可能跨越多個buffer),将記錄反序列化之後,交給task線程去進行後續的處理。是以在上遊task失敗,下遊task感覺到之後,可以通過将RecordDeserializer中還未反序列化的buffer中的資料清除來實作對buffer結尾處的不完整記錄的處理。

這裡有一個問題需要注意,StreamTaskNetworkInput位于task層,和感覺上遊task失敗的網絡層處于不同的處理線程中。為了讓StreamTaskNetworkInput感覺到上遊task的失敗,我們選擇了按照目前Flink的線程模型來處理,即網絡層感覺到上遊task失敗之後,通過向task傳遞一個ChannelUnavailableEvent事件。這樣可以不用在task層和網絡層之間添加複雜的同步操作,否則可能會影響Flink資料處理的性能。h3. 下遊task恢複之前,對應上遊task丢棄相應的資料

Flink依賴下遊的Netty client及時讀取上遊task産生的資料來維持上遊task的buffer占用維持在較低的水準(不能及時拉取時會出現反壓)。下遊task失敗時,将停止從上遊task拉取資料,如果不做任何處理,将會導緻上遊task的記憶體占用過高,最終導緻全部上遊task反壓。

為了解決上述問題,有兩種思路:

  • 上遊task感覺到下遊task失敗之後,直接将發送到相應task的資料丢棄
  • 上遊task将待發送的資料順序寫入磁盤檔案,待下遊task恢複之後,上遊task先從磁盤讀取資料發送給下遊task,待磁盤資料讀取完畢之後,再恢複記憶體發送資料
  • 第二種方案實作較複雜,我們目前采用了第一種實作方式。具體做法為在PipelinedApproximateSubpartition中設定一個标志位(PipelinedApproximateSubpartition是PipelinedSubpartition的子類,其中添加了實作單點恢複邏輯需要的功能),在上遊task感覺到下遊task失敗之後,會将該标志位設定為true。在将資料通過PipelinedSubpartition#add(BufferConsumer bufferConsume, ...)添加到subpartition時,如果發現該标志位為true,則直接調用BufferConsumer#close()将資料丢棄并傳回。h2. Task的恢複

從上面的介紹可以知道,job manager會重新開機失敗的task,我們需要保證task重新開機後,與其有連接配接的上下遊task都能正确地恢複資料的發送或接收。

3.5.3 上遊Task恢複的動作

在開啟單點恢複後,資料發送端會切換為PipelinedApproximateSubpartition實作,其中維護了available字段,我們需要在下遊task恢複之後,将其從false設定為true,以恢複資料的發送。

3.5.4 下遊Task恢複的動作

  • 失敗的上遊task重新開機後,job manager會送RPC到task manager,通知對應的下遊task。在下遊task接收到通知後,需要重建立立與上遊task的資料聯系。主要的邏輯位于SingleInputGate中,收到通知後,SingleInputGate會視圖重新初始化InputChannel并替換之前維護的InputChannel,并在初始化完畢後,通過InputChannel重新請求上遊的subpartition。這裡有兩個問題需要注意:
  • 對和上遊task同時失敗的task,其有兩個途徑進入InputChannel的初始化路徑。一是我們剛剛介紹的job manager發送的通知,二是其啟動時得到的資訊。我們需要注意對這兩種情況進行必要的處理。
  • 在上遊task失敗前,下遊task可能處于嚴重的堆積狀态或者阻塞狀态,導緻其接收的資料遲遲無法處理,進而導緻其接收到job manager的通知時,發現相應的InputChannel處于available狀态。對這種情況,我們的處理是,先将資訊緩存起來,等待消費到ChannelUnavailableEvent後,再進行InputChannel的重建工作。同時,我們設定一個定時器,若定時器逾時時,仍然沒有消費到ChannelUnavailableEvent,我們将對應的task直接置為失敗。

04 其它優化

下面介紹一些B站在Runtime上其它方面取得的一些進展。

4.1 基于Backlog負載均衡

Flink的Rescale Partitioner和Rebalance Partitioner預設會使用Round-robin的方式,把資料發送到下遊Task的Channel中,其中涉及到的元件如下圖所示。在生産環境中經常遇到的問題是,由于環境波動,下遊Task的Sub-task處理能力會出現不均衡,最終會導緻導緻整個上遊Task發生反壓。在Rescale和Rebalance模式下,每一條資料并沒有特定的指向性,可以發送給下遊的任意Sub-task,是以如果可以根據下遊Sub-task的負載動态分發資料的話,将可以改善上遊Task的反壓狀況。

Apache Flink運作時在B站的穩定性優化與實踐

基于上述思想,考慮到在Flink Credit-based Flow Control機制中,Backlog Size用來反應下遊sub-task的處理負載(Backlog Size越大,說明下遊消費能力越差),我們引入了基于Backlog Size的動态負載均衡機制來代替社群原生的Round-robin的方式。

該方案的整體架構如下:

Apache Flink運作時在B站的穩定性優化與實踐

LoadBasedChannelSelector為我們新引入的類,其實作了ChannelSelector,用來替換社群實作的RoundRobinChannelSelector。其主要功能在一個可替換的抽象類BacklogLoadBasedStrategy中實作。BacklogLoadBasedStrategy通過監聽器監聽ResultPartition中的Backlog Size變化,并根據Backlog Size的變化動态地改變維護的狀态,用來決定如何為一條資料選擇下遊的Channel。

4.2 大規模叢集運維優化

B站的實時YARN叢集中有一千多台機器,經常會有因為記憶體/磁盤故障或者更新作業系統等需求要下線機器的需求。由于Flink作業7X24運作的特性,如果直接下線機器的話,會對使用者體驗造成很壞的影響。為了使得這種運維操作更加平滑,我們設計了如下的流程:

  1. 将待運維的機器清單從YARN摘掉label(防止運維過程中有新的作業或者已有作業重新部署到這些機器上)
  2. 将相同數量的機器加入叢集中以備替換帶下線的機器
  3. 開始逐機器進行運維,流程如下
  • 查詢該機器上運作的作業
  • 對其中的每一個作業,調用Flink提供的帶黑名單的接口将該作業重新開機,并驗證重新開機成功
  • 将該機器從叢集下線

在上述流程中,有一個重要的步驟是調用Flink提供的接口來重新開機作業,且該接口可以提供一個機器清單作為黑名單,在該清單中的機器上的資源将被Flink資料總管忽略。

跟我們添加的其它接口一樣,重新開機接口需要通過Flink提供的Rest API調用。如果在調用重新開機接口時提供了機器的主機名清單作為參數,分布在該機器上的可用slots不會被配置設定給重新開機的作業,并且如果沒有足夠的資源來重新開機作業,Flink在向YARN申請資源時也會将這些機器的主機名作為黑名單,防止YARN配置設定位于這些機器上的資源。下圖為重新開機接口實作的整體架構及相關元件的互動過程(注意:圖中并未畫出與YARN的互動過程)。

Apache Flink運作時在B站的穩定性優化與實踐

05 其它優化

下面介紹一些B站在Runtime上其它方面取得的一些進展。

5.1 增加更多Checkpoint

恢複的相容場景

我們已經做了很多工作來實作Checkpoint的相容性,但是在實際場景中還有很多需要相容的場景,例如聚合計算時,增加或者減少聚合名額的場景、異步維表join場景下結果表的字段增減的場景等,後續我們也會對這些場景提供支援。

5.2 HDFS Sink适配

Region Failover

上面介紹過HDFS Sink适配Regional Checkpoint的工作。基于這個思想,我們計劃進行HDFS Sink對Region Failover的适配,提升HDFS Sink作業的可用性。

5.3 實作無重新開機的擴縮容

在實時計算中,因為業務增加或者突然的流量增加或者減少,對作業進行擴縮容是很常見且非常必要的操作。目前為了對一個作業進行擴縮容,我們需要先将作業下線,重新配置參數(并行度)後,再将作業進行送出。根據作業并行度和狀态的大小,這會花費分鐘級的時間。對一些作業來說,這是不可接受的,對于可以接受這個延遲的作業,也希望能将擴縮容的時間盡量降低。基于這個背景,我們會提供無需重新開機作業的擴縮容操作。

參考文獻:

[1]https://www.infoq.cn/article/88iajgkazdxw5hut-joh

[2]https://www.infoq.cn/article/idw_igykly724yhkgqbk

[3] https://developer.aliyun.com/article/774837

[4]https://nightlies.apache.org/flink/flink-docs-master/zh/docs/libs/state_processor_api/

作者:馬陽陽&丁國濤

來源:微信公衆号:哔哩哔哩技術

出處:https://mp.weixin.qq.com/s/PQYylmHBjnnH9pX7-nxvQA