<a href="https://github.com/allwefantasy/streamingpro">項目位址</a>
傳統的spark streaming程式需要:
建構streamingcontext
設定checkpoint
連結資料源
各種transform
foreachrdd 輸出
通常而言,你可能會因為要走完上面的流程而建構了一個很大的程式,比如一個main方法裡上百行代碼,雖然在開發小功能上足夠便利,但是複用度更方面是不夠的,而且不利于協作,是以需要一個更高層的開發包提供支援。
如何開發一個spark streaming程式
我隻要在配置檔案添加如下一個job配置,就可以作為标準的的spark streaming 程式送出運作:
上面的配置相當于完成了如下的一個流程:
從kafka消費資料
将kafka資料轉化為表
通過sql進行處理
列印輸出
是不是很簡單,而且還可以支援熱加載,動态添加job等
該實作的特性有:
配置化
支援多job配置
支援各種資料源子產品
支援通過sql完成資料處理
支援多種輸出子產品
未來可擴充的支援包含:
動态添加或者删除job更新,而不用重新開機spark streaming
支援storm等其他流式引擎
更好的多job互操作
該實作完全基于serviceframeworkdispatcher 完成,核心功能大概隻花了三個小時。
這裡我們先理出幾個概念:
spark streaming 定義為一個app
每個action定義為一個job.一個app可以包含多個job
配置檔案結構設計如下:
一個完整的app 對應一個配置檔案。每個頂層配置選項,如job1,job2分别對應一個工作流。他們最終都會運作在一個app上(spark streaming執行個體上)。
strategy 用來定義如何組織 compositor,algorithm, ref 的調用關系
algorithm作為資料來源
compositor 資料處理鍊路子產品。大部分情況我們都是針對該接口進行開發
ref 是對其他job的引用。通過配合合适的strategy,我們将多個job組織成一個新的job
每個元件( compositor,algorithm, strategy) 都支援參數配置
那對應的子產品是如何實作的?本質是将上面的配置檔案,通過已經實作的子產品,轉化為spark streaming程式。
以sqlcompositor 的具體實作為例:
上面的代碼就完成了一個sql子產品。那如果我們要完成一個自定義的.map函數呢?可類似下面的實作:
同理你可以實作filter,repartition等其他函數。
該方式提供了一套更為高層的api抽象,使用者隻要關注具體實作而無需關注spark的使用。同時也提供了一套配置化系統,友善建構資料處理流程,并且複用原有的子產品,支援使用sql進行資料處理。
這個隻是我們大系統的一小部分,願意和我們一起進一步完善該系統麼?歡迎加入我們(請私信我)。