天天看點

360深度實踐:Flink 與 Storm 協定級對比

作者:張馨予

本文從資料傳輸和資料可靠性的角度出發,對比測試了 Storm 與 Flink 在流處理上的性能,并對測試結果進行分析,給出在使用 Flink 時提高性能的建議。

Apache Storm、Apache Spark 和 Apache Flink 都是開源社群中非常活躍的分布式計算平台,在很多公司可能同時使用着其中兩種甚至三種。對于實時計算來說,Storm 與 Flink 的底層計算引擎是基于流的,本質上是一條一條的資料進行處理,且處理的模式是流水線模式,即所有的處理程序同時存在,資料在這些程序之間流動處理。而 Spark 是基于批量資料的處理,即一小批一小批的資料進行處理,且處理的邏輯在一批資料準備好之後才會進行計算。在本文中,我們把同樣基于流處理的 Storm 和 Flink 拿來做對比測試分析。

在我們做測試之前,調研了一些已有的大資料平台性能測試報告,比如,雅虎的 Streaming-benchmarks,或者 Intel 的 HiBench 等等。除此之外,還有很多的論文也從不同的角度對分布式計算平台進行了測試。雖然這些測試 case 各有不同的側重點,但他們都用到了同樣的兩個名額,即吞吐和延遲。吞吐表示機關時間内所能處理的資料量,是可以通過增大并發來提高的。延遲代表處理一條資料所需要的時間,與吞吐量成反比關系。

在我們設計計算邏輯時,首先考慮一下流處理的計算模型。上圖是一個簡單的流計算模型,在 Source 中将資料取出,發往下遊 Task ,并在 Task 中進行處理,最後輸出。對于這樣的一個計算模型,延遲時間由三部分組成:資料傳輸時間、 Task 計算時間和資料排隊時間。我們假設資源足夠,資料不用排隊。則延遲時間就隻由資料傳輸時間和 Task 計算時間組成。而在 Task 中處理所需要的時間與使用者的邏輯息息相關,是以對于一個計算平台來說,資料傳輸的時間才更能反映這個計算平台的能力。是以,我們在設計測試 Case 時,為了更好的展現出資料傳輸的能力,Task 中沒有設計任何計算邏輯。

在确定資料源時,我們主要考慮是在程序中直接生成資料,這種方法在很多之前的測試标準中也同樣有使用。這樣做是因為資料的産生不會受到外界資料源系統的性能限制。但由于在我們公司内部大部分的實時計算資料都來源于 kafka ,是以我們增加了從 kafka 中讀取資料的測試。

對于資料傳輸方式,可以分為兩種:程序間的資料傳輸和程序内的資料傳輸。

程序間的資料傳輸是指這條資料會經過序列化、網絡傳輸和反序列化三個步驟。在 Flink 中,2個處理邏輯分布在不同的 TaskManager 上,這兩個處理邏輯之間的資料傳輸就可以叫做程序間的資料傳輸。Flink 網絡傳輸是采用的 Netty 技術。在 Storm 中,程序間的資料傳輸是 worker 之間的資料傳輸。早版本的 storm 網絡傳輸使用的 ZeroMQ,現在也改成了 Netty。

程序内的資料傳輸是指兩個處理邏輯在同一個程序中。在 Flink 中,這兩個處理邏輯被 Chain 在了一起,在一個線程中通過方法調用傳參的形式程序資料傳輸。在 Storm 中,兩個處理邏輯變成了兩個線程,通過一個共享的隊列進行資料傳輸。

Storm 和 Flink

都有各自的可靠性機制。在 Storm 中,使用 ACK 機制來保證資料的可靠性。而在 Flink 中是通過 checkpoint 機制來保證的,這是來源于 chandy-lamport 算法。

事實上 Exactly-once 可靠性的保證跟處理的邏輯和結果輸出的設計有關。比如結果要輸出到kafka中,而輸出到kafka的資料無法復原,這就無法保證 Exactly-once。我們在測試的時候選用的 at-least-once 語義的可靠性和不保證可靠性兩種政策進行測試。

上圖是我們測試的環境和各個平台的版本。

上圖展示的是

Flink

在自産資料的情況下,不同的傳輸方式和可靠性的吞吐量:在程序内+不可靠、程序内+可靠、程序間+不可靠、程序間+可靠。可以看到程序内的資料傳輸是程序間的資料傳輸的3.8倍。是否開啟 checkpoint 機制對 Flink 的吞吐影響并不大。是以我們在使用 Flink 時,進來使用程序内的傳輸,也就是盡可能的讓算子可以 Chain 起來。

那麼我們來看一下為什麼 Chain 起來的性能好這麼多,要如何在寫 Flink 代碼的過程中讓 Flink 的算子 Chain 起來使用程序間的資料傳輸。

大家知道我們在 Flink 代碼時一定會建立一個 env,調用 env 的 disableOperatorChainning() 方法會使得所有的算子都無法 chain 起來。我們一般是在 debug 的時候回調用這個方法,友善調試問題。

如果允許 Chain 的情況下,上圖中 Source 和 mapFunction 就會 Chain 起來,放在一個 Task 中計算。反之,如果不允許 Chain,則會放到兩個 Task 中。

