天天看點

Flink流處理(三)- 資料流操作

3. 資料流操作

流處理引擎一般會提供一組内置的操作,用于對流做消費、轉換,以及輸出。接下來我們介紹一下最常見的流操作。

操作分為無狀态的(stateless)與有狀态的(stateful)。無狀态的操作不包含任何内部狀态。也就是說,處理此event時,并不需要任何其他曆史event的資訊,也不需要儲存它自己的資訊。無狀态的操作易于并行,因為events可以以它們到達的順序,互相獨立的被處理。在出現錯誤時,無狀态operator可以被簡單的重新執行,從它丢失資料的點開始繼續執行即可。

有狀态的operator可能會維護之前它處理過的events的資訊。它的狀态資訊(state)會根據到達的events進行更新,并且被用于處理之後events的邏輯。有狀态的流處理應用對于并行來說會更為複雜,并且需要以容錯的方式運作。因為它的狀态需要高效的分區(partitioned),并能夠在發生錯誤後得到穩定的恢複。

資料消費與輸出

資料消費與輸出的操作使得流處理器可以與外部系統進行通信。資料消費是指:從外部資料源擷取raw data,然後轉換成适用于處理的格式。實作了資料消費邏輯的operator稱為data sources,它可以消費如 TCP socket 的資料、檔案、Kafka topic 的資料等。資料輸出(egress)是生成output的操作,它将資料以适合外部系統處理的格式輸出。實作資料輸出邏輯的operator稱為data sink。

轉換操作

轉換操作是一個單次操作,每次單獨的處理一個event。用于将event 做某些變換後再輸出一個新的流資料。轉換邏輯可以整合在operator中,或是由使用者定義的方法提供。如下圖所示:

Flink流處理(三)- 資料流操作

Operators 可以接收多個inputs并産生多個輸出流。它們也可以用于修改dataflow graph 的結構,例如将流split為多個流,亦或是将多個流整合為一個流。

滾動聚合(rolling aggregation)

Rolling aggregation 是一個聚合操作,例如sum,minimum以及maximum。它會根據輸入的event,對結果做持續的更新。聚合操作是有狀态的,它将輸入的資料與目前的狀态資訊(state)進行整合,再産生一個更新後的聚合值。為了高效地與目前狀态進行整合,并輸出一個single value,聚合操作必須滿足結合律(associative)與交換律(commutative)。否則 operator 需要存儲整個流的曆史記錄。下圖是一個滾動聚合求最小值的示例,它持有目前最小值,并根據輸入的events更新目前最小值:

Flink流處理(三)- 資料流操作

視窗操作

轉換與滾動聚合每次處理一個event并産生一個output event,繼而(有可能)更新狀态。然而,某些操作需要收集并緩存一些記錄後再計算它們的結果。例如求中值函數,這個操作需要對多條資料聚合做處理,但是流是無限的,對此,我們需要限制此操作維護的資料量大小。此功能由視窗(window)操作提供。

考慮這麼一個場景:應用為司機提供實時的路況資訊。這裡,我們需要知道在某個地段,前幾分鐘内是否有有擁堵或是事故)。如果僅是對流曆史記錄做一個單聚合(single aggregate),則會損失資料随時間變化的資訊。例如,你可能想知道每5分鐘内有多少個自行車穿過某交叉路口。

Window操作會持續地從一個無限流中,建立events的有限子集(稱為buckets),使得我們可以在這些有限集合上做計算。Events 通常是根據資料的屬性或是時間,被配置設定到buckets中。為了更好地定義window operator,我們先了解一下events是如何配置設定給buckets、以及windows是如何産生一個result的。Window 的行為由一組政策定義。Window 政策決定了什麼時候建立bucket、哪個event被配置設定到哪個bucket中、以及bucket裡的内容什麼時候被評估(evaluate)。對于何時評估,這個基于觸發條件。當滿足某個出發條件時,bucket裡的内容會被發往一個評估函數(evaluation function),此方法會對bucket裡的元素進行計算。評估函數可以是聚合操作(例如求sum、最小值)、或是使用者自定義的操作。決策可以是基于時間的(例如在最近5秒内收到的events),也可以是基于數量的(例如,最近收到的100個events),亦或是基于資料的屬性。下面我們介紹一下常見的視窗類型。

