天天看點

Spark Streaming 資料清理機制

為啥要了解機制呢?這就好比jvm的垃圾回收,雖然jvm的垃圾回收已經巨牛了,但是依然會遇到很多和它相關的case導緻系統運作不正常。

這個内容我記得自己剛接觸spark streaming的時候,老闆也問過我,運作期間會保留多少個rdd? 當時沒回答出來。後面在群裡也有人問到了,是以就整理了下。文中如有謬誤之處,還望指出。

我們知道spark streaming 計算還是基于spark core的,spark core 的核心又是rdd. 是以spark streaming 肯定也要和rdd扯上關系。然而spark streaming 并沒有直接讓使用者使用rdd而是自己抽象了一套dstream的概念。 dstream 和 rdd 是包含的關系,你可以了解為java裡的裝飾模式,也就是dstream  是對rdd的增強,但是行為表現和rdd是基本上差不多的。都具備幾個條件:

具有類似的tranformation動作,比如map,reducebykey等,也有一些自己獨有的,比如window,mapwithstated等

都具有action動作,比如foreachrdd,count等

從程式設計模型上看是一緻的。

是以很可能你寫的那堆spark streaming代碼看起來好像和spark 一緻的,然而并不能直接複用,因為一個是dstream的變換,一個是rdd的變化。

dstream 下面包含幾個類:

資料源類,比如inputdstream,具體如directkafkainputstream等

轉換類,典型比如mappeddstream,shuffleddstream

輸出類,典型比如foreachdstream

從上面來看,資料從開始(輸入)到結束(輸出)都是dstream體系來完成的,也就意味着使用者正常情況是無法直接去産生和操作rdd的,這也就是說,dstream有機會和義務去負責rdd的生命周期。

這就回答了前言中的問題了。spark streaming具備自動清理功能。

在spark streaming中rdd的生命流程大體如下:

在inputdstream會将接受到的資料轉化成rdd,比如directkafkainputstream 産生的就是 kafkardd

接着通過mappeddstream等進行資料轉換,這個時候是直接調用rdd對應的map方法進行轉換的

在進行輸出類操作時,才暴露出rdd,可以讓使用者執行相應的存儲,其他計算等操作。

我們這裡就以下面的代碼來進行更詳細的解釋:

foreachrdd 産生foreachdstream,因為foreachrdd是個action,是以會觸發任務的執行,會被調用generatejob方法。

對應的parent是mappeddstream,也就是說調用mappeddstream.getorcompute.該方法在dstream中,首先會在mappeddstream對象中的generatedrdds 變量中查找是否已經有rdd,如果沒有則觸發計算,并且将産生的rdd放到generatedrdds

計算rdd是調用的compute方法,mappeddstream 的compute方法很簡單,直接調用的父類也就是directkafkainputstream的getorcompute方法:

在上面的例子中,mappeddstream 的parent是directkafkainputstream中,這是個資料源,是以他的compute方法會直接new出一個rdd.

從上面可以得出幾個結論:

資料源以及轉換類dstream都會維護一個generatedrdds,可以按batchtime 進行擷取

内部本質還是進行的rdd的轉換

這裡又會有兩種情況,一種是調用dstream.cache,第二種是rdd.cache。事實上他們是完全一樣的。

dstream的cache 動作隻是将dstream的變量storagelevel 設定為memory_only_ser,然後在産生(或者擷取)rdd的時候,調用rdd的persit方法進行設定。是以dstream.cache 産生的效果等價于rdd.cache(也就是你自己調用foreachrdd 将rdd 都設定一遍)

其實無所謂cache不cache住,rdd最終都是要釋放的,否則運作久了,光rdd對象也能承包了你的記憶體。我們知道,在spark streaming中,周期性産生事件驅動spark streaming 的類其實是:

org.apache.spark.streaming.scheduler.jobgenerator

他内部有個永動機(定時器),定時釋出一個産生任務的事件:

然後通過processevent進行事件處理:

目前我們隻關注clearmetadata 事件。對應的方法為:

首先是清理輸出dstream(比如foreachdstream),接着是清理輸入類(基于receiver模式)的資料。

foreachdstream 其實調用的也是dstream的方法。該方法大體如下:

大體執行動作如下描述:

根據記憶周期得到應該剔除的rdd

根據是否要清理cache資料,進行unpersit 操作,并且顯示的移除block

根據依賴調用其他的dstream進行動作清理

這裡我們還可以看到,通過參數spark.streaming.unpersist 你是可以決定是否手工控制是否需要對cache住的資料進行清理。

這裡你會有兩個疑問:

dependencies 是什麼?

rememberduration 是怎麼來的?

dependencies 你可以簡單了解為父dstream,通過dependencies 我們可以獲得已完整dstream鍊。

rememberduration 的設定略微複雜些,大體是 slideduration,如果設定了checkpointduration 則是2*checkpointduration 或者通過dstreamgraph.rememberduration(如果設定了的話,譬如通過streamingcontext.remember方法,不過通過該方法設定的值要大于計算得到的值會生效)

另外值得一提的就是後面的dstream 會調整前面的dstream的rememberduration,譬如如果你用了window* 相關的操作,則在此之前的dstream 的rememberduration 都需要加上windowduration。

然後根據spark streaming的定時性,每個周期隻要完成了,都會觸發清理動作,這個就是清理動作發生的時機。代碼如下:

spark streaming 會在每個batch任務結束時進行一次清理動作。每個dstream 都會被掃描,不同的dstream根據情況不同,保留的rdd數量也是不一緻的,但都是根據rememberduration變量決定,而該變量會被下遊的dstream所影響,是以不同的dstream的rememberduration取值是不一樣的。