天天看點

MapReduce V1:JobTracker處理Heartbeat流程分析

我們基于Hadoop 1.2.1源碼分析MapReduce V1的處理流程。這篇文章的内容,更多地主要是描述處理/互動流程性的東西,大部分流程圖都是經過我梳理後畫出來的(開始我打算使用序列圖來描述流程,但是發現很多流程在單個對象内部都已經非常複雜,想要通過序列圖表達有點擔心描述不清,是以選擇最基本的程式流程圖),可能看起來比較枯燥,重點還是關注主要的處理流程要點,特别的地方我會刻意标示出來,便于了解。

JobTracker與TaskTracker之間通過org.apache.hadoop.mapred.InterTrackerProtocol協定來進行通信,TaskTracker通過該接口進行遠端調用實作Heartbeat消息的發送,協定方法定義如下所示:

<code>1</code>

<code>HeartbeatResponse heartbeat(TaskTrackerStatus status,</code>

<code>2</code>

<code></code><code>boolean</code> <code>restarted,</code>

<code>3</code>

<code></code><code>boolean</code> <code>initialContact,</code>

<code>4</code>

<code></code><code>boolean</code> <code>acceptNewTasks,</code>

<code>5</code>

<code></code><code>short</code> <code>responseId)</code><code>throws</code> <code>IOException;</code>

通過該方法可以看出,最核心的Heartbeat報告資料都封裝在TaskTrackerStatus對象中,JobTracker端會接收TaskTracker周期性地發送的心跳報告,根據這些心跳資訊來更新整個Hadoop叢集中計算資源的狀态/數量,以及Task的運作狀态。

另外,在JobTracker端維護的對象的資料結構,主要包括如下3個:

TaskTracker:這個類是在JobTracker端定義的,描述了TaskTracker的基本資訊和狀态(需要注意的是:它與TaskTracker程序的實作類同名,但是含義完全不同)

JobInProgress:簡寫JIP,在JobTracker端用來描述,JobClient送出的Job運作狀态的資料結構,一個JIP對象還包含了組成一個Job的Task對應的一組TIP的資訊

TaskInProgress:簡寫TIP,在JobTracker端用來描述,在TaskTracker上運作的Task狀态的資料結構(需要注意的是:在TaskTracker端也對應一個TaskInProgress實作類,它與JobTracker端的同名,但是所包含的内容也并不完全相同)

TaskAttemptID:簡寫TAID,它是唯一辨別了組成一個Job的Task的一個運作執行個體,一個Task(MapTask/ReduceTask)可能運作多次,比如第一次運作失敗,對應一個失敗的TAID,第二次排程又運作,又對應一個新的TAID;再比如,推測執行,可能會對應着同一個Task的、具有2個不同TAID的Task運作執行個體

TaskTrackerStatus結構

TaskTrackerStatus對象要在網絡間進行序列化傳輸,是以實作了接口org.apache.hadoop.io.Writable,該對象的資料結構,如下圖所示:

TaskTrackerStatus中各個資料項的含義,說明如下表所示:

<b>字段名稱</b>

<b>字段類型</b>

<b>說明</b>

trackerName

String

TaskTracker的名稱,例如:tracker_ + localHostname + : + taskReportAddress

host

TaskTracker所在主機名稱

httpPort

int

HTTP端口号,預設50030

taskFailures

在該TaskTracker上運作失敗的Task的個數

dirFailures

在TaskTracker節點上,配置的mapred.local.dir目錄失敗的個數

maxMapTasks

在該TaskTracker上同時運作Map的最大個數,通過mapred.tasktracker.map.tasks.maximum配置的,預設值是2

maxReduceTasks

在該TaskTracker上同時運作Map的最大個數,通過 mapred.tasktracker.reduce.tasks.maximum配置的,預設值是2

resStatus

ResourceStatus

在該TaskTracker上的資源情況,主要包括如下内容:虛拟記憶體大小、實體記憶體大小、Map slot數量、Reduce slot數量、可用磁盤空間、可用虛拟記憶體大小、可用實體記憶體大小、處理器數量、CPU頻率、CPU使用百分比、累積CPU時間

taskStatus

TaskStatus

在該TaskTracker上,目前task的狀态,它有分為MapTaskStatus和ReduceTaskStatus,主要包含如下内容:taskid(TaskAttemptID)、運作進度百分比、運作狀态、診斷資訊、所在TaskTracker名稱、slot數、開始時間、結束時間、執行階段(Phase)、一組計數器資訊

taskTrackerHealthStatus

TaskTrackerHealthStatus

TaskTracker的健康狀态資訊

