天天看點

Spark Streaming 流式計算實戰

這次分享會比較實戰些。具體業務場景描述:

我們每分鐘會有幾百萬條的日志進入系統,我們希望根據日志提取出時間以及使用者名稱,然後根據這兩個資訊形成

username/year/month/day/hh/normal 

username/year/month/day/hh/delay

路徑,存儲到hdfs中。如果我們發現日志産生的時間和到達的時間相差超過的一定的門檻值,那麼會放到 delay 目錄,否則放在正常的 normal 目錄。

為什麼這裡不使用 storm呢? 我們初期确實想過使用 storm 去實作,然而使用 storm 寫資料到hdfs比較麻煩:

* storm 需要持有大量的 hdfs 檔案句柄。需要落到同一個檔案裡的記錄是不确定什麼時候會來的,你不能寫一條就關掉,是以需要一直持有。 * 需要使用hdfs 的寫檔案的 append 模式,不斷追加記錄。

大量持有檔案句柄以及在什麼時候釋放這些檔案句柄都是一件很困難的事情。另外使用 hdfs 的追加内容模式也會有些問題。

後續我們就調研 spark streaming 。 spark streaming 有個好處,我可以攢個一分鐘處理一次即可。這就意味着,我們可以隔一分鐘(你當然也可以設定成五分鐘,十分鐘)批量寫一次叢集,hdfs 對這種形态的檔案存儲還是非常友好的。這樣就很輕易的解決了 storm 遇到的兩個問題。

實時性也能得到保證,譬如我的 batch interval 設定為 一分鐘 那麼我們就能保證一分鐘左右的延遲 ,事實上我們的業務場景是可以容忍半小時左右的。

當然,spark 處理完資料後,如何落到叢集是比較麻煩的一件事情,不同的記錄是要寫到不同的檔案裡面去的,沒辦法簡單的 saveastextfile 就搞定。這個我們通過自定義 partitioner 來解決,第三個環節會告訴大家具體怎麼做。 

上面大家其實可以看到 spark streaming 和 storm 都作為流式處理的一個解決方案,但是在不同的場景下,其實有各自适合的時候。 

我們的資料來源是kafka ,我們之前也有應用來源于 hdfs檔案系統監控的,不過建議都盡量對接 kafka 。

spark streaming 對接kafka 做資料接受的方案有兩種:

*receiver-based approach  *direct approach (no receivers)

兩個方案具體優劣我專門寫了文章分析,大家晚點可以看看這個連結和 spark streaming 相關的文章。

<a href="https://yq.aliyun.com/users/1369673132990567/">我的技術博文</a>

我這裡簡單描述下:

*receiver-based approach 記憶體問題比較嚴重,因為她接受資料和處理資料是分開的。如果處理慢了,它還是不斷的接受資料。容易把負責接受的節點給搞挂了。 * direct approach 是直接把 kafka 的 partition 映射成 rdd 裡的 partition 。 是以資料還是在 kafka 。隻有在算的時候才會從 kafka 裡拿,不存在記憶體問題,速度也快。

是以建議都是用 direct approach 。具體調用方式是這樣:

Spark Streaming 流式計算實戰

還是很簡單的,之後就可以像正常的 rdd 一樣做處理了。

Spark Streaming 流式計算實戰

經過處理完成後 ,我們拿到了logs 對象 。

到這一步位置,日志的每條記錄其實是一個 tuple(path,line)  也就是每一條記錄都會被标記上一個路徑。那麼現在要根據路徑,把每條記錄都寫到對應的目錄去該怎麼做呢?

一開始想到的做法是這樣:

Spark Streaming 流式計算實戰

首先收集到所有的路徑。接着 for 循環 paths ,然後過濾再進行存儲,類似這樣:

Spark Streaming 流式計算實戰

這裡我們一般會把 rdd 給 cache 住,這樣每次都直接到記憶體中過濾就行了。但如果 path 成百上千個呢? 而且資料量一分鐘至少幾百萬,save 到磁盤也是需要時間的。是以這種方案肯定是不可行的。

