天天看點

深入了解Spark:核心思想與源碼分析. 3.10 建立和啟動ExecutorAllocationManager

<b>3.10 建立和啟動executorallocationmanager</b>

executorallocationmanager用于對已配置設定的executor進行管理,建立和啟動executor-allocationmanager的代碼如下。

private[spark] val

executorallocationmanager: option[executorallocationmanager] =

if (conf.getboolean("spark.dynamicallocation.enabled", false))

{

some(new executorallocationmanager(this, listenerbus, conf))

    }

else {

none

executorallocationmanager.foreach(_.start())

預設情況下不會建立executorallocationmanager,可以修改屬性spark.dynamicallocation.enabled為true來建立。executorallocationmanager可以設定動态配置設定最小executor數量、動态配置設定最大executor數量、每個executor可以運作的task數量等配置資訊,并對配置資訊進行校驗。start方法将executorallocationlistener加入listenerbus中,executorallocationlistener通過監聽listenerbus裡的事件,動态添加、删除executor。并且通過thread不斷添加executor,周遊executor,将逾時的executor殺掉并移除。executorallocationlistener的實作與其他sparklistener類似,不再贅述。executorallocationmanager的關鍵代碼見代碼清單3-47。

代碼清單3-47 executorallocationmanager的關鍵代碼

private val intervalmillis: long = 100

private var clock: clock = new realclock

private val listener = new

executorallocationlistener

def start(): unit = {

listenerbus.addlistener(listener)

startpolling()

}

private def startpolling(): unit = {

val t = new thread {

override def run(): unit = {

while (true) {

                try {

                    schedule()

                } catch {

                    case e: exception =&gt;

logerror("exception in dynamic executor allocation thread!", e)

                }

                thread.sleep(intervalmillis)

t.setname("spark-dynamic-executor-allocation")

t.setdaemon(true)

t.start()

根據3.4.1節的内容,我們知道listenerbus内置了線程listenerthread,此線程不斷從eventqueue中拉出事件對象,調用監聽器的監聽方法。要啟動此線程,需要調用listenerbus的start方法,代碼如下。

listenerbus.start()

繼續閱讀