<b>3.11 Creating and Starting ContextCleaner</b>
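ContextCleaner cleans up RDD, ShuffleDependency, and Broadcast objects that have gone out of the application's scope. Because the configuration property spark.cleaner.referenceTracking defaults to true, a ContextCleaner is constructed and started, as the following code shows.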
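private[spark] val cleaner: Option[ContextCleaner] = {
  // Build a cleaner only when reference tracking is enabled (the default).
  if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
}
// Start the cleaning thread if a cleaner was created.
cleaner.foreach(_.start())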
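The pattern used here (an Option that is populated only when a flag is true, then started through foreach) can be tried out without Spark. The following is a minimal sketch under that assumption: SimpleConf, DemoCleaner, and CleanerGate are hypothetical stand-ins introduced for illustration and are not Spark classes.

```scala
// Hypothetical stand-in for SparkConf: a read-only view over string settings.
final class SimpleConf(settings: Map[String, String]) {
  // Returns the configured value, or the default when the key is absent.
  def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}

// Hypothetical stand-in for ContextCleaner: it only records that it was started.
final class DemoCleaner {
  @volatile var started = false
  def start(): Unit = { started = true }
}

object CleanerGate {
  // Builds a cleaner only when reference tracking is enabled (the default),
  // then starts it; foreach does nothing when the Option is None.
  def build(conf: SimpleConf): Option[DemoCleaner] = {
    val cleaner =
      if (conf.getBoolean("spark.cleaner.referenceTracking", true)) Some(new DemoCleaner)
      else None
    cleaner.foreach(_.start())
    cleaner
  }
}
```

With an empty configuration, CleanerGate.build returns a started cleaner. With the property set to "false", it returns None and nothing is started. In a real application the property would typically be set on the SparkConf before the SparkContext is constructed.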
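ContextCleaner consists of the following components:
referenceQueue: queues references to AnyRef objects whose referents have been garbage-collected;
referenceBuffer: holds the weak references (CleanupTaskWeakReference) to the tracked AnyRef objects;
listeners: the array of listeners notified about cleanup work;
cleaningThread: the thread that performs the actual cleanup.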
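How these components fit together can be sketched with the JDK's java.lang.ref classes alone. This is a simplified illustration, not Spark's implementation: DemoCleanRDD, DemoTaskReference, and ReferenceQueueDemo are hypothetical names, listeners are reduced to plain callbacks, and a single pollOnce call stands in for one pass of the cleaning thread.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Hypothetical task type, loosely modelled on a "clean this RDD" task.
final case class DemoCleanRDD(rddId: Int)

// A weak reference that remembers which cleanup task to run once its referent is gone.
// referent and queue are passed straight to the superclass and not kept as fields,
// so this class does not hold the tracked object strongly.
final class DemoTaskReference(
    val task: DemoCleanRDD,
    referent: AnyRef,
    queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

object ReferenceQueueDemo {
  val referenceQueue = new ReferenceQueue[AnyRef]
  // Keeps the reference objects themselves strongly reachable until processed.
  val referenceBuffer = mutable.Set.empty[DemoTaskReference]
  // Callbacks invoked with the id of each cleaned RDD.
  val listeners = mutable.ArrayBuffer.empty[Int => Unit]

  // Starts tracking obj: when it becomes unreachable, task should be run.
  def register(obj: AnyRef, task: DemoCleanRDD): DemoTaskReference = {
    val ref = new DemoTaskReference(task, obj, referenceQueue)
    referenceBuffer += ref
    ref
  }

  // One pass of a polling loop: wait up to timeoutMs (must be > 0 to time out)
  // for an enqueued reference, drop it from the buffer, and notify listeners.
  def pollOnce(timeoutMs: Long): Option[DemoCleanRDD] = {
    val ref = Option(referenceQueue.remove(timeoutMs))
      .map(_.asInstanceOf[DemoTaskReference])
    ref.foreach { r =>
      referenceBuffer -= r
      listeners.foreach(listener => listener(r.task.rddId))
    }
    ref.map(_.task)
  }
}
```

Garbage collection is not deterministic, so when experimenting with the sketch it is easier to call enqueue() on the reference returned by register, which places it on the queue as the collector would, and then call pollOnce to observe the task being handed back.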
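ContextCleaner works on the same principle as listenerBus: it also uses the listener pattern and does its work on a dedicated thread, which in practice simply calls the keepCleaning method. The implementation of keepCleaning is shown in Listing 3-48.
Listing 3-48 Implementation of keepCleaning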
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
  while (!stopped) {
    try {
      // Wait up to the poll timeout for a reference whose referent was collected.
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.map(_.task).foreach { task =>
          logDebug("Got cleaning task " + task)
          // The reference has been dequeued, so stop holding on to it.
          referenceBuffer -= reference.get
          // Dispatch on the kind of object that went out of scope.
          task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)