我當時還把 paths 循環給并行化了,然而目前情況是 cpu 處理慢了,是以有改善,但是仍然達不到要求。

這個時候你可能會想,要是我能把每個路徑的資料都事先收集起來,得到幾個大的集合,然後把這些集合并行的寫入到 hdfs 上就好了。事實上,後面我實施的方案也确實是這樣的。所謂集合的概念,其實就是 partition 的概念。而且這在spark 中也是易于實作的,而實作的方式就是利用自定義 partioner 。具體的方式如下:

Spark Streaming 流式計算實戰

通過上面的代碼,我們就得到了路徑和 partiton id 的對應關系。接着周遊 partition 就行了。對應的 a 是分區号,b 則是分區的資料疊代器。接着做對應的寫入操作就行。這些分區寫入都是在各個 executor 上執行的,并不是在 driver 端,是以足夠快。

我簡單解釋下代碼 ,首先我把收集到的路徑 zipwithindex 這樣就把路徑和數字一一對應了 ;接着我建立了一個匿名類 實作了 partitioner 。numpartitions 顯然就是我們的路徑集合的大小,遇到一個 key (其實就是個路徑)時,則調用路徑和數字的映射關系 ,然後就把所有的資料根據路徑 hash 到不同的 partition 了 。接着周遊 partition 就行了,對應的 a 是分區号,b 則是分區的資料疊代器。接着做對應的寫入操作就行。這些分區寫入都是在各個 executor 上執行的,并不是在 driver 端,是以足夠快。我們在測試叢集上五分鐘大概 1000-2000w 資料,90顆核,180g 記憶體,平均處理時間大概是2分鐘左右。記憶體可以再降降  我估計 100g 足夠了  。

雖然 spark streaming 是作為一個24 * 7 不間斷運作的程式來設計的,但是程式都會 crash ,那如果 crash 了,會不會導緻資料丢失?會不會啟動後重複消費?

我這裡直接給出結論:

* 使用 direct approach 模式 * 啟用 checkpoint 機制

做到上面兩步,就可以保證資料至少被消費一次。

那如何保證不重複消費呢?

這個需要業務自己來保證。簡單來說,業務有兩種:

* 幂等的 * 自己保證事務

所謂幂等操作就是重複執行不會産生問題,如果是這種場景下,你不需要額外做任何工作。但如果你的應用場景是不允許資料被重複執行的,那隻能通過業務自身的邏輯代碼來解決了。

以目前場景為例,就是典型的幂等 ,因為可以做寫覆寫 ,

Spark Streaming 流式計算實戰

具體代碼如上 ,那如何保證寫覆寫呢? 

檔案名我采用了 job batch time 和 partition 的 id 作為名稱。這樣,如果假設系統從上一次失敗的 job 重新跑的時候,相同的内容會被覆寫寫,是以就不會存在重複的問題。

我們每分鐘會有幾百萬條的日志進入系統,我們希望根據日志提取出時間以及使用者名稱,然後根據這兩個資訊形成  

username/year/month/day/hh/delay 

我們作了四個方面的分析: 

spark streaming 與 storm 适用場景分析 ;

spark streaming 與 kafka 內建方案選型,我們推薦direct approach 方案 ;

自定義 partitioner 實作日志檔案快速存儲到hdfs ;

spark streaming 如何保證資料的完整性,不丢,不重 。

<b>q1. spark streaming 可以直接在後面連上 elasticsearch 麼?</b>

a1. 可以的。透露下,我馬上也要做類似的實踐。

<b>q2. 公司選用 storm 是由于它可以針對每條日志隻做一次處理,spark streaming 可以做到麼?</b>

a2.  spark streaming 是按時間周期的, 需要攢一段時間,再一次性對獲得的所有資料做處理

<b>q3. 什麼是檔案句柄?</b>

a3. hdfs 寫入 你需要持有對應的檔案的 client 。不可能來一條資料,就重新常見一個連結,然後用完就關掉。

