<b>3.11 Creating and Starting ContextCleaner</b>
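ContextCleaner cleans up RDD, ShuffleDependency, and Broadcast objects that have gone out of the application's scope. Because the configuration property spark.cleaner.referenceTracking defaults to true, a ContextCleaner is constructed and started, as the following code shows.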
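private[spark] val cleaner: Option[ContextCleaner] = {
  // Create the cleaner only when reference tracking is enabled (the default).
  if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
}
// Start the cleaner if one was created.
cleaner.foreach(_.start())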
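The Option-based gating can be tried outside Spark. The following is a minimal sketch, assuming two hypothetical stand-ins: `MiniConf` (in place of SparkConf) and `MiniCleaner` (in place of ContextCleaner). Neither is a Spark class; only the property name and the `getBoolean(key, default)` shape are taken from the code above.

```scala
// Hypothetical stand-ins for SparkConf and ContextCleaner; not Spark classes.
final class MiniConf(settings: Map[String, String]) {
  // Same shape as getBoolean(key, defaultValue) used in the code above.
  def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}

final class MiniCleaner {
  @volatile var started = false
  def start(): Unit = { started = true }
}

object CleanerGate {
  // Build the cleaner only when reference tracking is enabled (default: true).
  def create(conf: MiniConf): Option[MiniCleaner] =
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) Some(new MiniCleaner)
    else None
}

object CleanerGateDemo {
  def main(args: Array[String]): Unit = {
    val enabled = CleanerGate.create(new MiniConf(Map.empty))
    enabled.foreach(_.start()) // runs start() because the Option is a Some

    val disabled = CleanerGate.create(
      new MiniConf(Map("spark.cleaner.referenceTracking" -> "false")))
    disabled.foreach(_.start()) // no-op because the Option is None

    println(s"enabled started: ${enabled.exists(_.started)}, disabled defined: ${disabled.isDefined}")
  }
}
```

Holding the cleaner in an Option means every later use site can write `cleaner.foreach(...)` and needs no separate check of the configuration flag.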
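ContextCleaner consists of the following components:
referenceQueue: a ReferenceQueue[AnyRef] that receives the weak references whose referents have been garbage collected;
referenceBuffer: caches the weak references (CleanupTaskWeakReference) to AnyRef objects;
listeners: an array buffer of listeners for the cleanup work;
cleaningThread: the thread that performs the actual cleanup.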
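How a reference queue and a reference buffer cooperate can be sketched with the JDK's own `java.lang.ref` classes. The sketch below rests on assumptions: `CleanTask`, `TaskWeakRef`, and `MiniTracker` are hypothetical names, modeled only loosely on ContextCleaner. In normal operation the garbage collector enqueues a weak reference once its referent is no longer strongly reachable; the demo calls `enqueue()` by hand so that the behavior is deterministic.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Hypothetical cleanup task and weak-reference wrapper; illustrative only.
final case class CleanTask(id: Int)

final class TaskWeakRef(val task: CleanTask, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

final class MiniTracker {
  private val referenceQueue = new ReferenceQueue[AnyRef]
  // Strongly holds the wrappers so they survive until their task is processed.
  private val referenceBuffer = mutable.Set.empty[TaskWeakRef]

  def register(obj: AnyRef, task: CleanTask): TaskWeakRef = {
    val ref = new TaskWeakRef(task, obj, referenceQueue)
    referenceBuffer += ref
    ref
  }

  // Drain whatever is currently enqueued and return the tasks to run.
  def drain(): List[CleanTask] = {
    val tasks = mutable.ListBuffer.empty[CleanTask]
    var next = referenceQueue.poll() // null when the queue is empty
    while (next != null) {
      val ref = next.asInstanceOf[TaskWeakRef]
      referenceBuffer -= ref
      tasks += ref.task
      next = referenceQueue.poll()
    }
    tasks.toList
  }

  def pending: Int = referenceBuffer.size
}

object MiniTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new MiniTracker
    val payload = new Object
    val ref = tracker.register(payload, CleanTask(1))
    // Simulate what the garbage collector does once payload is unreachable.
    ref.enqueue()
    println(tracker.drain())
  }
}
```

The buffer is needed because a weak reference object is itself ordinary heap data: if nothing held it strongly, it could be collected before it was ever enqueued, and the cleanup task attached to it would be lost.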
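ContextCleaner works the same way as listenerBus: it also uses the listener pattern, with a dedicated thread doing the processing. That thread does nothing more than call the keepCleaning method, whose implementation is shown in Listing 3-48.
Listing 3-48 Implementation of keepCleaning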
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
  while (!stopped) {
    try {
      // remove() returns null on timeout, so wrap the result in an Option.
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.map(_.task).foreach { task =>
          logDebug("Got cleaning task " + task)
          referenceBuffer -= reference.get
          // Dispatch on the kind of object that went out of scope.
          task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}
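The shape of this loop (a timed poll, a lock shared with stop(), dispatch by pattern matching, and listener notification) can be reproduced in a small standalone program. What follows is a sketch under stated assumptions, not Spark's implementation: `MiniLoop`, `Task`, `CleanA`, `CleanB`, and `Listener` are hypothetical names, and a `LinkedBlockingQueue` stands in for the reference queue so that the example does not depend on garbage collection timing.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue, TimeUnit}

// Hypothetical task types and listener; illustrative only.
sealed trait Task
final case class CleanA(id: Int) extends Task
final case class CleanB(id: Int) extends Task

trait Listener { def cleaned(what: String): Unit }

final class MiniLoop(pollTimeoutMs: Long = 100L) {
  // Assumption: a BlockingQueue replaces ReferenceQueue.remove(timeout).
  private val queue = new LinkedBlockingQueue[Task]
  private val listeners = new CopyOnWriteArrayList[Listener]
  @volatile private var stopped = false

  private val cleaningThread = new Thread("mini-cleaner") {
    override def run(): Unit = keepCleaning()
  }

  def attachListener(l: Listener): Unit = listeners.add(l)
  def submit(task: Task): Unit = queue.put(task)

  def start(): Unit = {
    cleaningThread.setDaemon(true)
    cleaningThread.start()
  }

  def stop(): Unit = {
    stopped = true
    // Take the loop's lock first so a task in progress is never interrupted.
    synchronized { cleaningThread.interrupt() }
    cleaningThread.join()
  }

  private def keepCleaning(): Unit = {
    while (!stopped) {
      try {
        // poll returns null on timeout; Option(...) turns that into None.
        val next = Option(queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS))
        synchronized {
          next.foreach { task =>
            val label = task match {
              case CleanA(id) => s"cleaned A $id"
              case CleanB(id) => s"cleaned B $id"
            }
            val it = listeners.iterator()
            while (it.hasNext) it.next().cleaned(label)
          }
        }
      } catch {
        case _: InterruptedException if stopped => // expected during stop()
        case e: Exception => System.err.println("Error in cleaning thread: " + e)
      }
    }
  }
}

object MiniLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new MiniLoop()
    loop.attachListener(new Listener {
      def cleaned(what: String): Unit = println(what)
    })
    loop.start()
    loop.submit(CleanA(1))
    loop.submit(CleanB(2))
    Thread.sleep(500) // give the daemon thread time to process both tasks
    loop.stop()
  }
}
```

The timed poll is what lets the loop re-check the stopped flag regularly, and guarding the interrupt with the same lock as the loop body means stop() can only interrupt the thread while it is waiting, never in the middle of a cleanup.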