以Yarn、HDFS和MapReduce為主要組成的Hadoop,涉及到大量複雜的、互動的事件處理、狀态轉換,同時,這些事件排程和狀态轉換又對實時性和效率提出了極高的要求。可以想見,沒有一個規整的、通用型良好的排程器,Hadoop代碼無論是對讀者,還是對開發者,都将變成一場災難,同時,Hadoop的運作效率也會變得無法忍受。統一的、設計良好的、通用的和共用的排程器,對于Hadoop不同元件的開發者來說是一種解脫,大大降低了Hadoop在事件排程、狀态轉換的底層出錯的可能性,提高了代碼穩定性和可讀性。這篇文章主要介紹了Hadoop的核心排程器AsyncDispatcher的設計和實作。同Hadoop狀态機一樣,這個通用排程器設計得十分通用,完美可擴充可重用,我們在自己的項目中完全可以使用Hadoop的排程器實作我們自己的事件排程邏輯。
先抛開代碼本身,我們來看一個基于事件的排程器的工作方式:
public AsyncDispatcher() {
this(new LinkedBlockingQueue<Event>());
}
public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
super("Dispatcher");
this.eventQueue = eventQueue;
this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
}
在它的構造方法中,有兩個核心變量,eventQueue是一個隊列,用來存放事件,同時,還構造了一個eventDispatchers,這是一個map對象,用來管理事件和事件處理器之間的關系。AsyncDispatcher同樣遵循我一直提到的Yarn的服務化設計,即AsyncDispatcher被抽象為一個service,在完成對象構造以後,會進行服務的初始化和啟動:
@Override
protected void serviceInit(Configuration conf) throws Exception {
this.exitOnDispatchException =
conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
super.serviceInit(conf);
}
初始化操作并沒有做太多的事情,隻是調用父類的serviceInit()方法,用來将配置檔案設定進來。
@Override
protected void serviceStart() throws Exception {
//start all the components
super.serviceStart();
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName("AsyncDispatcher event handler");
eventHandlingThread.start();
}
serviceStart()方法的主要任務是建立了一個名字叫做
AsyncDispatcher event handler
的獨立線程,我們一起來看這個線程的代碼:
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop.
if (blockNewEvents) {
synchronized (waitForDrained) {
if (drained) {
waitForDrained.notify();
}
}
}
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
dispatch(event);
}
}
}
};
}
線程的大緻功能,是不斷循環檢測事件隊列eventQueue中是否有新的事件到來,如果有,則取出這個事件的處理器,一個EventHandler接口的實作類,調用dispatch()方法,對該事件進行處理。
那麼問題來了,這個異步排程器會管理多個事件和事件排程器,那麼它什麼時候開始知道某個事件發生的時候改用什麼排程器對象對這個事件進行處理呢?這就是register方法:
/**
* 注冊事件分派器
*/
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>)
eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}
還有一個問題,不同的事件,是怎麼放入eventQueue中的呢?這是通過AsyncDispatcher.GenericEventHandler.handle()方法進行的:
class GenericEventHandler implements EventHandler<Event> {
public void handle(Event event) {
if (blockNewEvents) { //如果該标記為置位,說明可能服務正在進行stop操作,無法處理新的請求
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
if (qSize != && qSize % ==
&& lastEventQueueSizeLogged != qSize) {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of event-queue is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < ) {
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
eventQueue.put(event);
} catch (InterruptedException e) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
// Need to reset drained flag to true if event queue is empty,
// otherwise dispatcher will hang on stop.
drained = eventQueue.isEmpty();
throw new YarnRuntimeException(e);
}
};
}
AsyncDispatcher
内部維護了一個
GenericEventHandler handlerInstance
變量,通過z這個
handlerInstance.handle()
來将新的事件放入到
eventQueue
中。這個方法名字雖然叫做
handle()
,但是由于我們的AysncDispatcher是異步排程器,是以并沒有立刻進行處理,而是放入隊列,由上面提到的
eventHandlingThread
線程進行處理。
先看AsyncDispatcher.serviceStop()方法:
protected void serviceStop() throws Exception {
if (drainEventsOnStop) {
blockNewEvents = true; //首先阻止新任務的分派,試圖優雅停掉目前線程的工作
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
long endTime = System.currentTimeMillis() + getConfig()
.getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);
synchronized (waitForDrained) {
while (!drained && eventHandlingThread != null
&& eventHandlingThread.isAlive()
&& System.currentTimeMillis() < endTime) {
waitForDrained.wait();
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
eventHandlingThread.getState());
}
}
}
stopped = true;
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();//防止線程正在處理一個耗時任務導緻線程依然沒有退出
try {
eventHandlingThread.join();//等待eventHandlingThread執行完畢
} catch (InterruptedException ie) {
LOG.warn("Interrupted Exception while stopping", ie);
}
}
serviceStop()調用一開始,就将blockNewEvents=true置位,這意味着将不再接受新的event送出,然後的主要工作,就是需要等待所有的serviceStop()調用前送出的事件都被執行完畢再真正停止服務,否則會造成任務丢失。
waitForDrained.wait(1000);
代表着服務關閉的時候會每1s檢查我們的eventQueue中的所有事件是否處理完畢,如果沒有處理完畢,則繼續等待。雖然1s已經非常短暫,但是對于Yarn這樣的高并發系統,也必須進行優化。
為了能夠在eventQueue清空的時候及時通知serviceStop()所線上程,我們看這段代碼:
if (blockNewEvents) {
synchronized (waitForDrained) {
if (drained) {
waitForDrained.notify();
}
}
}
如果
blockNewEvents=true
,代表該排程器目前正在進行關閉操作,是以,事件處理線程有必要在發現事件隊列已經清空的情況下(drained=true),通過waitForDrained鎖及時通知到服務關閉線程(waitForDrained.notify())。服務關閉線程收到該通知就能立刻從waitForDrained.wait()中直接喚醒,進而及時完成關閉操作,而不再進行不必要的wait操作。
注意,wait操作與sleep()不同,wait()操作雖然必須在synchronized代碼塊中,但是等待過程中會放棄鎖。是以,serviceStop()正在執行wait(),隊列處理線程
AsyncDispatcher event handler
依然可以進入synchronized(waitForDrained)檢查eventQueue是否已經完全清空。
由此可見這段的其實很關鍵,目的是為了在某些情況下我們需要關閉AsyncDispatcher的時候,能夠非常優雅的、消息無丢失地同時迅速的關閉服務。
AsyncDispatcher event handler
線程會不斷從eventQueue中取出事件,然後傳遞給分派方法dispatch()對這個事件進行派發:
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
dispatch(event);
}
我們來看
dispatch()
方法:
protected void dispatch(Event event) {
//all events go thru this loop
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+ event.toString());
}
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.fatal("Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
Thread shutDownThread = new Thread(createShutDownThread());
shutDownThread.setName("AsyncDispatcher ShutDown handler");
shutDownThread.start();
}
}
}
代碼比較簡單,就是根據事件類型
event.getType().getDeclaringClass();
,從
eventDispatchers
中取出對應的EventHandler,然後調用
handle()
方法對事件進行處理。從
eventDispatchers
的聲明
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers
中可以看到,
eventDispatchers
的key是
Class<? extends Enum>
,即是一個
Enum
對象的子類的Class類型。
我們可以從
Hadoop-Yarn
的一個使用場景來看看Hadoop是如何使用
Asynchronized
進行事件排程的。我們以
Yarn ResourceManager
為例,來看看它是如何使用中央異步排程器進行事件排程的。
同樣,
ResourceManager
也被Yarn抽象為服務,在
ResourceManager.serviceInit()
方法中,建立了排程器對象:
private Dispatcher setupDispatcher() {
Dispatcher dispatcher = createDispatcher();
dispatcher.register(RMFatalEventType.class,
new ResourceManager.RMFatalEventDispatcher());
return dispatcher;
}
protected Dispatcher createDispatcher() {
return new AsyncDispatcher();
}
在
ResourceManager
服務的子服務
RMActiveServices
的初始化方法
serviceInit()
中,在這個排程器對象上注冊了各種事件和handler:
rmDispatcher.register(RMAppEventType.class,
new ApplicationEventDispatcher(rmContext));
// Register event handler for RmAppAttemptEvents
//将RmAppAttemptEvent交給RMAppAttempt去維護,其實作類是RMAppAttemptImpl
rmDispatcher.register(RMAppAttemptEventType.class,
new ApplicationAttemptEventDispatcher(rmContext));
// Register event handler for RmNodes
//将RmNodesEvent交給RMNode去維護,其實作類是RMNodeImpl
rmDispatcher.register(
RMNodeEventType.class, new NodeEventDispatcher(rmContext));
我們以
RMAppAttemptEventType
類型的事件進行分析。注意,一個類型是指相同僚件的集合,
RMAppAttemptEventType
這個類型的事件代表了與
ResouceManager App Attempt
(即ApplicationMaster本身的attempt程序)相關的事件集合,如
RMAppAttemptEventType.START
,
RMAppAttemptEventType.KILL
,
RMAppAttemptEventType.LAUNCHED
等等,都是
enum
類型。
在ApplicationMasterService.allocate()方法中(關于ApplicaitonMaster與ApplicationMasterService之間的通信,大家可以看我的部落格[YARN ApplicationMaster與ResourceManager之間基于applicationmaster_protocol.proto協定的allocate()接口源碼解析]),需要處理
RMAppAttemptEventType
類型的事件:
//this.rmContext.getDispatcher().getEventHandler()傳回内部維護的GenericEventHandler handlerInstance
this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptStatusupdateEvent(appAttemptId, request
.getProgress()));
顯然,目前發生的事件是
RMAppAttemptStatusupdateEvent
,
GenericEventHandler handlerInstance
在
handle()
方法中将該事件放入
eventQueue
,
eventHandlingThread
從
RMAppAttemptStatusupdateEvent
中取出
eventType
,我們跟蹤一下
RMAppAttemptStatusupdateEvent
的代碼可以看出,
eventType
正是
RMAppAttemptEventType
。好吧,剛才說過,在
ResourceManager
服務的子服務
RMActiveServices
的初始化方法
serviceInit()
中已經為這個
RMAppAttemptEventType
注冊了handler,是以取出這個handler來處理這個事件即可。
以上就是異步排程器
AsyncDispatcher
的整個工作流程,良好的通用型和異步設計,使得Hadoop幾乎所有的事件排程都使用它來進行。這種基于事件的設計思想對于一個高并發的系統來說非常實用。