天天看點

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        今天要講的文章ATC 2017年的一篇文章,StreamBox: Modern Stream Processing on a Multicore Machine。本文主要想解決的問題就是:高速大量的實時流資料主要是由IOT、Data Centers、Humans産生,它需要快速實時流處理系統。然而由于資料的到達是無序的,因為這些記錄通過不同的網絡路徑傳播,并且records上的計算以不同的速率執行。是以如何對這種 無序到達的資料進行高效的處理?是當今實時流處理系統一個重要挑戰,是以作者提出了StreamBox,利用視窗機制和如今多核體系架構的硬體,對這種無序到達的資料進行高效的處理。

1.BackGround

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        高速大量的實時流資料主要是由IOT、Data Centers、Humans産生,它需要快速實時資料流處理。Pipeline 将處理無界的資料流進入到Epoch 這樣一個時序的處理單元中。Pipeline 将會執行多個Transform操作,每個Transform都是一個單獨的計算。在資料被處理完成之後,Pipeline将會發出輸出結果給使用者。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        為什麼這樣的實時流處理時很難的呢?因為資料是無序的到達的,因為這些記錄通過不同的網絡路徑傳播,并且records上的計算以不同的速率執行。并且如今的硬體資源都含有大量的CPU 核數,這樣可以提供高性能的計算。他們不得不利用資料并行、流水線并行、記憶體本通路本地化

2. Prior Work

        先前的工作 都是在epoch内部進行無限的處理,在這個圖檔中。Transform0含有三個epochs,但是在同一時刻隻有一個epoch 被處理。接下來,随着時間的進行,當這個epoch準備好之後,Transform0到Transform1之間有兩個epoch在同一時刻運作。然後接下來Transform0到Transform2之間,會有三個epoch在同一時刻運作。在這種設計下,同一時刻每個Transform隻有一個epoch被處理。 StreamBox就是這樣一種設計。它通過epoch來處理無序tuple,并且它能夠并行地處理所有Transform的epoch。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        StreamBox就是這樣一種設計。它通過epoch來處理無序tuple,并且它能夠并行地處理所有Transform的epoch。

        相比之前的工作和StreamBox進行對比,StreamBox是一個High Pipeline 并且資料并行的實時流處理系統,因為它能夠并行地處理所有Transform的epoch。

        作者比較了StreamBox和現有存在的實時流處理系統在多核機器上,StreamBox 可以達到很高的系統吞吐量比現有的實時流處理系統。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

3. StreamBox Introduction

        在此介紹之前,為了去處理實時流資料,作者定義了Transform。Transform是一個消耗流資料并且能夠産生流資料的一種計算結構。許多Transform構成了一個DAG圖就叫做一個Pipeline。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        無界的數量流包括持續到來的輸入,每個Stream Records包括資料和産生資料的event time。event time 就是資料的生産時間。由于這些記錄通過不同的網絡路徑傳播,并且record上的計算以不同的速率執行,這些記錄的到達時間是無序的。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        實時流資料持續不斷的到來,視窗是一個臨時的處理資料的範圍。視窗将無限的資料沿着時間邊界切割成有限的片段。我們的資料将會前往相應的視窗範圍根據它們的eventTime。Transform可以基于Windonw進行計算操作。

        資料将會前往相應的視窗範圍根據它們的eventTime。正如之前提到的那樣,Records 意味着會以無序的形式持續不斷的到來,比如舉個列子。1:09比1:11事件提早發生,但是1:09比1:11推遲到來。這個時候1:11需要進入到一個window中,但是這裡有兩個windows。分别是1:05-1:10和1:00-1:05。是以問題就是什麼時候這個視窗執行完成,系統如何知道屬于這個window的所有的記錄已經到來了。特别的是在這個例子中,系統如何知道1:09将要接下來會到達。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        是以系統必須通知輸入已經完成,這就是watermark。Watermark就是由Stream Souce生成的,用來訓示輸入已經完成。WaterMark X指所有比X時間小的輸入資料(含有evetnTime)都已經到達了。比如說WaterMark 1:05 意味着比1:05時間小的輸入資料都已經到達了,是以這個視窗就可以關閉了。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        接下來我們來看如何去使用watermark來處理無序的資料。這個情況下,當1:08到達的時候,它将會去相應的window。當1:11到達,它将會去另外一個window。然後當1:09到達的時候,它将會去先前的那個window。此時,系統發現了一個watermark 1:10。系統知道比1:10小的輸入資料都已經到達了。是以它就可以關閉這個window,Transform就可以進行基于視窗的計算。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        事件流由watermark分開進入到Epoch中,每個Epoch是一組在兩個Watermark之間的記錄集合。并且一個視窗可能會穿越多個Epoch。