<b>q4. spark 分析流資料,分析好的資料怎麼存到 mysql 比較好?</b>

a4. 我沒有這個實踐過存儲到 mysql。一般資料量比較大,是以對接的會是 reids/hbase/hdfs。

<b>q5. 有沒有嘗試過将資料寫入 hive?</b>

a5. 沒有。但沒有問題的。而且 spark streaming 裡也可以使用 spark sql 。我不知道這會不會有幫助。

<b>q6. 幂等是什麼?</b>

a6. 就是反複操作不會有副作用。

<b>q7. 可不可以分享一下 spark 完整的應用場景?</b>

a7. 這個有點太大。 目前 spark 覆寫了離線計算,資料分析,機器學習,圖計算,流式計算等多個領域,目标也是一個通用的資料平台,是以一般你想到的都能用 spark 解決。

<b>q8. 如何了解日志産生時間和到達時間相差超過一定的門檻值?</b>

a8. 每條日志都會帶上自己産生的時間。同時,如果這條日志到我們的系統太晚了,我們就認為這屬于延時日志。

<b>q9. 目前這套體系穩定性如何?會不會有經常d節點的情況?</b>

a9. 穩定性确實有待商榷 建議小範圍嘗試。

<b>q10. spark streaming 内部是如何設計并解決 storm 存在的兩個問題的?老師能分析一下細節嗎?</b>

a10. 這和 spark streaming 的設計是相關的。微批處理模式使得我們可以一個周期打開所有檔案句柄,然後直接寫入幾千萬條資料,然後關閉。第二個是使用 partition 并行加快寫入速度。

<b>q11. 如何應對網絡抖動導緻阻塞?</b>

a11. spark 本身有重試機制,還有各種逾時機制。

<b>q12. 怎樣保證消息的及時性?</b>

a12. 依賴于資料源,kafka,spark streaming 是否處理能力充足,沒有 delay . 所有環節都會影響消息的及時性。

<b>q13. 實際運用中,分析完的資料,本身有很大的結構關系,有時又需要對資料二次補充,處理完的資料量不大,該選哪種存儲方式?</b>

a13. 能用分布式存儲的就用分布式存儲。可以不做更新的,盡量不做更新。我一般推薦對接到 hbase 。

<b>q14. streaming 字面是流的意思,倒是課程中提到對日志有延遲的考慮,是 spark  streaming 是自定一個周期,處理周期到達的資料集合,通俗講感覺像批處理,不是每條記錄不一定要有時間戳?</b>

a14. 你了解對了。每條記錄沒有時間戳。如果有,也是日志自己帶的。spark streaming 并不會給每條記錄帶上時間。

<b>q16. storm 避免重複是依賴 zookeeper,spark streaming 靠什麼記錄處理到哪行呢?</b>

a16. 通過 checkpoint 機制,自己維護了 zookeeper 的偏移量。

<b>q17. 請問一下 spark streaming 處理日志資料的壓測結果如何呢?</b>

q17. 剛剛說了,在我們的測試叢集裡, 1000-2000w 條記錄,平均處理時間大約2分鐘,90顆核,180g 記憶體。沒有任何調優參數。理論記憶體可以繼續降低,,因為不 cache 資料 。

<b>q18. amq 與他們之間差別和聯系?</b>

a18. amq 也是消息隊列? spark streaming 支援相當多的消息隊列。

<b>q19. 國内 spark 叢集部署在哪些雲上?</b>

a19. 沒有用過雲。

<b>q21. zookeeper 目前 hbase 都不想依賴它了,因為會導緻系統的不穩定,請問老師怎麼看?</b>

a21. 還好吧,産生問題主要是 client 太多。比如 hbase 依賴 zookeeper,所有使用 hbase 的,都需要先和 zookeeper 建立連接配接,這對 zookeeper 産生較大的壓力。其他的系統也類似。如果共享 zookeeper 叢集,那麼它的連接配接數會成為一個瓶頸。

繼續閱讀