本文主要從兩個層面深度剖析:阿裡巴巴對Flink究竟做了哪些優化?
取之開源,用之開源
一、SQL層
為了能夠真正做到使用者根據自己的業務邏輯開發一套代碼,能夠同時運作在多種不同的場景,Flink首先需要給使用者提供一個統一的API。在經過一番調研之後,阿裡巴巴實時計算認為SQL是一個非常适合的選擇。在批處理領域,SQL已經經曆了幾十年的考驗,是公認的經典。在流計算領域,近年來也不斷有流表二象性、流是表的ChangeLog等理論出現。在這些理論基礎之上,阿裡巴巴提出了動态表的概念,使得流計算也可以像批處理一樣使用SQL來描述,并且邏輯等價。這樣一來,使用者就可以使用SQL來描述自己的業務邏輯,相同的查詢語句在執行時可以是一個批處理任務,也可以是一個高吞吐低延遲的流計算任務,甚至是先使用批處理技術進行曆史資料的計算,然後自動的轉成流計算任務處理最新的實時資料。在這種聲明式的API之下,引擎有了更多的選擇和優化空間。接下來,我們将介紹其中幾個比較重要的優化。
首先是對SQL層的技術架構進行更新和替換。調研過Flink或者使用過Flink的開發者應該知道,Flink有兩套基礎的API,一套是DataStream,另一套是DataSet。DataStream API是針對流式處理的使用者提供,DataSet API是針對批處理使用者提供,但是這兩套API的執行路徑是完全不一樣的,甚至需要生成不同的Task去執行。Flink原生的SQL層在經過一系列優化之後,會根據使用者希望是批處理還是流處理的不同選擇,去調用DataSet或者是DataStream API。這就會造成使用者在日常開發和優化中,經常要面臨兩套幾乎完全獨立的技術棧,很多事情可能需要重複的去做兩遍。這樣也會導緻在一邊的技術棧上做的優化,另外一邊就享受不到。是以阿裡巴巴在SQL層提出了全新的Quyer Processor,它主要包括一個流和批可以盡量做到複用的優化層(Query Optimizer)以及基于相同接口的算子層(Query Executor)。這樣一來, 80%以上的工作可以做到兩邊複用,比如一些公共的優化規則,基礎資料結構等等。同時,流和批也會各自保留自己一些獨特的優化和算子,以滿足不同的作業行為。
在SQL層的技術架構統一之後,阿裡巴巴開始尋求一種更高效的基礎資料結構,以便讓Blink在SQL層的執行更加高效。在原生Flink SQL中,都統一使用了一種叫Row的資料結構,它完全由JAVA的一些對象構成關系資料庫中的一行。假如現在的一行資料由一個整型,一個浮點型以及一個字元串組成,那麼Row當中就會包含一個JAVA的Integer、Double和String。衆所周知,這些JAVA的對象在堆内有不少的額外開銷,同時在通路這些資料的過程中也會引入不必要的裝箱拆箱操作。基于這些問題,阿裡巴巴提出了一種全新的資料結構BinaryRow,它和原來的Row一樣也是表示一個關系資料中的一行,但與之不同的是,它完全使用二進制資料來存儲這些資料。在上述例子中,三個不同類型的字段統一由JAVA的byte[]來表示。這會帶來諸多好處:
- 首先在存儲空間上,去掉了很多無謂的額外消耗,使得對象的存儲更為緊湊;
- 其次在和網絡或者狀态存儲打交道的時候,也可以省略掉很多不必要的序列化反序列化開銷;
- 最後在去掉各種不必要的裝箱拆箱操作之後,整個執行代碼對GC也更加友好。
通過引入這樣一個高效的基礎資料結構,整個SQL層的執行效率得到了一倍以上的提升。
在算子的實作層面,阿裡巴巴引入了更廣範圍的代碼生成技術。得益于技術架構和基礎資料結構的統一,很多代碼生成技術得以達到更廣範圍的複用。同時由于SQL的強類型保證,使用者可以預先知道算子需要處理的資料的類型,進而可以生成更有針對性更高效的執行代碼。在原生Flink SQL中,隻有類似a > 2或者c + d這樣的簡單表達式才會應用代碼生成技術,在阿裡巴巴優化之後,有一些算子會進行整體的代碼生成,比如排序、聚合等。這使得使用者可以更加靈活的去控制算子的邏輯,也可以直接将最終運作代碼嵌入到類當中,去掉了昂貴的函數調用開銷。一些應用代碼生成技術的基礎資料結構和算法,比如排序算法,基于二進制資料的HashMap等,也可以在流和批的算子之間進行共享和複用,讓使用者真正享受到了技術和架構的統一帶來的好處。在針對批處理的某些場景進行資料結構或者算法的優化之後,流計算的性能也能夠得到提升。接下來,我們聊聊阿裡巴巴在Runtime層對Flink又大刀闊斧地進行了哪些改進。
二、Runtime層
為了讓Flink在Alibaba的大規模生産環境中生根發芽,實時計算團隊如期遇到了各種挑戰,首當其沖的就是如何讓Flink與其他叢集管理系統進行整合。Flink原生叢集管理模式尚未完善,也無法原生地使用其他其他相對成熟的叢集管理系統。基于此,一系列棘手的問題接連浮現:多租戶之間資源如何協調?如何動态的申請和釋放資源?如何指定不同資源類型?
為了解決這個問題,實時計算團隊經曆大量的調研與分析,最終選擇的方案是改造Flink資源排程系統,讓Flink可以原生地跑在Yarn叢集之上;并且重構Master架構,讓一個Job對應一個Master,從此Master不再是叢集瓶頸。以此為契機,阿裡巴巴和社群聯手推出了全新的Flip-6架構,讓Flink資源管理變成可插拔的架構,為Flink的可持續發展打下了堅實的基礎。如今Flink可以無縫運作在YARN、Mesos和K8s之上,正是這個架構重要性的有力說明。
解決了Flink叢集大規模部署問題後,接下來的就是可靠和穩定性,為了保證Flink在生産環境中的高可用,阿裡巴巴着重改善了Flink的FailOver機制。首先是Master的FailOver,Flink原生的Master FailOver會重新開機所有的Job,改善後Master任何FailOver都不會影響Job的正常運作;其次引入了Region-based的Task FailOver,盡量減少任何Task的FailOver對使用者造成的影響。有了這些改進的保駕護航,阿裡巴巴的大量業務方開始把實時計算遷移到Flink上運作。
Stateful Streaming是Flink的最大亮點,基于Chandy-Lamport算法的Checkpoint機制讓Flink具備Exactly Once一緻性的計算能力,但在早期Flink版本中Checkpoint的性能在大規模資料量下存在一定瓶頸,阿裡巴巴也在Checkpoint上進行了大量改- 進,比如:
- 增量Checkpoint機制:阿裡巴巴生産環境中遇到大JOB有幾十TB State是常事,做一次全量CP地動山搖,成本很高,是以阿裡巴巴研發了增量Checkpoint機制,從此之後CP從暴風驟雨變成了細水長流;
-
Checkpoint小檔案合并:都是規模惹的禍,随着整個叢集Flink JOB越來越多,CP檔案數也水漲船高,最後壓的HDFS NameNode不堪重負,阿裡巴巴
通過把若幹CP小檔案合并成一個大檔案的組織方式,最終把NameNode的壓力減少了幾十倍。
雖然說所有的資料可以放在State中,但由于一些曆史的原因,使用者依然有一些資料需要存放在像HBase等一些外部KV存儲中,使用者在Flink Job需要通路這些外部的資料,但是由于Flink一直都是單線程處理模型,導緻通路外部資料的延遲成為整個系統的瓶頸,顯然異步通路是解決這個問題的直接手段,但是讓使用者在UDF中寫多線程同時還要保證ExactlyOnce語義,卻并非易事。阿裡巴巴在Flink中提出了AsyncOperator,讓使用者在Flink JOB中寫異步調用和寫“Hello Word”一樣簡單 ,這個讓Flink Job的吞吐有了很大的飛躍。
Flink在設計上是一套批流統一的計算引擎,在使用過快如閃電的流計算之後,批使用者也開始有興趣入住Flink小區。但批計算也帶來了新的挑戰,首先在任務排程方面,阿裡巴巴引入了更加靈活的排程機制,能夠根據任務之間的依賴關系進行更加高效的排程;其次就是資料Shuffle,Flink原生的Shuffle Service和TM綁定,任務執行完之後要依舊保持TM無法釋放資源;還有就是原有的Batch shuffle沒有對檔案進行合并,是以基本無法在生産中使用。阿裡巴巴開發了Yarn Shuffle Service功能的同時解決了以上兩個問題。在開發Yarn Shuffle Service的時候,阿裡巴巴發現開發一套新的Shuffle Service非常不便,需要侵入Flink代碼的很多地方,為了讓其他開發者友善的擴充不同Shuffle,阿裡巴巴同時改造了Flink Shuffle架構,讓Flink的Shuffle變成可插拔的架構。目前阿裡巴巴的搜尋業務已經在使用Flink Batch Job,并且已經開始服務于生産。
經過3年多打磨,Blink已經在阿裡巴巴開始茁壯生長,但是對Runtime的優化和改進是永無止境的,一大波改進和優化正在路上。
更多資訊請通路
Apache Flink 中文社群網站