滾動(tumbling)視窗:配置設定events到不重疊的固定大小的buckets中。在超出window的邊界後,所有在window内的的事件會被發往到評估函數做處理。基于數量的(count-based )滾動視窗定義了:在收集多少個events後,開始觸發評估。圖2-6顯示了一個count-based滾動視窗,将輸入流分散到四個events一組的buckets中。

   

Flink流處理(三)- 資料流操作

    基于時間的滾動視窗定義了:以時間周期分隔,在一個時間周期内的events會被緩存到bucket 中。

       圖 2-7 展示了一個基于時間的滾動視窗,将events收集到buckets中,每10分鐘觸發一次計算。

Flink流處理(三)- 資料流操作

2. 滑動視窗:配置設定events到可重疊的固定大小的buckets中。也就是說,一個event可能屬于多個buckets。我們在定義一個滑動視窗時,需要提供兩個變量:長度(length)和步長(slide)。Slide的值決定了新bucket建立的間隔。下圖是一個基于數量(count-based)的滑動視窗,長度為4個events,slide為3個events:

Flink流處理(三)- 資料流操作

3. 會話(session)視窗:會話視窗在某些實際場景中會比滾動視窗與滑動視窗更适用。考慮這樣一個場景:一個應用需要分析線上使用者的行為。這裡我們需要聚合在一個session内,某個使用者的所有事件。Session 由一系列連續的事件組成,并且在連續事件之後,會有一段無事件時間。例如,使用者浏覽新聞時,點選不同的頁面,可以被看作一個session。因為一個session 的長度并無法預先定義,而是取決于實際的資料。是以滾動已經滑動視窗并不适用于此場景。我們需要的是一個windows操作可以将所有屬于同一個session 的事件,分發到同一個bucket中。會話視窗可以根據一個“會話間隔”(session gap)定義一個session的過期時間。在到達過期時間後,一個會話視窗即被關閉。下圖展示了一個會話視窗:

Flink流處理(三)- 資料流操作

到目前為止,我們看到的所有視窗類型都是應用于整個流。但是在實際場景中,可能需要将一個流分為多個邏輯上的流,并在之上使用并行window。例如,假設我們收到的資料源來自于各個不同的傳感器,我們可能需要通過傳感器的ID先對stream做整合,然後再在之上應用視窗計算。在并行視窗(parallel windows)中,每個分區(partition)均完全獨立地應用它特定的視窗政策。下圖展示了一個基于計數的滾動視窗,長度為2,通過event 顔色分區:

Flink流處理(三)- 資料流操作

在流進行中有兩個十分重要的概念:時間語義(time semantics)以及狀态管理(state management)。視窗操作與這兩個概念關系密切。時間可能是流進行中最重要的方面。盡管低延時是流進行中非常棒的一個特性,但是它的實際延遲值已經遠超出了快速分析(just fast analytics)的延遲。流處理在實際系統、網絡、以及通信信道(communication channel)中還不夠完善,并且流資料經常會延遲到達,或是亂序到達。這裡很重要的一點是:在這些情況下,如何傳遞出精準、明确(deterministic)的結果。除此之外,流處理應用除了處理目前産生的event外,還應具備處理曆史events的能力,這可以實作流的離線分析,甚至是時間穿梭分析(time travel analyses)。當然,如果你的系統無法保證對狀态資訊的容錯,則這些功能均毫無意義。到目前為止,我們提到的所有視窗類型都需要先将資料緩存,然後再應用計算并産生結果。實際上,如果你想在一個流應用上計算任何感興趣的資訊,即使是一個簡單的計數(count),也需要維護狀态資訊(state)。假設一個流處理應用可能會跑幾天,幾個月,甚至幾年,我們需要確定在任何故障發生後,state能可靠地被恢複,并且系統需要確定仍能提供準确的結果。下面我們會介紹在流進行中time 的概念,以及在發生錯誤情況下的state guarantees。

References:

Vasiliki Kalavri, Fabian Hueske. Stream Processing With Apache Flink. 2019