
深入理解Spark：核心思想与源码分析, 3.3 Creating metadataCleaner

3.3 Creating metadataCleaner

To keep track of all persisted RDDs, SparkContext uses the persistentRdds cache, whose type is TimeStampedWeakValueHashMap. The job of metadataCleaner is to clear out persisted RDDs that have expired. The code that creates the metadataCleaner is as follows.

private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]

private[spark] val metadataCleaner =
  new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
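The name TimeStampedWeakValueHashMap already hints at why time-based cleanup is possible: each value is stored together with its insertion timestamp (and behind a weak reference, so an RDD that is no longer referenced elsewhere can also be garbage collected), and clearOldValues drops every entry older than a given cutoff. The following is only a minimal sketch of that idea, not the actual Spark class; SimpleTimeStampedWeakValueMap and its internals are hypothetical names used for illustration.

import java.lang.ref.WeakReference
import scala.collection.concurrent.TrieMap

// Simplified, hypothetical stand-in for TimeStampedWeakValueHashMap: values are kept
// behind WeakReferences together with their insertion time, so an entry can go away
// either because the value was garbage collected or because it is older than a cutoff.
class SimpleTimeStampedWeakValueMap[K, V <: AnyRef] {
  private case class Entry(ref: WeakReference[V], timestamp: Long)
  private val internal = new TrieMap[K, Entry]

  def put(key: K, value: V): Unit =
    internal.put(key, Entry(new WeakReference(value), System.currentTimeMillis()))

  def get(key: K): Option[V] =
    internal.get(key).flatMap(entry => Option(entry.ref.get()))

  // Counterpart of clearOldValues(cleanupTime): drop entries inserted before threshTime.
  def clearOldValues(threshTime: Long): Unit =
    internal.foreach { case (key, entry) =>
      if (entry.timestamp < threshTime) internal.remove(key)
    }
}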

Let us take a closer look at the implementation of MetadataCleaner, shown in Listing 3-14.

Listing 3-14 The implementation of MetadataCleaner

private[spark] class MetadataCleaner(
    cleanerType: MetadataCleanerType.MetadataCleanerType,
    cleanupFunc: (Long) => Unit,
    conf: SparkConf)
  extends Logging
{
  val name = cleanerType.toString

  private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
  private val periodSeconds = math.max(10, delaySeconds / 10)
  private val timer = new Timer(name + " cleanup timer", true)

  private val task = new TimerTask {
    override def run() {
      try {
        cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
        logInfo("Ran metadata cleaner for " + name)
      } catch {
        case e: Exception => logError("Error running cleanup task for " + name, e)
      }
    }
  }

  if (delaySeconds > 0) {
    timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
  }

  def cancel() {
    timer.cancel()
  }
}
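To get a concrete feel for the scheduling, suppose getDelaySeconds returns 3600 (a one-hour TTL). Then periodSeconds = math.max(10, 3600 / 10) = 360, so the first cleanup fires one hour after the cleaner is constructed and then repeats every six minutes, each run passing a cutoff of "now minus one hour" to cleanupFunc. If delaySeconds is zero or negative, the task is never scheduled and the cleaner is effectively disabled; note also that the Timer is created as a daemon thread, so it will not keep the JVM alive on its own.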

From the implementation of MetadataCleaner we can see that it is essentially a timer built on TimerTask, which keeps invoking the function passed in through the cleanupFunc: (Long) => Unit parameter. The function argument supplied when this metadataCleaner is constructed is cleanup, which clears expired entries out of persistentRdds; its code is as follows.

private[spark] def cleanup(cleanupTime: Long) {
  persistentRdds.clearOldValues(cleanupTime)
}
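Putting the pieces together, the whole mechanism is just a daemon Timer that periodically calls a cleanup function with a cutoff of "now minus TTL". The following self-contained sketch mimics that flow with a plain TrieMap instead of TimeStampedWeakValueHashMap; MetadataCleanerDemo, cache, and the 2-second TTL are all hypothetical, chosen only to make the effect observable quickly (the real period would be math.max(10, delaySeconds / 10) seconds, as in Listing 3-14).

import java.util.{Timer, TimerTask}
import scala.collection.concurrent.TrieMap

object MetadataCleanerDemo {
  // Toy counterpart of persistentRdds: id -> (payload, insertion time).
  private val cache = new TrieMap[Int, (String, Long)]

  // Counterpart of SparkContext.cleanup: drop every entry older than cleanupTime.
  private def cleanup(cleanupTime: Long): Unit =
    cache.foreach { case (id, (_, ts)) => if (ts < cleanupTime) cache.remove(id) }

  def main(args: Array[String]): Unit = {
    val ttlMillis = 2000L                                  // hypothetical TTL (delaySeconds * 1000)
    cache.put(1, ("some persisted RDD", System.currentTimeMillis()))

    val timer = new Timer("demo cleanup timer", true)      // daemon thread, as in MetadataCleaner
    timer.schedule(new TimerTask {
      override def run(): Unit = cleanup(System.currentTimeMillis() - ttlMillis)
    }, ttlMillis, 500L)                                    // first run after the TTL, then periodically

    Thread.sleep(3500L)
    println(s"entries left after the TTL expired: ${cache.size}")   // expected: 0
    timer.cancel()
  }
}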
