<b>3.10 创建和启动executorallocationmanager</b>
executorallocationmanager用于对已分配的executor进行管理,创建和启动executor-allocationmanager的代码如下。
private[spark] val
executorallocationmanager: option[executorallocationmanager] =
if (conf.getboolean("spark.dynamicallocation.enabled", false))
{
some(new executorallocationmanager(this, listenerbus, conf))
}
else {
none
executorallocationmanager.foreach(_.start())
默认情况下不会创建executorallocationmanager,可以修改属性spark.dynamicallocation.enabled为true来创建。executorallocationmanager可以设置动态分配最小executor数量、动态分配最大executor数量、每个executor可以运行的task数量等配置信息,并对配置信息进行校验。start方法将executorallocationlistener加入listenerbus中,executorallocationlistener通过监听listenerbus里的事件,动态添加、删除executor。并且通过thread不断添加executor,遍历executor,将超时的executor杀掉并移除。executorallocationlistener的实现与其他sparklistener类似,不再赘述。executorallocationmanager的关键代码见代码清单3-47。
代码清单3-47 executorallocationmanager的关键代码
private val intervalmillis: long = 100
private var clock: clock = new realclock
private val listener = new
executorallocationlistener
def start(): unit = {
listenerbus.addlistener(listener)
startpolling()
}
private def startpolling(): unit = {
val t = new thread {
override def run(): unit = {
while (true) {
try {
schedule()
} catch {
case e: exception =>
logerror("exception in dynamic executor allocation thread!", e)
}
thread.sleep(intervalmillis)
t.setname("spark-dynamic-executor-allocation")
t.setdaemon(true)
t.start()
根据3.4.1节的内容,我们知道listenerbus内置了线程listenerthread,此线程不断从eventqueue中拉出事件对象,调用监听器的监听方法。要启动此线程,需要调用listenerbus的start方法,代码如下。
listenerbus.start()