對于沒有 Chain 起來的兩個算子,他們被放到了不同的兩個 Task 中,那麼他們之間的資料傳輸是這樣的:SourceFunction 取到資料序列化後放入記憶體,然後通過網絡傳輸給 MapFunction 所在的程序,該程序将資料方序列化後使用。

對于 Chain 起來的兩個算子,他們被放到同一個Task中,那麼這兩個算子之間的資料傳輸則是:SourceFunction 取到資料後,進行一次深拷貝,然後 MapFunction 把深拷貝出來的這個對象作為輸入資料。

雖然 Flink 在序列化上做了很多優化,跟不用序列化和不用網絡傳輸的程序内資料傳輸對比,性能還是差很多。是以我們盡可能的把算子 Chain 起來。

不是任何兩個算子都可以 Chain 起來的,要把算子 Chain 起來有很多條件:第一,下遊算子隻能接受一種上遊資料流,比如Map接受的流不能是一條 union 後的流;其次上下遊的并發數一定要一樣;第二,算子要使用同一個資源 Group,預設是一緻的,都是 default;第三,就是之前說的 env 中不能調用 disableOperatorChainning() 方法,最後,上遊發送資料的方法是 Forward 的,比如,開發時沒有調用 rebalance() 方法,沒有 keyby(),沒有 boardcast 等。

對比一下自産資料時,使用程序内通信,且不保證資料可靠性的情況下,

Flink 與 Storm 的吞吐

。在這種情況下,Flink 的性能是 Storm 的15倍。Flink 吞吐能達到2060萬條/s。不僅如此,如果在開發時調用了env.getConfig().enableObjectReuse() 方法,Flink 的但并發吞吐能達到4090萬條/s。

當調用了 enableObjectReuse 方法後,Flink 會把中間深拷貝的步驟都省略掉,SourceFunction 産生的資料直接作為 MapFunction 的輸入。但需要特别注意的是,這個方法不能随便調用,必須要確定下遊 Function 隻有一種,或者下遊的 Function 均不會改變對象内部的值。否則可能會有線程安全的問題。

當對比在不同可靠性政策的情況下,Flink 與 Storm 的表現時,我們發現,保證可靠性對 Flink 的影響非常小,但對 Storm 的影響非常大。總的來說,在保證可靠的情況下,Flink 單并發的吞吐是 Storm 的15倍,而不保證可靠的情況下,Flink 的性能是 Storm 的66倍。會産生這樣的結果,主要是因為 Flink 與 Storm 保證資料可靠性的機制不同。

而 Storm 的 ACK 機制為了保證資料的可靠性,開銷更大。

左邊的圖展示的是 Storm 的 ACK 機制。Spout 每發送一條資料到 Bolt,就會産生一條 ACK 的資訊給 ACKer ,當 Bolt 處理完這條資料後也會發送 ACK 資訊給 ACKer。當 ACKer 收到這條資料的所有 ACK 資訊時,會回複 Spout 一條 ACK 資訊。也就是說,對于一個隻有兩級(spout+bolt)的拓撲來說,每發送一條資料,就會傳輸3條 ACK 資訊。這3條 ACK 資訊則是為了保證可靠性所需要的開銷。

右邊的圖展示的是 Flink 的 Checkpoint 機制。Flink 中 Checkpoint 資訊的發起者是 JobManager。它不像 Storm 中那樣,每條資訊都會有 ACK 資訊的開銷,而且按時間來計算花銷。使用者可以設定做 checkpoint 的頻率,比如10秒鐘做一次 checkpoint。每做一次 checkpoint,花銷隻有從 Source 發往 map 的1條 checkpoint 資訊(JobManager 發出來的 checkpoint 資訊走的是控制流,與資料流無關)。與 Storm 相比,Flink 的可靠性機制開銷要低得多。這也就是為什麼保證可靠性對 Flink 的性能影響較小,而 Storm 的影響确很大的原因。

最後一組自産資料的測試結果對比是 Flink 與 Storm 在程序間的資料傳輸的對比,可以看到程序間資料傳輸的情況下,Flink 但并發吞吐是 Storm 的4.7倍。保證可靠性的情況下,是 Storm 的14倍。

上圖展示的是消費 kafka 中資料時,Storm 與 Flink 的但并發吞吐情況。因為消費的是 kafka 中的資料,是以吞吐量肯定會收到 kafka 的影響。我們發現性能的瓶頸是在 SourceFunction 上,于是增加了 topic 的 partition 數和 SourceFunction 取資料線程的并發數,但是 MapFunction 的并發數仍然是1.在這種情況下,我們發現 Flink 的瓶頸轉移到上遊往下遊發資料的地方。而 Storm 的瓶頸确是在下遊收資料反序列化的地方。

之前的性能分析使我們基于資料傳輸和資料可靠性的角度出發,單純的對

Flink 與 Storm

計算平台本身進行了性能分析。但實際使用時,task 是肯定有計算邏輯的,這就勢必更多的涉及到 CPU,記憶體等資源問題。我們将來打算做一個智能分析平台,對使用者的作業進行性能分析。通過收集到的名額資訊,分析出作業的瓶頸在哪,并給出優化建議。

繼續閱讀