天天看点

深入理解Spark:核心思想与源码分析. 3.10 创建和启动ExecutorAllocationManager

<b>3.10 创建和启动executorallocationmanager</b>

executorallocationmanager用于对已分配的executor进行管理,创建和启动executor-allocationmanager的代码如下。

private[spark] val

executorallocationmanager: option[executorallocationmanager] =

if (conf.getboolean("spark.dynamicallocation.enabled", false))

{

some(new executorallocationmanager(this, listenerbus, conf))

    }

else {

none

executorallocationmanager.foreach(_.start())

默认情况下不会创建executorallocationmanager,可以修改属性spark.dynamicallocation.enabled为true来创建。executorallocationmanager可以设置动态分配最小executor数量、动态分配最大executor数量、每个executor可以运行的task数量等配置信息,并对配置信息进行校验。start方法将executorallocationlistener加入listenerbus中,executorallocationlistener通过监听listenerbus里的事件,动态添加、删除executor。并且通过thread不断添加executor,遍历executor,将超时的executor杀掉并移除。executorallocationlistener的实现与其他sparklistener类似,不再赘述。executorallocationmanager的关键代码见代码清单3-47。

代码清单3-47 executorallocationmanager的关键代码

private val intervalmillis: long = 100

private var clock: clock = new realclock

private val listener = new

executorallocationlistener

def start(): unit = {

listenerbus.addlistener(listener)

startpolling()

}

private def startpolling(): unit = {

val t = new thread {

override def run(): unit = {

while (true) {

                try {

                    schedule()

                } catch {

                    case e: exception =&gt;

logerror("exception in dynamic executor allocation thread!", e)

                }

                thread.sleep(intervalmillis)

t.setname("spark-dynamic-executor-allocation")

t.setdaemon(true)

t.start()

根据3.4.1节的内容,我们知道listenerbus内置了线程listenerthread,此线程不断从eventqueue中拉出事件对象,调用监听器的监听方法。要启动此线程,需要调用listenerbus的start方法,代码如下。

listenerbus.start()

继续阅读