Flink是标準的實時處理引擎,而且Spark的兩個子產品Spark Streaming和Structured Streaming都是基于微批處理的,不過現在Spark Streaming已經非常穩定基本都沒有更新了,然後重點移到spark sql和structured Streaming了。
Flink作為一個很好用的實時處理架構,也支援批處理,不僅提供了API的形式,也可以寫sql文本。這篇文章主要是幫着大家對于Structured Streaming和Flink的主要不同點。
1. 運作模型
Structured Streaming 的task運作也是依賴driver 和 executor,當然driver和excutor也還依賴于叢集管理器Standalone或者yarn等。可以用下面一張圖概括:

Structured Streaming 周期性或者連續不斷的生成微小dataset,然後交由Spark SQL的增量引擎執行,跟Spark Sql的原有引擎相比,增加了增量處理的功能,增量就是為了狀态和流表功能實作。由于是也是微批處理,底層執行也是依賴Spark SQL的。
Flink 中的執行圖可以分成四層:StreamGraph-> JobGraph -> ExecutionGraph -> 實體執行圖。細分:
StreamGraph: 是根據使用者通過 Stream API 編寫的代碼生成的最初的圖。用來表示程式的拓撲結構。
JobGraph: StreamGraph經過優化後生成了JobGraph,送出給 JobManager 的資料結構。主要的優化為,将多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少資料在節點之間流動所需要的序列化/反序列化/傳輸消耗。這個可以用來建構自己的自己的叢集任務管理架構。
ExecutionGraph: JobManager 根據 JobGraph 生成的分布式執行圖,是排程層最核心的資料結構。
實體執行圖: JobManager 根據ExecutionGraph 對 Job 進行排程後,在各個TaskManager 上部署 Task 後形成的“圖”,并不是一個具體的資料結構。
2. 程式設計風格
兩者的程式設計模型基本一緻吧,都是鍊式調用。
3. 時間概念
三種處理時間:事件時間,注入時間,處理時間。
Flink支援三種時間,同時flink支援基于事件驅動的處理模型,同時在聚合等算子存在的時候,支援狀态逾時自動删除操作,以避免7*24小時流程式計算狀态越來越大導緻oom,使得程式挂掉。
Structured Streaming僅支援事件時間,處理時間。
對于基于事件時間的處理flink和Structured Streaming都是支援watemark機制,視窗操作基于watermark和事件時間可以對滞後事件做相應的處理,雖然聽起來這是個好事,但是整體來說watermark就是雞肋,它會導緻結果資料輸出滞後,比如watermark是一個小時,視窗一個小時,那麼資料輸出實際上會延遲兩個小時,這個時候需要進行一些處理。
4. 維表實作及異步io
Structured Streaming不直接支援與維表的join操作,但是可以使用map、flatmap及udf等來實作該功能,所有的這些都是同步算子,不支援異步IO操作。但是Structured Streaming直接與靜态資料集的join,可以也可以幫助實作維表的join功能,當然維表要不可變。
Flink也不支援與維表進行join操作,除了map,flatmap這些算子之外,flink還有異步IO算子,可以用來實作維表,提升性能。
5. 狀态管理
狀态維護應該是流處理非常核心的概念了,比如join,分組,聚合等操作都需要維護曆史狀态,那麼flink在這方面很好,structured Streaming也是可以,但是spark Streaming就比較弱了,隻有個别狀态維護算子upstatebykye等,大部分狀态需要使用者自己維護,雖然這個對使用者來說有更大的可操作性和可以更精細控制但是帶來了程式設計的麻煩。flink和Structured Streaming都支援自己完成了join及聚合的狀态維護。
Structured Streaming有進階的算子,使用者可以完成自定義的mapGroupsWithState和flatMapGroupsWithState,可以了解類似Spark Streaming 的upstatebykey等狀态算子。
就拿mapGroupsWithState為例:
由于Flink與Structured Streaming的架構的不同,task是常駐運作的,flink不需要狀态算子,隻需要狀态類型的資料結構。
首先看一下Keyed State下,我們可以用哪些原子狀态:
ValueState:即類型為T的單值狀态。這個狀态與對應的key綁定,是最簡單的狀态了。它可以通過update方法更新狀态值,通過value()方法擷取狀态值。
ListState:即key上的狀态值為一個清單。可以通過add方法往清單中附加值;也可以通過get()方法傳回一個Iterable<T>來周遊狀态值。
ReducingState:這種狀态通過使用者傳入的reduceFunction,每次調用add方法添加值的時候,會調用reduceFunction,最後合并到一個單一的狀态值。
FoldingState:跟ReducingState有點類似,不過它的狀态值類型可以與add方法中傳入的元素類型不同(這種狀态将會在Flink未來版本中被删除)。
MapState:即狀态值為一個map。使用者通過put或putAll方法添加元素。
6. join操作
flink的join操作沒有大的限制,支援種類豐富,比:
Inner Equi-join
- join可以傳遞。比如df1.join(df2).join(df3)。
- 從spark2.3開始,隻有在輸出模式為append的流查詢才能使用join,其他輸出模式暫不支援。
- 從spark2.3開始,在join之前不允許使用no-map-like操作。以下是不能使用的例子。
- 在join之前不能使用流聚合操作。
- 在join之前,無法在update模式下使用mapGroupsWithState和flatMapGroupsWithState。
7. 觸發處理模型
這個之是以講一下差別,實際緣由也很簡單,Structured Streaming以前是依據spark的批處理起家的實時處理,而flink是真正的實時處理。那麼既然Structured Streaming是批處理,那麼問題就簡單了,批次執行時間和執行頻率自然是有限制的,就産生了多種觸發模型,簡單稱其為triggers。Strucctured Streaming的triggers有以下幾種形式:
1). 支援單次觸發處理,類似于flink的批處理。
Trigger.Once()顧名思義這個僅處理一次,類似于flink的批處理。
2). 周期性觸發處理
Trigger.ProcessingTime("2 seconds")
查詢将以微批量模式執行,其中微批次将以使用者指定的間隔啟動:
a).如果先前的微批次在該間隔内完成,則引擎将等待該間隔結束,然後開始下一個微批次。
b).如果前一個微批次需要的時間超過完成的時間間隔(即如果錯過了區間邊界),那麼下一個微批次将在前一個完成後立即開始(即,它不會等待下一個間隔邊界))。
c).如果沒有可用的新資料,則不會啟動微批次。
3). 連續處理
指定一個時間間隔
Trigger.Continuous("1 second")
這個1秒鐘表示每秒鐘記錄一次連續處理查詢進度。
4). 預設觸發模型
一個批次執行結束立即執行下個批次。
Flink的觸發模式很簡單了,一旦啟動job一直執行處理,不存在各種觸發模式,當然假如視窗不算的話。
8. 表管理
flink和structured streaming都可以講流注冊成一張表,然後使用sql進行分析,不過兩者之間差別還是有些的。
Structured Streaming将流注冊成臨時表,然後用sql進行查詢,操作也是很簡單跟靜态的dataset/dataframe一樣。
其實,此處回想Spark Streaming 如何注冊臨時表呢?在foreachRDD裡,講rdd轉換為dataset/dataframe,然後将其注冊成臨時表,該臨時表特點是代表目前批次的資料,而不是全量資料。Structured Streaming注冊的臨時表就是流表,針對整個實時流的。Sparksession.sql執行結束後,傳回的是一個流dataset/dataframe,當然這個很像spark sql的sql文本執行,是以為了差別一個dataframe/dataset是否是流式資料,可以df.isStreaming來判斷。
當然,flink也支援直接注冊流表,然後寫sql分析,sql文本在flink中使用有兩種形式:
對于第一種形式,sqlQuery執行結束之後會傳回一張表也即是Table對象,然後可以進行後續操作或者直接輸出,如:result.writeAsCsv("");。
而sqlUpdate是直接将結果輸出到了tablesink,是以要首先注冊tablesink,方式如下:
flink系統資料庫的形式比較多,直接用資料源系統資料庫,如:
也可以從datastream轉換成表,如:
9. 監控管理
對于Structured Streaming一個SparkSession執行個體可以管理多個流查詢,可以通過SparkSession來管理流查詢,也可以直接通過start調用後傳回的StreamingQueryWrapper對象來管理流查詢。
SparkSession.streams擷取的是一個StreamingQueryManager,然後通過start傳回的StreamingQueryWrapper對象的id就可以擷取相應的流查詢狀态和管理相應的流查詢。當然,也可以直接使用StreamingQueryWrapper來做這件事情,由于太簡單了,我們就不貼了可以直接在源碼裡搜尋該類。
對與Structured Streaming的監控,當然也可以使用StreamingQueryWrapper對象來進行健康監控和告警
其中,有些對象内部有更詳細的監控名額,比如lastProgress,這裡就不詳細展開了。
還有一種監控Structured Streaming的方式就是自定義StreamingQueryListener,然後監控名額基本一樣。注冊的話直接使用
spark.streams.addListener(new StreamingQueryListener())即可。
Flink的管理工具新手的話主要建議是web ui ,可以進行任務送出,job取消等管理操作,監控的話可以看執行圖的結構,job的執行狀态,背壓情況等。
當然,也可以通過比如flink的YarnClusterClient用戶端對jobid進行狀态查詢,告警,啟動,停止等操作。
總結
除了以上描述的這些内容,可能還關心kafka結合的時候新增topic或者分區時能否感覺,實際上兩者都能感覺,初次之外。flink還有很多特色,比如資料回流,分布式事務支援,分布式快找,異步增量快照,豐富的windows操作,側輸出,複雜事件處理等等。
對于視窗和join,兩者差別還是很大,限于篇幅問題後面浪尖會分别給出講解。
flink是一個不錯的流處理架構,雖然目前還有些bug和待完善的部分。
原文位址:https://mp.weixin.qq.com/s/F7jHlcc-91bUbCNx50hXww