<b>3.10 建立和啟動executorallocationmanager</b>
executorallocationmanager用于對已配置設定的executor進行管理,建立和啟動executor-allocationmanager的代碼如下。
private[spark] val
executorallocationmanager: option[executorallocationmanager] =
if (conf.getboolean("spark.dynamicallocation.enabled", false))
{
some(new executorallocationmanager(this, listenerbus, conf))
}
else {
none
executorallocationmanager.foreach(_.start())
預設情況下不會建立executorallocationmanager,可以修改屬性spark.dynamicallocation.enabled為true來建立。executorallocationmanager可以設定動态配置設定最小executor數量、動态配置設定最大executor數量、每個executor可以運作的task數量等配置資訊,并對配置資訊進行校驗。start方法将executorallocationlistener加入listenerbus中,executorallocationlistener通過監聽listenerbus裡的事件,動态添加、删除executor。并且通過thread不斷添加executor,周遊executor,将逾時的executor殺掉并移除。executorallocationlistener的實作與其他sparklistener類似,不再贅述。executorallocationmanager的關鍵代碼見代碼清單3-47。
代碼清單3-47 executorallocationmanager的關鍵代碼
private val intervalmillis: long = 100
private var clock: clock = new realclock
private val listener = new
executorallocationlistener
def start(): unit = {
listenerbus.addlistener(listener)
startpolling()
}
private def startpolling(): unit = {
val t = new thread {
override def run(): unit = {
while (true) {
try {
schedule()
} catch {
case e: exception =>
logerror("exception in dynamic executor allocation thread!", e)
}
thread.sleep(intervalmillis)
t.setname("spark-dynamic-executor-allocation")
t.setdaemon(true)
t.start()
根據3.4.1節的内容,我們知道listenerbus内置了線程listenerthread,此線程不斷從eventqueue中拉出事件對象,調用監聽器的監聽方法。要啟動此線程,需要調用listenerbus的start方法,代碼如下。
listenerbus.start()