4. StreamBox Desgin

        目前現有的實時流處理系統都是在分布式系統中進行優化。它們忽視了高效的多核機器,并且他們假設一台機器無法有效的處理實時流處理資料。

        然而,現代硬體的進步使單個多核機器成為一個有吸引力的流媒體平台。 這些進步包括:(i)顯着提高入口速率的高吞吐量I / O,例如遠端直接存儲器存取(RDMA)和10Gb以太網; (ii)TB級别的記憶體存儲,可以儲存大量的流進行中的狀态以及(iii)大量的計算CPU核數。本文旨在最大限度地提高資料流吞吐量并最大限度地減少現代多核硬體的延遲,進而減少流處理系統所需的機器數量。作者的目标就是設計一個高效的實時流處理系統為了一個多核的機器。流處理系統在保證正确性的前提下,它還要盡量減少線程之間同步,同時尊重兩個Transform之間的依賴性。然後為了達到動态高度并行,它能夠在不同的epoch中并行的處理任何記錄。并且系統的目标是達到高吞吐量和低延遲。StreamBox彈性地将軟體并行性映射到硬體。 它将一組工作線程綁定到核心。 (i)每個線程獨立地從一個容器中檢索一組記錄(一個包),并執行變換,産生新的記錄,并将其存儲到下遊容器中。(ii)為了優化等待時間,它優先處理具有下遊輸出所需要的目前時間戳的容器。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        但是這裡由三個挑戰:為了達到正确性,它通過兩個不變量來保證watermark的語義。為了達到高吞吐量,它不停頓pipline。例如,當水印處理是一個長時間延遲事件時,StreamBox不會停滞,因為一旦後續記錄到達,它就會打開新的容器并開始處理它們。為了達到低延遲的特性,他不會造成空閑的watermark事件處理。并且能夠提供一個高度并行化的Pipline進行高度并行化的計算。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        第一個規則就是Transform必須順序地消費watermark。Transform必須在處理完1:10之後,才能處理1:20的watermark。Transform必須消費一個epoch中的所有記錄,然後才能消費epoch的watermark,并且重新整理Transform的内部狀态,生成新的watermark。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        第二個規則Transform計算必須尊重每個epoch的邊界。也就是說一旦一個記錄被配置設定到一個epoch時,這個record永遠不會改變相應的epoch,因為這種變化可能會違背watermark的保證。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        作者的解決方案就是使用一個資料結構:級聯容器。每個Cascading Container對應一個相應的epoch。它們跟蹤SS一個epoch的狀态,以及記錄和watermark之間的關系。并且它們排程工作線程以消耗watermark和records。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        由于每個Transform操作含有多個Container,是以每個container對應相應的下遊container。那麼為什麼要這樣去設計?這是因為記錄/水印需要通過這個連接配接流經Pipeline。

        并且Transform計算必須尊重epoch的邊界。也就是說每一個Container的bundles,必須發往相應的下遊Contaniner容器進行計算,而不能發往其他的Contaniner,進而造成不必要的計算。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        然後就是在container内的所有記錄都被處理後,水印将被處理,這個将對應第一個規則,Transform必須順序的消費watermark。接下來就是watermark必須被順序的處理,比如watermark 20:00被處理,必須在watermark15:00之前被處理。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

            最後就是所有的container中的記錄都是并行的處理,是以說StreamBox能夠并行的處理所有的Transfomr中的epoch。

        接下來是StreamBox的總體設計,一個Pipeline有多個Transform。并且每個Transform含有多個Container,recodrs/watermarks流動必須通過這些連結。是以為了做到這些,我們需要有一個高度并行的Pipline。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

5. Evaluation

        作者通過設計一組應用對StreamBox進行測試,作者發現StreamBox有着很好的吞吐量和可擴充性。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        作者将StreamBox和存在的現有的實時流處理引擎進行比較,發現StreamBox由于多核并行體系架構相比如現有的未批處理引擎有着很好的性能。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

        作者發現StreamBox對于處理無序的記錄有着很好的吞吐量的性能。

[ATC 17] StreamBox: 面向多核機器上的針對Records的無序到達的實時流處理系統

6. Conclusion

        StreamBox通過使用所有CPU核心并行處理任何時期的任何記錄實作高吞吐量和低延遲,并且數百萬記錄每秒吞吐量,與具有數百個CPU核心的群集上的分布式引擎相當。StreamBox幾十毫秒的延遲,比其他大型引擎短20倍。

繼續閱讀