天天看點

利用Spark Streaming實作分布式采集系統

前兩天我剛在自己的一篇文章中鼓吹資料天生就是流式的,并且指出:

<b>批量計算已經在慢慢退化,未來必然是屬于流式計算的,資料的流動必定是由資料自己驅動流轉的。</b>

而spark streaming 在上層概念上,完美融合了批量計算和流式計算,讓他們你中有我,我中有你,這種設計使得spark streaming 作為流式計算的一個載體,同時也能作為其他一些需要分布式架構的問題提供解決方案。

天然就是分布式的,不用再為實作分布式協調而蛋疼

基于task的任務執行機制,可随意控制task數量

無需關注機器,是面向資源的,使得部署變得異常簡單,申明資源,送出,over

內建完善的輸入輸出,包括hdfs/kafka/elasticsearch/hbase/mysql/redis 等等,這些都無需自己去搞

成熟簡單的算子讓你對資料的處理變得異常簡單

現在以标題中的采集系統為例,整個事情你隻要實作采集邏輯,至于具體中繼資料讀取,結果存儲到哪都可能隻要個簡單配置或者利用現成的元件,最後部署也隻要簡單申明下資源就可以在一個可以彈性擴充的叢集上。

關于這塊的理念,可參考 

<a href="https://yq.aliyun.com/articles/60235?spm=5176.8091938.0.0.9tgxbg">看不到伺服器的年代,一個新的時代</a>

<a href="https://yq.aliyun.com/articles/60239?spm=5176.8091938.0.0.9tgxbg">transformer架構解析</a>

<a href="https://yq.aliyun.com/articles/60243?spm=5176.8091938.0.0.9tgxbg">spark streaming 妙用之實作工作流排程器</a>

目前這個采集系統主要是為了監控使用。但凡一個公司,或者部門内部會有大量的開源系統,每個開源元件都會提供大緻三類輸出:

标準的metrics 輸出,友善你內建到gangila等監控系統上

web ui,比如spark,storm,hbase 都提供了自己的web界面等

rest 接口,主要是 json,xml,字元串

但是對于監控來說,前面兩個直覺易用,但是也都有比較大的問題:

metrics 直接輸出到監控系統,就意味着沒辦法定制,如果我希望把多個名額放在一塊,這個可能就很難做到。

web ui 則需要人去看了

相反,rest 接口最為靈活,但是需要自己做寫邏輯,比如擷取資料,處理,然後做自己的呈現 。問題來了,如果我現在有幾千個rest接口的資料要擷取,并且需要一個很友善的手段抽取裡面要的值(或者名額)。這便涉及到了兩個問題:

收集的接口可能非常多,如何讓收集程式是可很橫向擴充的?

接口傳回的資料形态各異,如何提供一個友善一緻的模型,讓使用者簡單通過一個配置就可以抽取出裡面的内容?

利用Spark Streaming實作分布式采集系統

[email protected]

采集中繼資料源,目前存儲在es裡

采集系統會定時到es裡擷取中繼資料,并且執行特定的收集邏輯

通過采集系統的一定的算子,将資料格式化,接入kafka

通過标準(已經存在的)etl系統完成資料的處理,供後續流程進一步處理

回到上面的一個問題,

<b>接口傳回的資料形态各異,如何提供一個友善一緻的模型,讓使用者簡單通過一個配置就可以抽取出裡面的内容</b>

rest 接口傳回的資料,無非四種:

html

json

xml

text

對于1,我們先不探讨。對于json,xml 我們可以采用 xpath,對于text我們可以采用标準的正則或者etl來進行抽取。

我們在定義一個需要采集的url時,需要同時配置需要采集的名額以及對應的名額的xpath路徑或者正則。當然也可以交給後端的etl完成該邏輯。不過我們既然已經基于spark streaming做采集系統,自然也可以利用其強大的資料處理功能完成必要的格式化動作。是以我們建議在采集系統直接完成。

資料源的一個可能的資料結構:

采集系統通過我們封裝的一個 dinputstream,然後根據batch(排程周期),擷取這些資料,之後交給特定的執行邏輯去執行。采用streamingpro,會是這樣:

通過上面的配置檔案,可以很好看到處理流程。

輸入采集源

采集結果

根據xpath 抽取名額

輸出結果

中繼資料管理系統是必要的,他可以友善你添加新的url監控項。通過streamingpro,你可以在spark streaming 的driver中添加中繼資料管理頁面,實作對中繼資料的操作邏輯。我們未來會為 如何通過streamingpro 給spark streaming 添加自定義rest 接口/web頁面提供更好的教程。

上面其實已經是試下了一個采集系統的雛形,得益于spark streaming天然的分布式,以及靈活的算子,我們的系統是足夠靈活,并且可橫向擴充。

然而你會發現,

如果我需要每個接口有不同的采集周期該如何?

如果我要實作更好的容錯性如何?

如何實作更好的動态擴容?

第一個問題很好解決,我們在中繼資料裡定義采集周期,而spark streaming的排程周期則設定為最小粒度。

第二個問題容錯性屬于業務層面的東西,但是如果有task失敗,spark streaming也會把你嘗試重新排程和重試。我們建議由自己來完成。

第三個,隻要開啟了 dynamic resource allocation,則能夠根據情況,實作資源的伸縮利用。

文/祝威廉(簡書作者)

原文連結:http://www.jianshu.com/p/694fda15b304

著作權歸作者所有,轉載請聯系作者獲得授權,并标注“簡書作者”。

繼續閱讀