
《深入理解Spark:核心思想与源码分析》, 3.11 Creation and Startup of ContextCleaner

<b>3.11 Creation and Startup of ContextCleaner</b>

ContextCleaner cleans up RDD, ShuffleDependency, and Broadcast objects that have gone out of the application's scope. Because the configuration property spark.cleaner.referenceTracking defaults to true, a ContextCleaner is constructed and started, as the following code shows.

private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
    } else {
        None
    }
}
cleaner.foreach(_.start())

ContextCleaner consists of the following components:

referenceQueue: buffers references to top-level AnyRef objects;

referenceBuffer: buffers weak references to those AnyRef objects;

listeners: an array buffering the listeners for cleanup work;

cleaningThread: the thread that performs the actual cleanup.
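How referenceQueue and referenceBuffer cooperate can be illustrated with a minimal, self-contained sketch. The CleanupTask and CleanRDD types below are simplified stand-ins for Spark's internals, and ref.enqueue() is called by hand to simulate what the JVM garbage collector would otherwise do once the referent becomes unreachable:

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

// Simplified stand-ins for Spark's cleanup-task hierarchy (names are illustrative)
sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask

// A weak reference that carries the cleanup task to run once the referent
// (e.g. an RDD) is no longer strongly reachable
class CleanupTaskWeakReference(
    val task: CleanupTask,
    referent: AnyRef,
    queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

object WeakRefDemo {
  def demo(): Option[CleanupTask] = {
    val referenceQueue = new ReferenceQueue[AnyRef]
    val referenceBuffer = scala.collection.mutable.Set.empty[CleanupTaskWeakReference]

    var rdd: AnyRef = new Object                 // stand-in for an RDD
    val ref = new CleanupTaskWeakReference(CleanRDD(42), rdd, referenceQueue)
    referenceBuffer += ref                       // keep the weak ref itself reachable

    rdd = null                                   // drop the only strong reference
    ref.enqueue()                                // simulate the GC enqueuing the reference

    // What keepCleaning does: poll the queue, drop the reference from the
    // buffer, and hand its task over for cleanup
    Option(referenceQueue.remove(100))
      .map(_.asInstanceOf[CleanupTaskWeakReference])
      .map { r => referenceBuffer -= r; r.task }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The point of buffering the weak references themselves is that a WeakReference which becomes unreachable would never be enqueued, so referenceBuffer keeps each reference alive until its cleanup task has been dispatched.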

ContextCleaner works on the same principle as listenerBus: it also adopts the listener pattern, with a dedicated thread doing the processing; this thread actually just calls the keepCleaning method. The implementation of keepCleaning is shown in Listing 3-48.

Listing 3-48 The implementation of keepCleaning

private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
    while (!stopped) {
        try {
            val reference =
                Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
                    .map(_.asInstanceOf[CleanupTaskWeakReference])
            // Synchronize here to avoid being interrupted on stop()
            synchronized {
                reference.map(_.task).foreach { task =>
                    logDebug("Got cleaning task " + task)
                    referenceBuffer -= reference.get
                    task match {
                        case CleanRDD(rddId) =>
                            doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                        case CleanShuffle(shuffleId) =>
                            doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                        case CleanBroadcast(broadcastId) =>
                            doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                    }
                }
            }
        } catch {
            case ie: InterruptedException if stopped => // ignore
            case e: Exception => logError("Error in cleaning thread", e)
        }
    }
}
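After a doCleanup* call finishes, the listeners buffered by ContextCleaner are notified. The dispatch can be sketched as follows; the CleanerListener method names follow Spark's listener interface, but the surrounding demo object and its simplified doCleanupRDD are hypothetical (a real implementation would first unpersist the RDD before notifying anyone):

```scala
import scala.collection.mutable.ArrayBuffer

// Listener interface modeled on Spark's CleanerListener
trait CleanerListener {
  def rddCleaned(rddId: Int): Unit
  def shuffleCleaned(shuffleId: Int): Unit
}

object CleanerDemo {
  private val listeners = new ArrayBuffer[CleanerListener]

  // Register a listener interested in cleanup events
  def attachListener(listener: CleanerListener): Unit = listeners += listener

  // Simplified stand-in for doCleanupRDD: notify every registered listener
  // that the RDD with this id has been cleaned
  def doCleanupRDD(rddId: Int): Unit = listeners.foreach(_.rddCleaned(rddId))
}
```

This is the same listener pattern the text compares to listenerBus: producers of cleanup events do not know who consumes them, they simply iterate over the registered listeners.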
