天天看點

Spark Streaming + Spark SQL 實作配置化ETL流程

<a href="https://github.com/allwefantasy/streamingpro">項目位址</a>

傳統的spark streaming程式需要:

建構streamingcontext

設定checkpoint

連結資料源

各種transform

foreachrdd 輸出

通常而言,你可能會因為要走完上面的流程而建構了一個很大的程式,比如一個main方法裡上百行代碼,雖然在開發小功能上足夠便利,但是複用度更方面是不夠的,而且不利于協作,是以需要一個更高層的開發包提供支援。

如何開發一個spark streaming程式

我隻要在配置檔案添加如下一個job配置,就可以作為标準的的spark streaming 程式送出運作:

上面的配置相當于完成了如下的一個流程:

從kafka消費資料

将kafka資料轉化為表

通過sql進行處理

列印輸出

是不是很簡單,而且還可以支援熱加載,動态添加job等

該實作的特性有:

配置化

支援多job配置

支援各種資料源子產品

支援通過sql完成資料處理

支援多種輸出子產品

未來可擴充的支援包含:

動态添加或者删除job更新,而不用重新開機spark streaming

支援storm等其他流式引擎

更好的多job互操作

該實作完全基于serviceframeworkdispatcher 完成,核心功能大概隻花了三個小時。

這裡我們先理出幾個概念:

spark streaming 定義為一個app

每個action定義為一個job.一個app可以包含多個job

配置檔案結構設計如下:

一個完整的app 對應一個配置檔案。每個頂層配置選項,如job1,job2分别對應一個工作流。他們最終都會運作在一個app上(spark streaming執行個體上)。

strategy 用來定義如何組織 compositor,algorithm, ref 的調用關系

algorithm作為資料來源

compositor 資料處理鍊路子產品。大部分情況我們都是針對該接口進行開發

ref 是對其他job的引用。通過配合合适的strategy,我們将多個job組織成一個新的job

每個元件( compositor,algorithm, strategy) 都支援參數配置

那對應的子產品是如何實作的?本質是将上面的配置檔案,通過已經實作的子產品,轉化為spark streaming程式。

以sqlcompositor 的具體實作為例:

上面的代碼就完成了一個sql子產品。那如果我們要完成一個自定義的.map函數呢?可類似下面的實作:

同理你可以實作filter,repartition等其他函數。

該方式提供了一套更為高層的api抽象,使用者隻要關注具體實作而無需關注spark的使用。同時也提供了一套配置化系統,友善建構資料處理流程,并且複用原有的子產品,支援使用sql進行資料處理。

這個隻是我們大系統的一小部分,願意和我們一起進一步完善該系統麼?歡迎加入我們(請私信我)。

繼續閱讀