<b>3.3 Creating the MetadataCleaner</b>
To keep track of all persisted RDDs, SparkContext uses the persistentRdds cache, whose type is TimestampedWeakValueHashMap. The job of MetadataCleaner is to purge persisted RDDs that have expired. The code that creates the MetadataCleaner is as follows.
private[spark] val persistentRdds = new TimestampedWeakValueHashMap[Int, RDD[_]]

private[spark] val metadataCleaner =
  new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
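Before moving on, it helps to see what a "timestamped map" is doing conceptually. The sketch below is illustrative only, not Spark's actual class: TimestampedMap, its Entry case class, and put/get are made-up names, and the weak-reference behavior of the real TimestampedWeakValueHashMap is deliberately omitted. Only the clearOldValues method name mirrors the real API that cleanup calls.

import scala.collection.concurrent.TrieMap

// Illustrative sketch (not Spark's API): a map that stamps each entry with its
// insertion time so entries older than a threshold can be purged in one pass.
// Spark's real TimestampedWeakValueHashMap additionally holds values through
// weak references, so RDDs that are no longer referenced elsewhere can be
// garbage collected.
class TimestampedMap[K, V] {
  private case class Entry(value: V, timestamp: Long)
  private val map = new TrieMap[K, Entry]()

  def put(key: K, value: V): Unit =
    map.put(key, Entry(value, System.currentTimeMillis()))

  def get(key: K): Option[V] = map.get(key).map(_.value)

  // Drop every entry stamped earlier than threshTime; this mirrors the
  // clearOldValues call that SparkContext.cleanup makes on persistentRdds.
  def clearOldValues(threshTime: Long): Unit = {
    val stale = map.collect { case (k, e) if e.timestamp < threshTime => k }
    stale.foreach(map.remove)
  }
}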
Let us take a closer look at the implementation of MetadataCleaner, shown in Listing 3-14.
Listing 3-14 The implementation of MetadataCleaner
private[spark] class MetadataCleaner(
    cleanerType: MetadataCleanerType.MetadataCleanerType,
    cleanupFunc: (Long) => Unit,
    conf: SparkConf)
  extends Logging
{
  val name = cleanerType.toString

  private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
  private val periodSeconds = math.max(10, delaySeconds / 10)
  private val timer = new Timer(name + " cleanup timer", true)

  private val task = new TimerTask {
    override def run() {
      try {
        cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
        logInfo("Ran metadata cleaner for " + name)
      } catch {
        case e: Exception => logError("Error running cleanup task for " + name, e)
      }
    }
  }

  if (delaySeconds > 0) {
    timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
  }

  def cancel() {
    timer.cancel()
  }
}
From this implementation we can see that MetadataCleaner is essentially a timer built on TimerTask, which repeatedly invokes the function passed in as the cleanupFunc: (Long) => Unit parameter. Each run passes the current time minus delaySeconds as the expiry threshold, and runs repeat every periodSeconds, which is delaySeconds / 10 but never less than 10 seconds. When the MetadataCleaner is constructed, the function argument is cleanup, which purges expired entries from persistentRdds; its code is as follows.
private[spark] def cleanup(cleanupTime: Long) {
  persistentRdds.clearOldValues(cleanupTime)
}
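To see the whole mechanism end to end, the following self-contained sketch (illustrative only, not Spark code) wires a java.util.Timer and TimerTask to a cleanup function the same way MetadataCleaner does: on each tick the function receives the current time minus delaySeconds as its threshold, so anything older than the delay is purged. The store map, CleanerDemo object, and the constants are made up for the demo.

import java.util.{Timer, TimerTask}

object CleanerDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical store of entries keyed by id, valued by insertion time.
    val store = new java.util.concurrent.ConcurrentHashMap[Int, Long]()
    (1 to 5).foreach(i => store.put(i, System.currentTimeMillis()))

    val delaySeconds = 2
    // The real MetadataCleaner uses math.max(10, delaySeconds / 10);
    // we shrink the floor to 1 second so the demo finishes quickly.
    val periodSeconds = math.max(1, delaySeconds / 10)

    // Plays the role of SparkContext.cleanup: drop entries stamped
    // earlier than cleanupTime.
    def cleanup(cleanupTime: Long): Unit = {
      val it = store.entrySet().iterator()
      while (it.hasNext) {
        if (it.next().getValue < cleanupTime) it.remove()
      }
      println(s"store size after cleanup: ${store.size()}")
    }

    // Daemon timer, as in MetadataCleaner: first run after delaySeconds,
    // then repeating every periodSeconds.
    val timer = new Timer("cleanup timer", true)
    val task = new TimerTask {
      override def run(): Unit =
        cleanup(System.currentTimeMillis() - delaySeconds * 1000)
    }
    timer.schedule(task, delaySeconds * 1000L, periodSeconds * 1000L)

    Thread.sleep(5000) // let the timer fire a few times
    timer.cancel()
  }
}

Note that the timer is created with the daemon flag set to true, the same choice MetadataCleaner makes, so a still-scheduled cleaner never prevents the JVM from exiting.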