下面,主要對ResourceStatus、TaskStatus、TaskTrackerHealthStatus進行說明:

ResourceStatus封裝了一個TaskTracker節點的資源資訊,結構如下圖所示:

TaskStatus封裝了一個TaskTracker節點上運作的Task的狀态資訊,結構如下圖所示:

上圖将TaskStatus的包含的資料結構全部展開,可以根據字段含義來了解它所描述的一些資訊。

TaskTrackerHealthStatus封裝了TaskTracker的健康狀态資訊,如下圖所示:

JobTracker處理Heartbeat流程

JobTracker處理Heartbeat的流程,如果把每個處理細節都詳細地展開,非常地複雜,可能從頭到尾描述下來會感覺枯燥無味,是以這裡我先概要地描述JobTracker處理Heartbeat的整體流程,然後再按照功能劃分出一個個看似還算獨立的子處理流程,單獨地進行詳細說明,這樣能夠更容易了解。整體處理流程,如下圖所示:

下面,我們根據上面的Heartbeat處理流程圖,概要地說明Heartbeat是如何處理的,流程描述如下所示:

TaskTracker建立一個TaskTrackerStatus對象,TaskTrackerStatus内部封裝的資訊包括:TaskTracker所在節點的基本資訊、運作在TaskTracker上的Task的狀态資訊、TaskTracker服務的健康狀态資訊、TaskTracker的資源資訊,另外發送心跳的RPC方法還包括restarted(TaskTracker是否重新開機)、initialContact(TaskTracker是否初次連接配接JobTracker)、acceptNewTasks(TaskTracker是否能夠運作新的Task)、responseId(心跳響應ID),通過InterTrackerProtocol協定的heartbeat方法發送給JobTracker。

JobTracker接收到TaskTracker發送的心跳資料。

JobTracker檢查TaskTracker的host是否在黑名單中,如果TaskTracker在黑名單中,則直接抛出異常終止RPC調用,否則繼續下一步流程。

檢查TaskTracker RPC調用參數restarted的值,如果TaskTracker重新開機了,則标記TaskTracker狀态為健康狀态;如果TaskTracker沒有重新開機,則檢查是否可以指派任務在該TaskTracker上運作。

如果TaskTracker不是初次連接配接JobTracker,檢查JobTracker是否存在上一次向該TaskTracker發送的Heartbeat響應資料,存在的話則說明TaskTracker因為失去了與JobTracker之間的RPC連接配接而沒有接收到,JobTracker直接再給TaskTracker重新發送該響應資料;不存在的話,若JobTracker重新開機了,使TaskTracker重新加入叢集,需要通知Recovery Manager從恢複清單中移除該TaskTracker,若JobTracker未重新開機,這種情況幾乎是不可能存在的(既然TaskTracker不是初次連接配接,JobTracker也沒有重新開機,JobTracker端不可能沒有儲存Heartbeat響應資料)。

處理JobTracker接收到的TaskTracker的Heartbeat資訊,主要是TaskTrackerStatus封裝的資料。

根據處理Heartbeat資料結果,如果TaskTracker需要重新初始化,則發送一個帶有ReinitTrackerAction指令的Heartbeat響應資料,否則TaskTracker不需要重新初始化則繼續下一步流程。

檢查是否可以向該TaskTracker指派任務,如果可以可以向該TaskTracker指派任務,則直接使用TaskScheduler指定的排程政策,選擇目前可以指派給TaskTracker的一組需要啟動的Task(對應指令LaunchTaskAction)。

根據TaskScheduler排程政策選擇的需要啟動的Task,并根據TaskTracker發送的Task狀态報告,繼續選擇一些已經完成/需要被清理的Task配置設定給TaskTracker:先檢查在該TaskTracker上是否有完成的Job,計算屬于這些Job的需要被Kill掉(對應指令KillTaskAction)的Task;再檢查是否有完成的Job,并且對應在該TaskTracker上的Task需要被清理(對應指令KillTaskAction);最後檢查是否有已經完成需要被送出的Task(以此來通知TaskTracker送出Task完成并更新狀态,對應指令CommitTaskAction)。

構造一個包含可排程Task(LaunchTaskAction/KillTaskAction/CommitTaskAction)的HeartbeatResponse對象,更新JobTracker内部維護的trackerToHeartbeatResponseMap映射。根據TaskTracker的Heartbeat報告的Task狀态資訊,對标記為完成的Task,更新JobTracker内部維護的多個隊列和Map:trackerToMarkedTasksMap、taskidToTrackerMap、trackerToTaskMap、taskidToTIPMap。最後,傳回TaskTracker調用的結果:HeartbeatResponse對象。

