
The YARN ResourceManager failover mechanism

Before the ResourceManager (RM) starts serving requests, it always initializes itself and performs a recovery operation; only then does the RM start up and serve external clients.

The RM startup flow is shown in the figure below:

[Figure: RM startup flow diagram]

The services running inside the RM include:

(1) ContainerAllocationExpirer: monitors whether allocated containers have expired.

(2) AmLivelinessMonitor: monitors the liveness of applications (their ApplicationMasters).

(3) NodesListManager: manages the node list, so nodes can be dynamically added to or removed from the cluster.

(4) NMLivelinessMonitor: monitors whether each NodeManager is alive; by default, a NodeManager that has not reported a heartbeat for 10 minutes is considered failed.

(5) ResourceTrackerService: implements the ResourceTracker protocol and is mainly responsible for managing the NodeManagers, such as registering new NodeManagers and removing dead ones.

(6) ApplicationMasterService: implements the AMRMProtocol and is responsible for interacting with ApplicationMasters, receiving their requests and responding to them.

(7) ClientRMService: implements the ClientRMProtocol and is responsible for interacting with clients, receiving client requests and responding to them.

(8) AdminService: implements the RMAdminProtocol and is mainly responsible for system-wide administration, such as deciding which clients may rename queues or add resources to particular queues.

(9) ApplicationMasterLauncher: manages the launch and shutdown of ApplicationMasters.

I. Goals of recovery

• Application recovery: applications that had not finished are re-run after the RM restarts.

There are three possible strategies for application recovery:

<1> Recompute the entire job from scratch.

<2> Preserve the already completed map and reduce tasks, and recompute only the unfinished tasks.

<3> Preserve each task's progress and resume from the checkpoint; for example, if a task was 20% complete, the restarted AM lets that task resume from the 20% mark.

(The third approach is essentially infeasible: while a job runs it may keep several global variables, such as global counters and user-defined variables. These are controlled by user code, so the framework can hardly capture their values and persist them to disk for recovery.)

• Resource recovery: re-register the cluster's node (NodeManager) information with the RM.

II. What YARN has already done for RM recovery

1. RM state information

The RM state in YARN consists of two kinds of information:

• RMNode: the state of the nodes (NodeManagers) in the cluster.

• ApplicationInfo: the state of the submitted applications.

2. How the RM state is stored

RM狀态資訊目前YARN隻提供了兩種存儲方式:

• In-memory storage (MemStore)

• ZooKeeper-based storage (ZKStore)

Which store is used can be configured with the following parameters:

yarn.resourcemanager.store.class: selects how the RM state is stored; the options are MemStore and ZKStore.

yarn.resourcemanager.zookeeper-store.address: when the ZK store is used, specifies the ZooKeeper address used for storage.
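As a minimal sketch, these two settings could be wired in through the Hadoop Configuration API; the ZKStore class name and the ZooKeeper address below are illustrative assumptions, not values confirmed by this article:

import org.apache.hadoop.conf.Configuration;

public class RmStoreConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Select the RM state store implementation (class name assumed for illustration).
    conf.set("yarn.resourcemanager.store.class",
        "org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKStore");
    // ZooKeeper address used by the ZK-based store (hypothetical host:port).
    conf.set("yarn.resourcemanager.zookeeper-store.address", "zk1.example.com:2181");
    System.out.println(conf.get("yarn.resourcemanager.store.class"));
  }
}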

3. RM recovery [not usable]

Since this part of YARN is still effectively a preview, only partial RM recovery code has been implemented; as a whole it is not usable and is for reference only. Some of the key code is shown below.

<1> RMNode recovery:

Re-register the cluster's node information with the RM.

The relevant code in ResourceTrackerService:

public void recover(RMState state) {
  List<RMNode> nodeManagers = state.getStoredNodeManagers();
  for (RMNode nm : nodeManagers) {
    createNewNode(nm.getNodeID(), nm.getNodeHostName(), nm.getCommandPort(),
        nm.getHttpPort(), nm.getNode(), nm.getTotalCapability());
  }
  for (Map.Entry<ApplicationId, ApplicationInfo> entry : state
      .getStoredApplications().entrySet()) {
    List<Container> containers = entry.getValue().getContainers();
    List<Container> containersToAdd = new ArrayList<Container>();
    for (Container c : containers) {
      RMNode containerNode = this.rmContext.getNodesCollection()
          .getNodeInfo(c.getNodeId());
      containersToAdd.add(c);
      containerNode.allocateContainer(entry.getKey(), containersToAdd);
      containersToAdd.clear();
    }
  }
}

<2> ApplicationInfo recovery:

Invoke the resource scheduler (CapacityScheduler, FairScheduler, or FifoScheduler) to re-assign resources (containers) and re-run the applications that had not finished.

The scheduler is selected with the parameter yarn.resourcemanager.scheduler.class; the default is org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler, the capacity scheduler.

The relevant code in CapacityScheduler:

@Lock(Lock.NoLock.class)
public void recover(RMState state) throws Exception {
  applications.clear();
  for (Map.Entry<ApplicationId, ApplicationInfo> entry :
      state.getStoredApplications().entrySet()) {
    ApplicationId appId = entry.getKey();
    ApplicationInfo appInfo = entry.getValue();
    SchedulerApp app = applications.get(appId);
    app.allocate(appInfo.getContainers());
    for (Container c : entry.getValue().getContainers()) {
      Queue queue = queues.get(appInfo.getApplicationSubmissionContext().getQueue());
      queue.recoverContainer(clusterResource, applications.get(appId), c);
    }
  }
}

III. What YARN has not yet done for RM recovery

<1> When the RM state should be stored.

<2> Recovering from the stored state after an RM failure.

<3> RM HA (high availability).

IV. Discussion in the Hadoop YARN community

The current discussion of RM recovery on the Hadoop YARN JIRA focuses mainly on the following points:


Point 1: Which state information RM recovery needs

One view is that only the information about applications that have not yet finished needs to be stored.

Another view is that both the application information and the NM information should be stored, as the current partial implementation in YARN does.

At the same time, some people consider the state information in the current YARN implementation problematic: first, it is insufficient for recovery; second, it contains redundant information that does not need to be persisted.

Point 2: When the RM state should be stored

There is currently no implementation of this in the code at all.

Point 3: How the RM state should be stored

There are three storage options to choose from:

MemStore: in-memory storage. Since the state needs to be persisted, in-memory storage is unlikely to be adopted.

DiskStore: disk storage. There is no such code yet in Hadoop 2.0.2, but it is likely present in the version under development.

ZKStore: ZooKeeper-based storage.

Point 4: How to recover after an RM failure

This is related to Point 1.

V. Concrete recovery ideas and open questions raised in the community

https://issues.apache.org/jira/browse/YARN-128

Tsuyoshi OZAWA added a comment - 27/Jul/12 10:42

Yeah, it’s not trivial what to save into ZK or the local disk of RM.

I’m going to look at the code too, and post them here.

Tsuyoshi OZAWA added a comment - 29/Jul/12 10:14

I’ve looked around the code of RM, and I’ve found that the current Recoverable interface provides storing the states as follows:

1. information about application(application ids and info defined in ApplicationId.java and ApplicationSubmissionContext.java).

2. Information about node managers(info about Node Manager defined in RMNode.java).

The current Recoverable interface supports storing two kinds of state: application information and NM information.

My questions are:

1. Are the states enough to store? In my looking around the code, RMContext has the other states, however, the states are recoverable without the store.

2. When the states should be saved onto the store?

3. When the interface getLastLoggedNodeId() is used?

  1. Are these states sufficient to store? RMContext holds other state, but that state can be recovered without the store.
  2. When should these states be saved to the store?
  3. When is the interface getLastLoggedNodeId() used?

IMHO, we should go step by step as follows:

1. Define the states of RM, which are preserved onto MemStore/DiskStore/ZKStore.

2. Implement the resurrectable version when the RM crashed(ex. DiskStore/ZKStore).

Prototyping 2 and testing it will prove the correctness of 1.

  1. Define the RM state that is preserved in MemStore/DiskStore/ZKStore.
  2. Implement a version of the RM that can be resurrected after a crash (e.g. via DiskStore/ZKStore).

If you have any ideas, please let me know.

Bikas Saha added a comment - 06/Aug/12 07:48

I think the current implementation (actual code/commented code/todo’s etc) looks like a prototype which may not be in sync with the current state of the functional code. So I am not sure about using it as is.

The current implementation looks like a prototype and may not be in sync with the current functional code.

Also, the implementation seems to be doing blocking calls to ZK etc and will likely end up being a bottleneck on RM threads/perf if a lot of state information needs to be synced to stable store.

The implementation makes blocking calls to ZK and the like; if a lot of state information has to be synced to stable storage, this will likely become a bottleneck for RM threads and performance.

On that note, my gut feeling is that the RM state in practice is, in a sense, the sum total of the current state of the cluster as reflected in the NM’s. So there may not be the need to store any state as long as the RM can recover the current state of the cluster from the NM’s in a reasonable amount of time. The NM’s anyways have to re-sync with the RM after it comes back up. So that is not extra overhead.

The cluster's current state is reflected in the NMs. As long as the RM can recover that state from the NMs within a reasonable amount of time, no extra state may need to be stored; the NMs have to re-sync with the RM after it comes back up anyway, so this adds no extra overhead.

Saving a lot of state would result in having to solve the same set of issues that the Namenode has to solve in order to maintain consistent, reliable and available saved state. IMO, for the RM we are better off avoiding those issues.

The only state that needs to be save, as far as I can see, is the information about all jobs that are not yet completed. This information is present only in the RM and so needs to be preserved across RM restart. Fortunately, this information is small and infrequently updated. So saving it synchronously in ZK may not be too much of an issue.

Only the information about jobs that have not yet completed needs to be stored as RM state. This information exists only in the RM, so it must survive an RM restart. It is small and updated infrequently, so saving it synchronously in ZK should not be much of an issue.

Tsuyoshi OZAWA added a comment - 08/Aug/12 09:03

> So there may not be the need to store any state as long as the RM can recover the current state of the cluster from the NM’s in a reasonable amount of time.

It’s good idea to avoid saving recoverable states without storing. It’s uncertain that it can be recoverable in a reasonable amount of time, so prototyping is needed.

It is uncertain whether recovery can finish within a reasonable amount of time, so prototyping is needed.

> The only state that needs to be save, as far as I can see, is the information about all jobs that are not yet completed.

I agree with you. I’ll check whether the states of WIP jobs is defined correctly or not.

> Also, the implementation seems to be doing blocking calls to ZK etc and will likely end up being a bottleneck on RM threads/perf if a lot of state information needs to be synced to stable store.

I think, to avoid being the bottleneck, RM should have a dedicated thread to save the states of RM. The main thread can send the requests of saving the states to the dedicated thread without blocking by using queue or something. Using async APIs to save the states is also effective, however, the code can get complicated.

To avoid this bottleneck, the RM should have a dedicated thread for saving its state. The main thread can hand save-state requests to that thread through a queue or a similar structure without blocking. Using async APIs to save the state also works, but the code can get complicated.
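A minimal sketch of that dedicated-thread idea; the StateSaveRequest type and the stable-store write below are hypothetical placeholders rather than actual YARN classes:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class AsyncStateSaver {
  // Placeholder for whatever needs to be persisted (e.g. serialized app info).
  static class StateSaveRequest {
    final String key;
    final byte[] data;
    StateSaveRequest(String key, byte[] data) { this.key = key; this.data = data; }
  }

  private final BlockingQueue<StateSaveRequest> queue = new LinkedBlockingQueue<>();

  public AsyncStateSaver() {
    Thread worker = new Thread(() -> {
      try {
        while (true) {
          StateSaveRequest req = queue.take(); // blocks only this worker thread
          writeToStableStore(req);             // e.g. a blocking ZK or disk write
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    }, "rm-state-saver");
    worker.setDaemon(true);
    worker.start();
  }

  // Called from the RM dispatcher threads; never blocks on the store itself.
  public void saveAsync(String key, byte[] data) {
    queue.offer(new StateSaveRequest(key, data));
  }

  private void writeToStableStore(StateSaveRequest req) {
    // Hypothetical blocking write to ZK/HDFS/disk; omitted in this sketch.
  }
}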

Vinod Kumar Vavilapalli added a comment - 24/Sep/12 20:39

Pasting notes from Bikas inline for easier discussion.

Basic Idea:

Key idea is that the state of the cluster is its current state. So don’t save all container info.

RM on startup sets a recovery flag on. Informs scheduler via API.

叢集狀态就是叢集目前的狀态,無需儲存所有容器的資訊。RM啟動時設定一個恢複标志。

Re-create running AM info from persisted state. Running AM’s will heartbeat to the RM and be asked to re-sync.

Re-create the information about running AMs from the persisted state. Running AMs will heartbeat to the RM and be asked to re-sync.

Re-start AM’s that have been lost. What about AM’s that completed during restart. Re-running them should be a no-op.

Restart AMs that have been lost. For AMs that completed during the restart, re-running them should be a no-op.

Ask running and re-started AM’s to re-send all pending container requests to re-create pending request state.

Ask running and restarted AMs to re-send all of their pending container requests so the pending-request state can be rebuilt.

RM accepts new AM registrations and their requests.

The RM accepts new AM registrations and their requests.

Scheduling pass is not performed when recovery flag is on.

No scheduling pass is performed while the recovery flag is on.

RM waits for nodes to heartbeat and give it container info.

The RM waits for the nodes to heartbeat and report their container information.

RM passes container info to scheduler so that the scheduler can re-create current allocation state.

The RM passes the container information to the scheduler, which uses it to re-create the current allocation state.

After recovery time threshold, reset recovery flag and start the scheduling pass. Normal from thereon.

Once the recovery time threshold is reached, the recovery flag is reset and the scheduling pass starts; operation is normal from then on.

Schedulers could save their state and recover previous allocation information from that saved state.

Schedulers could also save their own state and recover previous allocation information from it.

What info comes in node heartbeats:

Handle sequence number mismatch during recovery. On heartbeat from node send ReRegister command instead of Reboot. NodeManager should continue running containers during this time.

Handle heartbeat sequence-number mismatches during recovery: on a heartbeat from a node, send a ReRegister command instead of Reboot. The NodeManager should keep running its containers during this time.

RM sends commands back to clean up containers/applications. Can orphans be left behind on nodes after RM restart? Will NM be able to auto-clean containers?

The RM sends commands back to clean up containers/applications. Can orphans be left behind on nodes after an RM restart? Will the NM be able to clean up containers automatically?

ApplicationAttemptId can be gotten from Container objects to map resources back to SchedulingApp.

The ApplicationAttemptId can be obtained from the Container objects, so resources can be mapped back to the SchedulingApp.
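As a small illustration against the public YARN records API (the ownerOf helper is a hypothetical name; the getters used are existing YARN record accessors):

import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;

public class ContainerOwnerLookup {
  // For a container reported by a re-registering node, find the application
  // attempt it belongs to, so the scheduler can credit it to the right app.
  static ApplicationAttemptId ownerOf(Container container) {
    return container.getId().getApplicationAttemptId();
  }
}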

How to pause the scheduling pass:

Scheduling pass is triggered on NODE_UPDATE events that happen on node heartbeat. Easy to pause under recovery flag.

The scheduling pass is triggered by NODE_UPDATE events raised on node heartbeats, so it is easy to pause while the recovery flag is on.
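A sketch of how the scheduling pass could be gated by such a recovery flag; the class and method names here are illustrative, not the actual scheduler code:

public class RecoveryAwareScheduler {
  private volatile boolean recovering = true; // set at RM startup, cleared after the recovery threshold

  // Called for every NODE_UPDATE event raised by a node heartbeat.
  public void handleNodeUpdate(String nodeId) {
    absorbContainerReports(nodeId); // always rebuild allocation state from the heartbeat
    if (recovering) {
      return;                       // pause the scheduling pass while recovering
    }
    runSchedulingPass(nodeId);      // normal container assignment
  }

  public void finishRecovery() {
    recovering = false;
  }

  private void absorbContainerReports(String nodeId) { /* stub for illustration */ }

  private void runSchedulingPass(String nodeId) { /* stub for illustration */ }
}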

YarnScheduler.allocate() is the API that needs to be changed.

YarnScheduler.allocate() is the API that needs to be changed.

How to handle container releases messages that were lost when RM was down? Will AM’s get delivery failure and continue to resend indefinitely?

How should container-release messages that were lost while the RM was down be handled? Will AMs see a delivery failure and keep resending indefinitely?

How to re-create scheduler allocation state:

On node re-register, RM passes container info to scheduler so that the scheduler can re-create current allocation state.

When a node re-registers, the RM passes its container information to the scheduler so that the scheduler can re-create the current allocation state.

Use CsQueue.recoverContainer() to recover previous allocations from currently running containers.

Use CsQueue.recoverContainer() to recover the previous allocations from the currently running containers.

How to re-synchronize pending requests with AM's:

Need new AM-RM API to resend asks from AM to RM.

A new AM-RM API is needed so that AMs can re-send their asks to the RM.

Keep accumulating asks from AM’s like it currently happens when allocate() is called.

Keep accumulating asks from the AMs, just as currently happens when allocate() is called.

How to persist AM state:

Store AM info in a persistent ZK node that uses version numbers to prevent out of order updates from other RM’s. One ZK node per AM under a master RM ZK node. AM submission creates ZK node. Start and restart update ZK node. Completion clears ZK node.

Store the AM info in a persistent ZK node, using version numbers to prevent out-of-order updates from other RMs. There is one ZK node per AM under a master RM ZK node: AM submission creates the node, AM start and restart update it, and completion clears it.
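A minimal sketch of that znode layout using the plain ZooKeeper client; the /rmstore path and the method names are illustrative assumptions, not the eventual RMStateStore implementation:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class AmZkStoreSketch {
  private final ZooKeeper zk;
  private final String rmRoot; // master RM znode, e.g. "/rmstore" (assumed path)

  AmZkStoreSketch(ZooKeeper zk, String rmRoot) {
    this.zk = zk;
    this.rmRoot = rmRoot;
  }

  // AM submission: create one persistent znode per AM under the RM root.
  void onAppSubmitted(String appId, byte[] amInfo) throws KeeperException, InterruptedException {
    zk.create(rmRoot + "/" + appId, amInfo, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  }

  // AM start/restart: update the znode with the expected version, so an
  // out-of-order write from a stale RM fails with BadVersionException.
  void onAttemptStarted(String appId, byte[] amInfo, int expectedVersion)
      throws KeeperException, InterruptedException {
    zk.setData(rmRoot + "/" + appId, amInfo, expectedVersion);
  }

  // AM completion: clear the znode (-1 skips the version check here).
  void onAppCompleted(String appId) throws KeeperException, InterruptedException {
    zk.delete(rmRoot + "/" + appId, -1);
  }
}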

Metrics:

What needs to be done to maintain consistency across restarts. New app attempt would be a new attempt but what about recovered running apps.

What needs to be done to keep metrics consistent across restarts? A new app attempt would count as a new attempt, but what about recovered running apps?

Security:

What information about keys and tokens to persist across restart so that existing secure containers continue to run with new RM and new containers. ZK nodes themselves should be secure.

Which key and token information needs to be persisted across a restart so that existing secure containers can keep running alongside the new RM and new containers? The ZK nodes themselves also need to be secured.

Vinod Kumar Vavilapalli added a comment - 24/Sep/12 21:17

+1 for most of your points. Some specific comments:

What about AM’s that completed during restart. Re-running them should be a no-op.

AMs should not finish themselves while the RM is down or recovering. They should just spin.

AMs should not finish while the RM is down or recovering; they should just spin and wait.

How to handle container releases messages that were lost when RM was down? Will AM’s get delivery failure and continue to resend indefinitely?

You mean release requests from AM? Like above, if AMs just spin, we don’t have an issue.

Need new AM-RM API to resend asks from AM to RM.

See AMResponse.getReboot(). That can be used to inform AMs to resend all details.

What information about keys and tokens to persist across restart so that existing secure containers continue to run with new RM and new containers.

We already noted this as java comments in code. Need to put in proper documentation.

ZK nodes themselves should be secure.

Good point. Worst case that ZK doesn’t support security, we can rely on a RM specific ZK instance and firewall rules.

More requirements:

  • An upper bound (time) on recovery?

An upper bound on recovery time.

  • Writing to ZK shouldn’t add more than x% (< 1-2%) to app latency?

Writing to ZK should not add more than 1-2% to application latency.

More state to save:

  • New app submissions should be persisted/accepted but not acted upon during recovery.

During recovery, new application submissions should be persisted/accepted but not acted upon.

Miscellaneous points:

  • I think we should add a new ServiceState call Recovering and use the same in RM.
  • Overall, clients, AMs and NMs should spin while the RM is down or doing recovery. Also we need to handle fail-over of RM, should do as part of a separate ticket.

Clients, AMs, and NMs should spin (wait) while the RM is down or recovering.

  • When is recovery officially finished? When all running AMs sync up? I suppose so, that would be an upper bound equaling AM-expiry interval.

When is recovery officially finished? When all running AMs have synced up? If so, the upper bound would equal the AM expiry interval.

  • Need to think of how the RM-NM shared secret roll-over is affected if the RM is down for a significant amount of time.

Need to consider how the RM-NM shared-secret rollover is affected if the RM is down for a significant amount of time.

Robert Joseph Evans added a comment - 24/Sep/12 22:10

AMs should not finish themselves while the RM is down or recovering. They should just spin.

+1 for that. If we let the MR AM finish, and then the RM comes up and tries to restart it will get confused because it will not find the job history log where it expects to see it which will cause it to restart, and it is likely to find the output directory already populated with data, which could cause the job to fail. What is worse it may not fail, because I think the output committer will ignore those errors. The first AM could inform oozie that the job finished through a callback, and a second job may be launched and is reading the data at the time that the restarted first job is trying to write that data, which could cause inconsistent results or cause the second job to fail somewhat randomly.

If we let the MR AM finish and the RM then comes back up and tries to restart it, the AM will get confused: it will not find the job history log where it expects to see it, which will cause it to restart, and it will likely find the output directory already populated with data, which could cause the job to fail. Worse, the job may not fail, because the output committer will probably ignore those errors. The first AM could inform Oozie through a callback that the job finished, and a second job might be launched and read the data while the restarted first job is still trying to write it, which could produce inconsistent results or make the second job fail somewhat randomly.

An upper bound (time) on recovery?

This is a bit difficult to determine because the RM is responsible for renewing tokens. Right now it will renew them when they only have about 10% of their time left before they expire. So it depends on how long the shortest token you have in flight is valid for before it needs to be renewed. In general all of the tokens I have seen are for 24 hours, so you would have about 2.4 hours to bring the RM back up and read in/start renewing all of the tokens or risk tokens expiring.

This is hard to determine because the RM is responsible for renewing tokens. Right now it renews them when only about 10% of their lifetime remains, so the bound depends on how long the shortest in-flight token is valid before it needs renewal. In general tokens are valid for 24 hours, so there would be about 2.4 hours to bring the RM back up and read in/start renewing all the tokens, or risk tokens expiring.

Thomas Graves added a comment - 25/Sep/12 02:03

RM sends commands back to clean up containers/applications. Can orphans be left behind on nodes after RM restart? Will NM be able to auto-clean containers?

Containers can currently be lost. See YARN-72 and YARN-73. Once its changed so RM doesn’t always reboot the NM’s that will get a bit better but its still possible so we will have to handle somehow. Since the NM could crash it almost needs a way to check on startup whats running and at that point decide if it should clean them up. It does have a .pid file for the containers but you would have to be sure that process is the same one as when the NM went down.

Containers can currently be lost; see YARN-72 and YARN-73. Once things are changed so that the RM does not always reboot the NMs this gets a bit better, but it is still possible, so it has to be handled somehow. Since the NM itself can crash, it almost needs a way to check on startup what is still running and decide whether to clean it up. The NM does keep a .pid file for each container, but you would have to be sure the process is the same one that was running when the NM went down.

Thomas Graves added a comment - 25/Sep/12 03:40

What about AM’s that completed during restart. Re-running them should be a no-op.

AMs should not finish themselves while the RM is down or recovering. They should just spin.

Doesn’t the RM still need to handle this. The client could stop the AM at any point by talking directly to it. Or since anyone can write an AM it could simply finish on its own. Or perhaps timing issue on app finish. How does the RM tell the difference? We can have the MR client/AM handle this nicely but even then there could be a bug or expiry after so long. So perhaps if the AM is down it doesn’t get restarted? Thats probably not ideal if app happens to go down at the same time as the RM though – like a rack gets rebooted or something, but otherwise you have to handle all the restart issues, like Bobby mentioned above.

Doesn't the RM still need to handle this? The client could stop the AM at any point by talking to it directly; or, since anyone can write an AM, it could simply finish on its own; or there may be timing issues around application completion. How does the RM tell the difference? We can make the MR client/AM handle this nicely, but even then there could be a bug or a long expiry. So perhaps an AM that is down simply does not get restarted? That is probably not ideal if the application happens to go down at the same time as the RM, for example when a rack gets rebooted, but otherwise all the restart issues Bobby mentioned above have to be handled.

Robert Joseph Evans added a comment - 25/Sep/12 16:06

The problem is that we cannot be truly backwards compatible when adding in this feature. We have to better define the lifecycle of an AM for it to be “well behaved” and properly handle RM recovery. I would say that if the client asks the AM to stop it should still pause on unregister until it can successfully unregister, or until it can mark itself as “killed” in a persistent way like with the job history log, so that when that AM is relaunched all it has to do is to check a file on HDFS and then unregister. Perhaps the only way to be totally backwards compatible is for the AM to indicate when it registers if it supports RM recovery or not. Or to avoid any race conditions when the client launches the AM it would indicate this. If it does not (legacy AMs), then the RM will not try to relaunch it if the AM goes down while the RM is recovering. If it does, then the AM will always be relaunched when the RM goes down.

The problem is that we cannot be truly backward compatible when adding this feature. We have to define the AM lifecycle better so that a "well behaved" AM handles RM recovery properly. If the client asks the AM to stop, the AM should still pause at unregistration until it can successfully unregister, or until it can mark itself as "killed" in a persistent way, for example in the job history log, so that when the AM is relaunched all it has to do is check a file on HDFS and then unregister. Perhaps the only way to be fully backward compatible is for the AM to indicate at registration, or for the client to indicate when launching the AM (to avoid race conditions), whether it supports RM recovery. If it does not (a legacy AM), the RM will not try to relaunch it if it goes down while the RM is recovering; if it does, the AM will always be relaunched when the RM goes down.

Devaraj K added a comment - 18/Oct/12 07:54

Attaching the first version of patch. I have tested in a small cluster with FIFO & Capacity schedulers by making RM down and up while running the application and continued the application without any failures.

Arun C Murthy added a comment - 06/Nov/12 01:34

Agree with Bobby’s concerns.

For now I think the first step should be to merely restart all apps on RM restart, something similar to MR1 today.

For now, the first step should be merely to restart all applications on RM restart, similar to MR1 today.

Bikas – can I pls suggest this as a first step? Thanks!

Bikas Saha added a comment - 06/Nov/12 17:28

yeah. I have been thinking on similar lines too. Working on a refreshed proposal and code patch.

Bikas Saha added a comment - 06/Nov/12 17:33

Devaraj, I think the current approach+code based on zkstore (that YARN-128.patch builds on top of) has some significant issues wrt perf/scalability of ZK/future HA. The design outline attached to this jira calls out some of the issues. The next proposal document will help clarify a bit more I hope.

Bikas Saha added a comment - 09/Nov/12 18:46

Attaching a proposal doc and code for the first iteration. The proposal is in the same lines as the earlier initial design sketch but limits the first iteration of the work to restarting the applications after the RM comes back up. The reasoning and ideas are detailed in the doc.

Attaching some code that implements the proposal. It includes a functional test that verifies the end-to-end scenario using an in-memory store. If everything looks good overall then I will tie up the loose ends and add more tests.

For review, the code is broken into 1) removal of old code 2) new code + test. There are TODO comments in the code where folks could make suggestions. The code is attached in full for a build and test pass on Jenkins because my machine is having long host resolution timeouts. Any ideas on this?

During the testing I found a bug in the CapacityScheduler because of which it fails to activate applications when resources are added to the cluster. Folks can comment on the fix. There is a separate test case that shows the bug and verifies the fix.

Bikas Saha added a comment - 11/Nov/12 14:47

Updating patches for new code and combined patch.

Changes

1) Code added to remove application data upon completion

2) All TODO’s examined and removed/fixed.

3) Improved TestRMRestart and its readability

4) Added more tests for RMAppAttemptTransitions

5) Refactored RMStateStore into an abstract class so that it can implement common functionality to notify app attempt about async store operation completion

Fix for capacity scheduler bug is still in the patch because it blocks test completion. The issue is also tracked in YARN-209

Bikas Saha added a comment - 12/Nov/12 05:29

Attaching rebased patches

Bikas Saha added a comment - 13/Nov/12 12:03

Attaching rebased patches + change RMStateStore to throw exception to notify about store errors.

Arinto Murdopo added a comment - 15/Nov/12 08:35

Based on the YARN-128.full-code-4.patch, I have these following observations:

1) In TestRMRestart.java Line 78, app1 and appState refer to the same instance because we are using memory to store the states (MemoryRMStateStore). Therefore, the assert result will always be True.

2) ApplicationState is stored when we invoke MockRM’s submitApp method. More precisely, it is in ClientRMService class, line 266. The state that we store contains the resource request from client. In this case, the value of resource request is 200. However, if we wait for some time, the value will be updated to 1024 (which is the normalized value given by the Scheduler).

3)Currently our school project is trying to persist the state in persistent storage, and the assert statement in our modified test class returns error since our storage stores the resource value before updated by the scheduler.

Based on above observations, should we update the persisted memory value with the new value assigned by scheduler?

Since we are going to restart both ApplicationMaster and NodeManager when there is failure in ResourceManager, I think the answer is no, we can use the original value requested by user. But I’m not really sure with my own reasoning.. soo.. please comment on it. . If the answer is yes, then we should wait until Scheduler updates the resource value before persisting it into the storage.

Bikas Saha added a comment - 16/Nov/12 11:03

1) Unless I am mistaken, the test condition is correct. app1 is the app actually submitted while appState is the state retrieved from the store. By checking that both are the same, we are checking that the data that was supposed to be passed has actually been passed to the store and there is no bug in the transfer of that data. The assert will be false if the transfer does not happen or some other value gets passed by mistake. Does that help clarify?

3) Which resource value is this? The one that is store in ApplicationSubmissionContext->ContainerLaunchContext? In the patch, the ApplicationSubmissionContext is being store at the very beginning to ensure that the client does not have to submit the job again. Hence, the Resource set by the client is saved. I am not sure what your project is saving after the scheduling is done.

You are right. We dont want to store the updated value since this updated value is a side-effect of the policy of the scheduler.

I am not sure if this applies to your project. I will be shortly posting an Zookeeper and HDFS state store that you could use unless you are using your own storage mechanism.

Arinto Murdopo added a comment - 16/Nov/12 13:21

1) Yes, I agree with your clarification. It works as what you state when we are using persistent storage (not MemStore, but ZK, MySQL, file or other persistent storage)

However, when we are using MemStore, the stored object (appState) and app1 are referring to the same instance since our “store” is memory. To test my argument, we can put breakpoint in the assert statement that compares the ApplicationSubmissionContext, then use IDE feature to change any value of appState’s properties i.e resource in ApplicationSubmissionContext. The corresponding app1 value (in this case is the resource in app1′s ApplicationSubmissionContext) will also be updated to the same value.

3) Yes, it's in the Resource in ApplicationSubmissionContext->ContainerLaunchContext.

If we are saving the original resource value requested by client, then the assert statement that compare ApplicationSubmissionContext will not pass.

Let’s say Client request resource of memory with value of 200. We store this in our persistent storage. After we store, scheduler updates the resource with value of 1024. In this case, the resource in app1 instance will be 1024, but the resource that stored in our storage is 200. Hence, it will not pass when we compare them using current assert statement. Maybe we need to keep storing our original resource request in ApplicationSubmissionContext.

Looking forward to your ZK and HDFS state store. The state store in our project is using MySQL cluster.

Tom White added a comment - 16/Nov/12 14:02

Bikas, this looks good so far. Thanks for working on it. A few comments:

  • Is there a race condition in ResourceManager#recover where RMAppImpl#recover is called after the StartAppAttemptTransition from resubmitting the app? The problem would be that the earlier app attempts (from before the restart) would not be the first ones since the new attempt would get in first.
  • I think we need the concept of a ‘killed’ app attempt (when the system is at fault, not the app) as well as a ‘failed’ attempt, like we have in MR task attempts. Without the distinction a restart will count against the user’s app attempts (default 1 retry) which is undesirable.
  • Rather than change the ResourceManager constructor, you could read the recoveryEnabled flag from the configuration.

Bikas Saha added a comment - 16/Nov/12 15:56

@Arinto

Thanks for using the code!

1) Yes. Both are the same object. But that is what the test is testing: that the context that got saved in the store is the same as the one the app was submitted with. We are doing this with an in-memory store that lets us examine the stored data and compare it with the real data. A real store would actually persist the data, so this comparison would not be possible.

3) Yes. It seems incorrect to store scheduler side-effects. e.g. upon restart if the scheduler config make minimum container size = 512 then again it will not match.

I am attaching a patch for a ZK store that you can try. It applies on top of the current full patch.

@Tom

Thanks for reviewing!

1) There is no race condition because the Dispatcher has not been started yet and hence the attempt start event has not been processed. There is a comment to that effect in the code.

2) I agree. I had thought about it too. But it looks like the current behavior (before this patch) does this because it does not differentiate killed/failed attempts when deciding that the attempt retry limit has been reached. So I thought about leaving it for a separate jira which would be unrelated to this. Once that is done this code could use it and not count the restarted attempt. This patch is already huge. Does that sound good?

3) Yes. That could be done. The constructor makes it easier to write tests without mangling configs.

Tom White added a comment - 16/Nov/12 16:14

You are right about there being no race – I missed the comment! I opened YARN-218 for the killed/failed distinction as I agree it can be tackled separately.

Bikas Saha added a comment - 17/Nov/12 17:39

Updated ZK and FileSystem store patches. FileSystem patch applies after ZK patch.

Tom White added a comment - 19/Nov/12 17:48

I had a quick look at the new patches and FileSystemRMStateStore and ZKRMStateStore seem to be missing default constructors, which StoreFactory needs. You might change the tests to use StoreFactory to construct the store instances to test this code path.

Bikas Saha added a comment - 19/Nov/12 20:31

Thanks for looking at the patches while work is still in progress. That helps a lot!

Yes. I am working on that currently. The 2 also have a lot of duplicated code which I am moving into the base class. I will soon create a few sub tasks and post the final patches in them so that its easier to review and commit them.

Bikas Saha added a comment - 20/Nov/12 13:14

Attaching final patch with full changes for a test run. Can someone with access please trigger a test run on JIRA?

Changes

1) Completed handling on unmanaged AM’s

2) Refactored ZK and FileSystem store classes to move common logic into the base class and also integrate with the RM

3) Test improvements

I have tested manually on a single node with both ZK and FileSystem store (using HDFS) and run wordcount job across a restart.

I will create sub-tasks of this jira to break the changes into logical pieces.

Bikas Saha added a comment - 20/Nov/12 15:16

Done creating sub-tasks and attaching final patches for review and commit.

Arinto Murdopo added a comment - 28/Nov/12 16:19

Tested the YARN-128.full-code.5.patch, using ZooKeeper store and the result is positive. ResourceManager resurrected properly after we killed it.

Experiment overview:

  • ZK settings: 1 ZK-Server consisted of 3 different nodes
  • HDFS was in single-node setting. YARN and HDFS was executed in the same node.
  • Executed bbp and pi examples from the generated hadoop distribution (we built and packaged the trunk and patch code)
  • Killed ResourceManager process when bbp or pi was executing(using Linux kill command) and started new RM 3 seconds after we killed it.

Strahinja Lazetic added a comment - 04/Dec/12 11:08

Bikas, I have one question; Since we reboot NMs and terminate all the running containers and AMs upon the RM restart, why do we need to keep track of the previous Applications’ attempts? Couldn’t we just start “from scratch” instead of generating the next attempt id based on the last running one?

Reference links:

https://issues.apache.org/jira/browse/YARN-128

https://issues.apache.org/jira/browse/MAPREDUCE-4343

https://issues.apache.org/jira/secure/attachment/12549649/YARN-128.patch

https://issues.apache.org/jira/secure/attachment/12532336/MR-4343.1.patch
