天天看點

Spark 2.0 Structured Streaming 分析

spark 2.0 将流式計算也統一到dataframe裡去了,提出了structured streaming的概念,将資料源映射為一張無線長度的表,同時将流式計算的結果映射為另外一張表,完全以結構化的方式去操作流式資料,複用了其對象的catalyst引擎。

作為spark平台的流式實作,spark streaming 是有單獨一套抽象和api的,大體如下

Spark 2.0 Structured Streaming 分析

圖檔來源于spakr官網

代碼的形态如下:

上面都是套路,基本都得照着這麼寫。

spark 2.0 時代

概念上,所謂流式,無非就是無限大的表,官方給出的圖一目了然:

Spark 2.0 Structured Streaming 分析

圖檔來源于官網

在之前的宣傳ppt裡,有類似的代碼,給人煥然一新的感覺。當然,下面的代碼你肯定要有上下文的,就這一句肯定跑不起來的。

Spark 2.0 Structured Streaming 分析

圖檔來源于http://litaotao.github.io/images/spark-2.0-7.png

第一個是标準的dataframe的使用代碼。下面第二個則是流式計算的代碼,看完這個demo你肯定會納悶:

沒有定時器麼,我怎麼設定duration?

在哪裡設定awaittermination呢?

如果我要寫入到其他引擎,而其他引擎沒有适配咋辦?

這些疑問其實歸結起來就是:

<b>structured streaming  的完整套路是啥?</b>

我們來看看代碼(例子來源于spark源碼,我稍微做了些修改):

這個就是structured streaming 的完整套路了。

structured streaming  目前source源隻支援file 和 socket 兩種。輸出則是四種,前面已經提到。foreach則是可以無限擴充的。我舉個例子:

我把資料最後寫到各個節點的臨時目錄裡。當然,這隻是個例子,不過其他類似于寫入redis的,則是類似的。

如果structured streaming 僅僅是換個api,或者能夠支援dataframe操作,那麼我隻能感到遺憾了,因為2.0之前通過某些封裝也能夠很好的支援dataframe的操作。那麼 structured streaming 的意義到底何在?

重新抽象了流式計算

易于實作資料的exactly-once

我們知道,2.0之前的spark streaming 隻能做到at-least once,架構層次很難幫你做到exactly-once,參考我以前寫的文章spark streaming crash 如何保證exactly once semantics。 現在通過重新設計了流式計算架構,使得實作exactly-once 變得容易了。

可能你會注意到,在structured streaming 裡,多出了outputmode,現在有complete,append,update 三種,現在的版本隻實作了前面兩種。

complete,每次計算完成後,你都能拿到全量的計算結果。

append,每次計算完成後,你能拿到增量的計算結果。

但是,這裡有個但是,使用了聚合類函數才能用complete模式,隻是簡單的使用了map,filter等才能使用append模式。 不知道大家明白了這裡的含義麼?

complete 就是我們前面提到的mapwithstate實作。 append 模式則是标準的對資料做解析處理,不做複雜聚合統計功能。

官方給出了complete 模式的圖:

Spark 2.0 Structured Streaming 分析

append 模式則是傳回transform後最新的資料。

前面我們說到,現在的設計很簡單,其實就是 無限大的 source table 映射到一張無限大的 result table上,每個周期完成後,都會更新result table。我們看到,structured streaming 已經接管了端到端了,可以通過内部機制保證資料的完整性,可靠性。

offset 概念,流式計算一定有offset的概念。

對于無法回溯的資料源則采用了wal日志

state概念,對result table 的每個分區都進行狀态包裝,分區的的每個add,put,update,delete操作,都會寫入到hdfs上,友善系統恢複。

其中第三點是隻有在2.0才有的概念。不過比較遺憾的是,result table 和foreachwriter 并沒有什麼結合,系統隻是保證result table的完整性,通過hdfsbackedstatestoreprovider将result table 儲存到hdfs。

以前的api就是給你個partition的iterator,你愛怎麼玩怎麼玩,但是到了現在,以foreachwriter為例,

資料你隻能一條一條處理了。理論上如果假設正好在process的過程中,系統挂掉了,那麼資料就會丢了,但因為 structured streaming  如果是complete模式,因為是全量資料,是以其實做好覆寫就行,也就說是幂等的。

如果是append 模式,則可能隻能保證at-least once ,而對于其内部,也就是result table 是可以保證exactly-once 的。對于比如資料庫,本身是可以支援事物的,可以在foreachwrite close的時候commit下,有任何失敗的時候則在close的時候,rollback 就行。但是對于其他的,比如hbase,redis 則較為困難。

另外在foreachwriter提供的初始化函數,

傳回值是boolean,通過檢測版本号,是否跳過這個分區的資料處理。傳回true是為不跳過,否則為跳過。當你打開的時候,可以通過某種手段儲存version,再系統恢複的時候,則可以讀取該版本号,低于該版本的則傳回false,目前的則繼續處理。