上面流程圖中,黑色虛線所表示的處理流程,我們說明一下:這種情況是不可能出現的,因為TaskTracker不是第一次連接配接JobTracker,而JobTracker端還沒有上一次TaskTracker發送的Heartbeat對應的HeartbeatResponse,同時JobTracker又沒有重新開機動過,是以這種條件是不存在的,那麼該流程分支也不可能執行,故而用虛線描述,指向發送一個帶有ReinitTrackerAction的HeartbeatResponse。

下面,我們細化整個流程,将一些比較重要的流程詳細分析說明:

TaskTracker與JobTracker失去連接配接,更新狀态

JobTracker如果在給定逾時時間範圍之内沒有收到TaskTracker的Heartbeat報告,會認為該TaskTracker已經無法執行/指派任務,那麼在JobTracker端與該TaskTracker相關的資料結構都需要更新,受到影響的Job和Task的資料結構也需要更新,具體處理流程如下圖所示:

上述流程圖描述的流程,如下所示:

從隊列Map&lt;String, Set&lt;JobID&gt;&gt; trackerToJobsToCleanup中移除在該TaskTracker上已經完成且需要清理的所有Job。

從隊列Map&lt;String, Set&lt;TaskAttemptID&gt;&gt; trackerToTasksToCleanup中移除在TaskTracker上已經運作完成且需要清理的所有Task。

通知Recovery Manager從其維護的Set&lt;String&gt;類型的恢複清單JobTracker.RecoveryManager.recoveredTrackers中移除該TaskTracker。

從TreeMap&lt;String, Set&lt;TaskAttemptID&gt;&gt; trackerToTaskMap中删除在該TaskTracker上運作的所有Task。

對在該TaskTracker上的運作的每一個Task(在隊列trackerToTaskMap中),進行如下2步處理:

(1)從隊列Map&lt;TaskAttemptID, TaskInProgress&gt; taskidToTIPMap中取出TaskAttemptID對應的TaskInProgress tip結構,再根據tip擷取到JobInProgress:JobInProgress job = tip.getJob();;

(2)如果ReduceTask已經完成,以及具有0個ReduceTask的所有MapTask已經完成,則将這些Task放入到隊列TreeMap&lt;String, Set&lt;TaskAttemptID&gt;&gt; trackerToMarkedTasksMap中;如果tip标記Task沒有完成,或者滿足條件tip.isMapTask() &amp;&amp; !tip.isJobSetupTask() &amp;&amp; job.desiredReduces() != 0,檢查Job運作狀态,當job.getStatus().getRunState() == JobStatus.RUNNING || job.getStatus().getRunState() == JobStatus.PREP成立時,則該Task運作失敗,并更新Task狀态,同時收集這類Job,放入集合Set&lt;JobInProgress&gt; jobsWithFailures中,後續對這些Job進行處理;

由于該TaskTracker被JobTracker标記為lost狀态,則對上面收集到的jobsWithFailures集合中的Job,隻要存在屬于該Job的Task被配置設定到該TaskTracker上運作,會通過累加計算在該TaskTracker上失敗的Task計數,給該TaskTracker以懲罰,并釋放所有在該TaskTracker上預留的Slot。

從隊列TreeMap&lt;String, Set&lt;TaskAttemptID&gt;&gt; trackerToMarkedTasksMap中移除所有被标記完成的Task,同時更新JobTracker内部維護的如下3個隊列:TreeMap&lt;TaskAttemptID, String&gt; taskidToTrackerMap、TreeMap&lt;String, Set&lt;TaskAttemptID&gt;&gt; trackerToTaskMap、Map&lt;TaskAttemptID, TaskInProgress&gt; taskidToTIPMap。

如果在該TaskTracker上的運作的Task還有沒處理的,則轉第6步進行處理;否則,流程結束。

檢查是否可以向TaskTracker指派運作Task

當TaskTracker發送Heartbeat标志其沒有重新開機,那麼會執行該子流程,如下圖所示:

在JobTracker端,既然TaskTracker彙報狀态表明其沒有重新開機,那麼就需要檢查該TaskTracker對應的黑名單和灰名單情況,如果TaskTracker狀态一切正常,則恢複其正常被指派任務并運作Task的能力。

标記TaskTracker為Health狀态

