
Understanding Spark: Core Ideas and Source Code Analysis. 3.11 Creation and Startup of ContextCleaner

<b>3.11 Creation and Startup of ContextCleaner</b>

ContextCleaner is used to clean up RDD, ShuffleDependency, and Broadcast objects that have fallen out of the application's scope. Because the configuration property spark.cleaner.referenceTracking defaults to true, a ContextCleaner is constructed and started, as the following code shows.

private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
    } else {
        None
    }
}
cleaner.foreach(_.start())
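The pattern here — an optional component gated by a configuration flag, started only when present — can be sketched independently of Spark. A minimal sketch; the `Cleaner` class and `buildCleaner` helper below are hypothetical names introduced for illustration, not Spark's own:

```scala
// Minimal stand-in for ContextCleaner (hypothetical, not Spark's class)
class Cleaner {
  @volatile var started = false
  def start(): Unit = { started = true }
}

// Mirrors the Option-gated construction: build only when the flag is true
def buildCleaner(referenceTracking: Boolean): Option[Cleaner] =
  if (referenceTracking) Some(new Cleaner) else None

val cleaner = buildCleaner(referenceTracking = true)
// foreach runs start() only when the Option is defined,
// just as SparkContext does with cleaner.foreach(_.start())
cleaner.foreach(_.start())
```

Using Option here means the rest of the code never branches on the flag again; every use site simply calls `foreach` or `map` on the optional cleaner.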

ContextCleaner is composed of the following parts:

referenceQueue: a reference queue caching the top-level AnyRef references;

referenceBuffer: caches the weak references to those AnyRef objects;

listeners: an array of listeners for the cleanup work;

cleaningThread: the thread that performs the actual cleanup work.
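The interplay between referenceQueue and referenceBuffer rests on the standard java.lang.ref mechanism, which a small self-contained sketch can illustrate. `TaskWeakRef` below is a hypothetical simplification of Spark's CleanupTaskWeakReference; the reference is enqueued manually via `enqueue()` so the example is deterministic, whereas in Spark the JVM enqueues it when the referent is garbage-collected:

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// A weak reference that carries a cleanup task for its referent
// (simplified stand-in for Spark's CleanupTaskWeakReference)
class TaskWeakRef(referent: AnyRef, val task: String, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

val queue  = new ReferenceQueue[AnyRef]      // plays the role of referenceQueue
val buffer = mutable.Set.empty[TaskWeakRef]  // plays the role of referenceBuffer

val ref = new TaskWeakRef(new AnyRef, "CleanRDD(1)", queue)
buffer += ref  // the buffer keeps the weak reference object itself alive

// In Spark the GC enqueues the reference once the referent is unreachable;
// here we enqueue it by hand to keep the sketch deterministic.
ref.enqueue()

// Poll the queue the way keepCleaning does, with a timeout in milliseconds
val polled = Option(queue.remove(100)).map(_.asInstanceOf[TaskWeakRef])
polled.foreach(buffer -= _)  // deregister the reference once its task is handled
```

The buffer exists precisely because a WeakReference that is not itself strongly reachable would be collected before it could ever be delivered through the queue.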

ContextCleaner works on the same principle as ListenerBus: it also uses the listener pattern, with the processing done by a thread. This thread actually does nothing but call the keepCleaning method, whose implementation is shown in Listing 3-48.

Listing 3-48 The implementation of keepCleaning

private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
    while (!stopped) {
        try {
            val reference =
                Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
                    .map(_.asInstanceOf[CleanupTaskWeakReference])
            // Synchronize here to avoid being interrupted on stop()
            synchronized {
                reference.map(_.task).foreach { task =>
                    logDebug("Got cleaning task " + task)
                    referenceBuffer -= reference.get
                    task match {
                        case CleanRDD(rddId) =>
                            doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                        case CleanShuffle(shuffleId) =>
                            doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                        case CleanBroadcast(broadcastId) =>
                            doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                    }
                }
            }
        } catch {
            case ie: InterruptedException if stopped => // ignore
            case e: Exception => logError("Error in cleaning thread", e)
        }
    }
}
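The match in keepCleaning dispatches each kind of cleanup task to its own handler. That dispatch can be modeled with a self-contained sketch; the case classes below are simplified stand-ins for Spark's CleanupTask hierarchy, and the string-appending handlers replace the real doCleanup* methods for illustration:

```scala
// Simplified model of Spark's CleanupTask hierarchy
sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask
case class CleanShuffle(shuffleId: Int) extends CleanupTask
case class CleanBroadcast(broadcastId: Long) extends CleanupTask

val cleaned = scala.collection.mutable.Buffer.empty[String]

// Mirrors the task match inside keepCleaning: one handler per task type
def handle(task: CleanupTask): Unit = task match {
  case CleanRDD(id)       => cleaned += s"rdd-$id"
  case CleanShuffle(id)   => cleaned += s"shuffle-$id"
  case CleanBroadcast(id) => cleaned += s"broadcast-$id"
}

Seq(CleanRDD(1), CleanShuffle(2), CleanBroadcast(3L)).foreach(handle)
```

Because the trait is sealed, the compiler warns if a new task type is added without a corresponding case, which keeps the dispatch exhaustive.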