當TaskTracker重新開機了,然後再次連接配接JobTracker時,發送Heartbeat的過程中,會執行該流程。重新開機的TaskTracker,JobTracker會将一個TaskTracker标記為Health狀态,說明該TaskTracker對應的資源資訊(記憶體/CPU)應該在JobTracker端做記錄,表示這些資源是可用的,更新JobTracker端的幾個可用資源的變量計數。但是,很有可能TaskTracker重新開機之前,其上運作Task失敗了很多次,在JobTracker端記錄該失敗計數,當滿足一定條件後,會将TaskTracker加入灰名單,如果TaskTracker重新開機了,應該将其從灰名單中移除,以便不影響任務分派,具體處理流程如下圖所示:

上述流程圖比較簡單不再累述。

更新TaskTracker狀态

如果TaskTracker不是第一次連接配接JobTracker,那麼在JobTracker端的隊列HashMap&lt;String, TaskTracker&gt; taskTrackers中會儲存上一次TaskTracker向JobTracker彙報的狀态TaskTrackerStatus,如果該TaskTrackerStatus不存在,則直接處理目前彙報的TaskTracker的狀态報告,使得JobTracker端維護的該TaskTracker的狀态是最新的,具體的處理流程,如下圖所示:

上圖中處理流程,描述如下所示:

檢查是否JobTracker端存在該TaskTracker上一次彙報的狀态報告,如果不存在,則直接處理目前發送的狀态報告;否則,會更新JobTracker端維護的如下4個全局計數器:totalMaps(MapTask總數)、totalReduces(ReduceTask總數)、occupiedMapSlots(占用的Map Slot總數)、occupiedReduceSlots(占用的Reduce Slot總數),在目前計數值的基礎上,減去上次彙報的報告中的數量(實際上是假定上次彙報的全部名額都已完成,如果沒完成,再通過本次彙報的狀态報告再加回去);如果TaskTracker沒有被加入到黑名單中,還需要更新下面2個JobTracker端全局計數器:totalMapTaskCapacity(該TaskTracker上最大Map Slot總數)、totalReduceTaskCapacity(該TaskTracker上最大Reduce Slot總數)。

處理TaskTracker目前彙報的狀态報告,更新JobTracker内部維護的6個全局計數器:totalMaps、totalReduces、occupiedMapSlots、occupiedReduceSlots、totalMapTaskCapacity、totalReduceTaskCapacity,各個計數器具體含義見上一步說明。

如果TaskTracker是第一次彙報狀态報告,則需要在JobTracker内部注冊,構造一個org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker對象(該TaskTracker對象是在JobTracker的視角看到的結構),加入到隊列HashMap&lt;String, TaskTracker&gt; taskTrackers中,同時還要計算該TaskTracker所在的host節點上TaskTracker程序的個數,更新隊列Map&lt;String, Integer&gt; uniqueHostsMap。

更新TaskTracker上所有Task狀态

在JobTracker處理TaskTracker發送的Heartbeat的過程中,首先會更新JobTracker維護的TaskTracker的狀态資訊,因為一個TaskTracker上可能運作着很多Task,那麼需要更新這些Task的狀态,可以通過上面介紹的TaskTrackerStatus的結構看出,對應着一個TaskStatus的狀态報告集合,是以這裡有一個批量更新TaskStatus狀态的操作,實際上會對每一個Task的狀态分别進行更新,整體處理流程如下圖所示:

具體處理流程,描述如下所示:

從TaskTracker發送的TaskTrackerStatus對象可以提取Task狀态報告集合,然後對每一個狀态報告進行處理,直到所有的Task的狀态都已經被更新到JobTracker内部維護的狀态對象上,下面描述每一個TaskStatus的處理過程:

(1)如果一個Task的運作狀态不為TaskStatus.State.UNASSIGNED,說明該Task還沒有在TaskTracker上獲得運作機會,則并不讓該Task失敗(當一個Task指派給一個TaskTracker運作時,會首先在JobTracker端加入到一個逾時清單中,由一個獨立的線程JobTracker.ExpireLaunchingTasks去檢測,該Task是否在給定的時間内(預設是10分鐘 )是否在TaskTracker上啟動而且一直沒有報告狀态,如果沒有報告,則會将該Task标記為失敗),等待下一次被排程配置設定給TaskTracker去運作。

(2)根據Task的ID,擷取到它對應的JobInProgress資訊,如果沒有擷取到則将該Task對應的JobInProgress對象加入到cleanup清單Map&lt;String, Set&lt;JobID&gt;&gt; trackerToJobsToCleanup中,直接傳回繼續處理下一個TaskStatus報告;如果能夠擷取到對應的JobInProgress資訊,則檢查該JobInProgress中包含的Job是否設定初始化完成狀态,如果沒有設定,則直接将該Task加入到隊列Map&lt;String, Set&lt;TaskAttemptID&gt;&gt; trackerToTasksToCleanup中,等待JobTracker排程Kill掉該Task,直接傳回繼續處理下一個TaskStatus報告。

(3)檢查該TaskStatus報告中對應的TaskAttemptID,是否在JobTracker端存在對應的TaskInProgress對象,很有可能JobTracker重新開機,記憶體中維護的Map&lt;TaskAttemptID, TaskInProgress&gt; taskidToTIPMap隊列中沒有TaskInProgress對象,這時JobInProgress對象一定存在,可以通過JobInProgress對象擷取到該Task對應的TaskInProgress對象(因為在JobTracker端建立Job的時候,會分别建立4類TIP:map、reduce、cleanup、setup),再将其加入到Map&lt;TaskAttemptID, TaskInProgress&gt; taskidToTIPMap隊列中,同時觸發已知的一組JobInProgressListener的jobUpdated方法,去更新Job狀态。

(4)根據TaskStatus能夠擷取到所有Fetch失敗的Task,查詢該Task對應的TaskInProgress對象,進而進一步通知JobInProgress對象,根據設定的允許Task Fetch失敗的最大次數限制,确定是否要讓該Task失敗,并更新TaskInProgress狀态。

更新Task狀态

當Task的狀态發生變化的情況下,可能需要更新Task的狀态,我們根據JobTracker定義的updateTaskStatus方法,方法聲明如下所示:

<code>public</code> <code>synchronized</code> <code>void</code> <code>updateTaskStatus(TaskInProgress tip, TaskStatus status)</code>

其中,tip是目前在JobTracker端維護的Task的狀态,status是TaskTracker彙報的Task狀态,更新JobTracker端Task狀态主要是根據心跳彙報的status來更新tip資料結構。更新Task狀态的具體流程,如下圖所示:

更新Task狀态,主要是更新每個Task對應的在JobTracker端維護的TaskInProgress結構,處理流程描述如下:

如果心跳彙報的status中,Task運作狀态為SUCCEEDED,當tip辨別已經完成或辨別被Kill掉,則統一修改status的運作狀态為KILLED;如果心跳彙報的status對應的TaskAttemptID不是cleanup task,當該TaskAttemptID 對應的JobInProgress表示Job已經完成,或失敗,或被Kill掉,那麼status運作狀态為FAILED_UNCLEAN則修改為FAILED,運作狀态為KILLED_UNCLEAN則修改為KILLED。

調用TaskInProgress的updateStatus方法,傳入目前TaskTracker彙報的status狀态對象,更新tip的狀态。TaskInProgress會維護每個Task對應的TaskStatus對象oldStatus,并根據彙報的status對更新替換oldStatus。有3種情況不需要更新:第一種是當status的運作狀态不等于RUNNING/COMMIT_PENDING/FAILED_UNCLEAN/KILLED_UNCLEAN/UNASSIGNED中的任何一種狀态;第二種是status的運作狀态為RUNNING、UNASSIGNED中的任意一種狀态,并且oldStatus的運作狀态為FAILED/KILLED/FAILED_UNCLEAN/KILLED_UNCLEAN/SUCCEEDED/COMMIT_PENDING中任意一種狀态;第三種是oldStatus的運作狀态為FAILED/KILLED中的任意一種狀态,這種情況會把該TaskAttemptID加入到隊列TreeMap&lt;TaskAttemptID, Boolean&gt; tasksToKill中辨別需要Kill掉該Task。

如果status的運作狀态為FAILED狀态,并且JobTracker在Safe模式下,則設定status的運作狀态為KILLED。

此時,如果oldStatus與status不相等,即TaskAttemptID的狀态已經發生變化,則會根據status的運作狀态建立不同的TaskCompletionEvent事件(SUCCEEDED/FAILED/KILLED),這些 TaskCompletionEvent事件會被加入到JobInProgress的taskCompletionEvents清單中,供JobClient查詢或供JobTracker檢索;或者執行相應的操作:如果運作狀态為FAILED_UNCLEAN/KILLED_UNCLEAN,則tip中該TaskAttemptID标記為失敗并更新相關結構,然後加入到mapCleanupTasks/reduceCleanupTasks清單中等待被清理,同時将該TaskAttemptID對應的資料從JobTracker的taskidToTIPMap、taskidToTrackerMap、trackerToTaskMap這3個隊列中删除。

根據構造的TaskCompletionEvent對象,并且如果status的運作狀态為SUCCEEDED,則更新其對應的JobInProgress的狀态為成功。

繼續閱讀