我們基于Hadoop 1.2.1源碼分析MapReduce V1的處理流程。這篇文章的内容,更多地主要是描述處理/互動流程性的東西,大部分流程圖都是經過我梳理後畫出來的(開始我打算使用序列圖來描述流程,但是發現很多流程在單個對象内部都已經非常複雜,想要通過序列圖表達有點擔心描述不清,是以選擇最基本的程式流程圖),可能看起來比較枯燥,重點還是關注主要的處理流程要點,特别的地方我會刻意标示出來,便于了解。
JobTracker與TaskTracker之間通過org.apache.hadoop.mapred.InterTrackerProtocol協定來進行通信,TaskTracker通過該接口進行遠端調用實作Heartbeat消息的發送,協定方法定義如下所示:
<code>1</code>
<code>HeartbeatResponse heartbeat(TaskTrackerStatus status,</code>
<code>2</code>
<code></code><code>boolean</code> <code>restarted,</code>
<code>3</code>
<code></code><code>boolean</code> <code>initialContact,</code>
<code>4</code>
<code></code><code>boolean</code> <code>acceptNewTasks,</code>
<code>5</code>
<code></code><code>short</code> <code>responseId)</code><code>throws</code> <code>IOException;</code>
通過該方法可以看出,最核心的Heartbeat報告資料都封裝在TaskTrackerStatus對象中,JobTracker端會接收TaskTracker周期性地發送的心跳報告,根據這些心跳資訊來更新整個Hadoop叢集中計算資源的狀态/數量,以及Task的運作狀态。
另外,在JobTracker端維護的對象的資料結構,主要包括如下3個:
TaskTracker:這個類是在JobTracker端定義的,描述了TaskTracker的基本資訊和狀态(需要注意的是:它與TaskTracker程序的實作類同名,但是含義完全不同)
JobInProgress:簡寫JIP,在JobTracker端用來描述,JobClient送出的Job運作狀态的資料結構,一個JIP對象還包含了組成一個Job的Task對應的一組TIP的資訊
TaskInProgress:簡寫TIP,在JobTracker端用來描述,在TaskTracker上運作的Task狀态的資料結構(需要注意的是:在TaskTracker端也對應一個TaskInProgress實作類,它與JobTracker端的同名,但是所包含的内容也并不完全相同)
TaskAttemptID:簡寫TAID,它是唯一辨別了組成一個Job的Task的一個運作執行個體,一個Task(MapTask/ReduceTask)可能運作多次,比如第一次運作失敗,對應一個失敗的TAID,第二次排程又運作,又對應一個新的TAID;再比如,推測執行,可能會對應着同一個Task的、具有2個不同TAID的Task運作執行個體
TaskTrackerStatus結構
TaskTrackerStatus對象要在網絡間進行序列化傳輸,是以實作了接口org.apache.hadoop.io.Writable,該對象的資料結構,如下圖所示:
TaskTrackerStatus中各個資料項的含義,說明如下表所示:
<b>字段名稱</b>
<b>字段類型</b>
<b>說明</b>
trackerName
String
TaskTracker的名稱,例如:tracker_ + localHostname + : + taskReportAddress
host
TaskTracker所在主機名稱
httpPort
int
HTTP端口号,預設50030
taskFailures
在該TaskTracker上運作失敗的Task的個數
dirFailures
在TaskTracker節點上,配置的mapred.local.dir目錄失敗的個數
maxMapTasks
在該TaskTracker上同時運作Map的最大個數,通過mapred.tasktracker.map.tasks.maximum配置的,預設值是2
maxReduceTasks
在該TaskTracker上同時運作Map的最大個數,通過 mapred.tasktracker.reduce.tasks.maximum配置的,預設值是2
resStatus
ResourceStatus
在該TaskTracker上的資源情況,主要包括如下内容:虛拟記憶體大小、實體記憶體大小、Map slot數量、Reduce slot數量、可用磁盤空間、可用虛拟記憶體大小、可用實體記憶體大小、處理器數量、CPU頻率、CPU使用百分比、累積CPU時間
taskStatus
TaskStatus
在該TaskTracker上,目前task的狀态,它有分為MapTaskStatus和ReduceTaskStatus,主要包含如下内容:taskid(TaskAttemptID)、運作進度百分比、運作狀态、診斷資訊、所在TaskTracker名稱、slot數、開始時間、結束時間、執行階段(Phase)、一組計數器資訊
taskTrackerHealthStatus
TaskTrackerHealthStatus
TaskTracker的健康狀态資訊
下面,主要對ResourceStatus、TaskStatus、TaskTrackerHealthStatus進行說明:
ResourceStatus封裝了一個TaskTracker節點的資源資訊,結構如下圖所示:
TaskStatus封裝了一個TaskTracker節點上運作的Task的狀态資訊,結構如下圖所示:
上圖将TaskStatus的包含的資料結構全部展開,可以根據字段含義來了解它所描述的一些資訊。
TaskTrackerHealthStatus封裝了TaskTracker的健康狀态資訊,如下圖所示:
JobTracker處理Heartbeat流程
JobTracker處理Heartbeat的流程,如果把每個處理細節都詳細地展開,非常地複雜,可能從頭到尾描述下來會感覺枯燥無味,是以這裡我先概要地描述JobTracker處理Heartbeat的整體流程,然後再按照功能劃分出一個個看似還算獨立的子處理流程,單獨地進行詳細說明,這樣能夠更容易了解。整體處理流程,如下圖所示:
下面,我們根據上面的Heartbeat處理流程圖,概要地說明Heartbeat是如何處理的,流程描述如下所示:
TaskTracker建立一個TaskTrackerStatus對象,TaskTrackerStatus内部封裝的資訊包括:TaskTracker所在節點的基本資訊、運作在TaskTracker上的Task的狀态資訊、TaskTracker服務的健康狀态資訊、TaskTracker的資源資訊,另外發送心跳的RPC方法還包括restarted(TaskTracker是否重新開機)、initialContact(TaskTracker是否初次連接配接JobTracker)、acceptNewTasks(TaskTracker是否能夠運作新的Task)、responseId(心跳響應ID),通過InterTrackerProtocol協定的heartbeat方法發送給JobTracker。
JobTracker接收到TaskTracker發送的心跳資料。
JobTracker檢查TaskTracker的host是否在黑名單中,如果TaskTracker在黑名單中,則直接抛出異常終止RPC調用,否則繼續下一步流程。
檢查TaskTracker RPC調用參數restarted的值,如果TaskTracker重新開機了,則标記TaskTracker狀态為健康狀态;如果TaskTracker沒有重新開機,則檢查是否可以指派任務在該TaskTracker上運作。
如果TaskTracker不是初次連接配接JobTracker,檢查JobTracker是否存在上一次向該TaskTracker發送的Heartbeat響應資料,存在的話則說明TaskTracker因為失去了與JobTracker之間的RPC連接配接而沒有接收到,JobTracker直接再給TaskTracker重新發送該響應資料;不存在的話,若JobTracker重新開機了,使TaskTracker重新加入叢集,需要通知Recovery Manager從恢複清單中移除該TaskTracker,若JobTracker未重新開機,這種情況幾乎是不可能存在的(既然TaskTracker不是初次連接配接,JobTracker也沒有重新開機,JobTracker端不可能沒有儲存Heartbeat響應資料)。
處理JobTracker接收到的TaskTracker的Heartbeat資訊,主要是TaskTrackerStatus封裝的資料。
根據處理Heartbeat資料結果,如果TaskTracker需要重新初始化,則發送一個帶有ReinitTrackerAction指令的Heartbeat響應資料,否則TaskTracker不需要重新初始化則繼續下一步流程。
檢查是否可以向該TaskTracker指派任務,如果可以可以向該TaskTracker指派任務,則直接使用TaskScheduler指定的排程政策,選擇目前可以指派給TaskTracker的一組需要啟動的Task(對應指令LaunchTaskAction)。
根據TaskScheduler排程政策選擇的需要啟動的Task,并根據TaskTracker發送的Task狀态報告,繼續選擇一些已經完成/需要被清理的Task配置設定給TaskTracker:先檢查在該TaskTracker上是否有完成的Job,計算屬于這些Job的需要被Kill掉(對應指令KillTaskAction)的Task;再檢查是否有完成的Job,并且對應在該TaskTracker上的Task需要被清理(對應指令KillTaskAction);最後檢查是否有已經完成需要被送出的Task(以此來通知TaskTracker送出Task完成并更新狀态,對應指令CommitTaskAction)。
構造一個包含可排程Task(LaunchTaskAction/KillTaskAction/CommitTaskAction)的HeartbeatResponse對象,更新JobTracker内部維護的trackerToHeartbeatResponseMap映射。根據TaskTracker的Heartbeat報告的Task狀态資訊,對标記為完成的Task,更新JobTracker内部維護的多個隊列和Map:trackerToMarkedTasksMap、taskidToTrackerMap、trackerToTaskMap、taskidToTIPMap。最後,傳回TaskTracker調用的結果:HeartbeatResponse對象。
上面流程圖中,黑色虛線所表示的處理流程,我們說明一下:這種情況是不可能出現的,因為TaskTracker不是第一次連接配接JobTracker,而JobTracker端還沒有上一次TaskTracker發送的Heartbeat對應的HeartbeatResponse,同時JobTracker又沒有重新開機動過,是以這種條件是不存在的,那麼該流程分支也不可能執行,故而用虛線描述,指向發送一個帶有ReinitTrackerAction的HeartbeatResponse。
下面,我們細化整個流程,将一些比較重要的流程詳細分析說明:
TaskTracker與JobTracker失去連接配接,更新狀态
JobTracker如果在給定逾時時間範圍之内沒有收到TaskTracker的Heartbeat報告,會認為該TaskTracker已經無法執行/指派任務,那麼在JobTracker端與該TaskTracker相關的資料結構都需要更新,受到影響的Job和Task的資料結構也需要更新,具體處理流程如下圖所示:
上述流程圖描述的流程,如下所示:
從隊列Map<String, Set<JobID>> trackerToJobsToCleanup中移除在該TaskTracker上已經完成且需要清理的所有Job。
從隊列Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup中移除在TaskTracker上已經運作完成且需要清理的所有Task。
通知Recovery Manager從其維護的Set<String>類型的恢複清單JobTracker.RecoveryManager.recoveredTrackers中移除該TaskTracker。
從TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap中删除在該TaskTracker上運作的所有Task。
對在該TaskTracker上的運作的每一個Task(在隊列trackerToTaskMap中),進行如下2步處理:
(1)從隊列Map<TaskAttemptID, TaskInProgress> taskidToTIPMap中取出TaskAttemptID對應的TaskInProgress tip結構,再根據tip擷取到JobInProgress:JobInProgress job = tip.getJob();;
(2)如果ReduceTask已經完成,以及具有0個ReduceTask的所有MapTask已經完成,則将這些Task放入到隊列TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap中;如果tip标記Task沒有完成,或者滿足條件tip.isMapTask() && !tip.isJobSetupTask() && job.desiredReduces() != 0,檢查Job運作狀态,當job.getStatus().getRunState() == JobStatus.RUNNING || job.getStatus().getRunState() == JobStatus.PREP成立時,則該Task運作失敗,并更新Task狀态,同時收集這類Job,放入集合Set<JobInProgress> jobsWithFailures中,後續對這些Job進行處理;
由于該TaskTracker被JobTracker标記為lost狀态,則對上面收集到的jobsWithFailures集合中的Job,隻要存在屬于該Job的Task被配置設定到該TaskTracker上運作,會通過累加計算在該TaskTracker上失敗的Task計數,給該TaskTracker以懲罰,并釋放所有在該TaskTracker上預留的Slot。
從隊列TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap中移除所有被标記完成的Task,同時更新JobTracker内部維護的如下3個隊列:TreeMap<TaskAttemptID, String> taskidToTrackerMap、TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap、Map<TaskAttemptID, TaskInProgress> taskidToTIPMap。
如果在該TaskTracker上的運作的Task還有沒處理的,則轉第6步進行處理;否則,流程結束。
檢查是否可以向TaskTracker指派運作Task
當TaskTracker發送Heartbeat标志其沒有重新開機,那麼會執行該子流程,如下圖所示:
在JobTracker端,既然TaskTracker彙報狀态表明其沒有重新開機,那麼就需要檢查該TaskTracker對應的黑名單和灰名單情況,如果TaskTracker狀态一切正常,則恢複其正常被指派任務并運作Task的能力。
标記TaskTracker為Health狀态
當TaskTracker重新開機了,然後再次連接配接JobTracker時,發送Heartbeat的過程中,會執行該流程。重新開機的TaskTracker,JobTracker會将一個TaskTracker标記為Health狀态,說明該TaskTracker對應的資源資訊(記憶體/CPU)應該在JobTracker端做記錄,表示這些資源是可用的,更新JobTracker端的幾個可用資源的變量計數。但是,很有可能TaskTracker重新開機之前,其上運作Task失敗了很多次,在JobTracker端記錄該失敗計數,當滿足一定條件後,會将TaskTracker加入灰名單,如果TaskTracker重新開機了,應該将其從灰名單中移除,以便不影響任務分派,具體處理流程如下圖所示:
上述流程圖比較簡單不再累述。
更新TaskTracker狀态
如果TaskTracker不是第一次連接配接JobTracker,那麼在JobTracker端的隊列HashMap<String, TaskTracker> taskTrackers中會儲存上一次TaskTracker向JobTracker彙報的狀态TaskTrackerStatus,如果該TaskTrackerStatus不存在,則直接處理目前彙報的TaskTracker的狀态報告,使得JobTracker端維護的該TaskTracker的狀态是最新的,具體的處理流程,如下圖所示:
上圖中處理流程,描述如下所示:
檢查是否JobTracker端存在該TaskTracker上一次彙報的狀态報告,如果不存在,則直接處理目前發送的狀态報告;否則,會更新JobTracker端維護的如下4個全局計數器:totalMaps(MapTask總數)、totalReduces(ReduceTask總數)、occupiedMapSlots(占用的Map Slot總數)、occupiedReduceSlots(占用的Reduce Slot總數),在目前計數值的基礎上,減去上次彙報的報告中的數量(實際上是假定上次彙報的全部名額都已完成,如果沒完成,再通過本次彙報的狀态報告再加回去);如果TaskTracker沒有被加入到黑名單中,還需要更新下面2個JobTracker端全局計數器:totalMapTaskCapacity(該TaskTracker上最大Map Slot總數)、totalReduceTaskCapacity(該TaskTracker上最大Reduce Slot總數)。
處理TaskTracker目前彙報的狀态報告,更新JobTracker内部維護的6個全局計數器:totalMaps、totalReduces、occupiedMapSlots、occupiedReduceSlots、totalMapTaskCapacity、totalReduceTaskCapacity,各個計數器具體含義見上一步說明。
如果TaskTracker是第一次彙報狀态報告,則需要在JobTracker内部注冊,構造一個org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker對象(該TaskTracker對象是在JobTracker的視角看到的結構),加入到隊列HashMap<String, TaskTracker> taskTrackers中,同時還要計算該TaskTracker所在的host節點上TaskTracker程序的個數,更新隊列Map<String, Integer> uniqueHostsMap。
更新TaskTracker上所有Task狀态
在JobTracker處理TaskTracker發送的Heartbeat的過程中,首先會更新JobTracker維護的TaskTracker的狀态資訊,因為一個TaskTracker上可能運作着很多Task,那麼需要更新這些Task的狀态,可以通過上面介紹的TaskTrackerStatus的結構看出,對應着一個TaskStatus的狀态報告集合,是以這裡有一個批量更新TaskStatus狀态的操作,實際上會對每一個Task的狀态分别進行更新,整體處理流程如下圖所示:
具體處理流程,描述如下所示:
從TaskTracker發送的TaskTrackerStatus對象可以提取Task狀态報告集合,然後對每一個狀态報告進行處理,直到所有的Task的狀态都已經被更新到JobTracker内部維護的狀态對象上,下面描述每一個TaskStatus的處理過程:
(1)如果一個Task的運作狀态不為TaskStatus.State.UNASSIGNED,說明該Task還沒有在TaskTracker上獲得運作機會,則并不讓該Task失敗(當一個Task指派給一個TaskTracker運作時,會首先在JobTracker端加入到一個逾時清單中,由一個獨立的線程JobTracker.ExpireLaunchingTasks去檢測,該Task是否在給定的時間内(預設是10分鐘 )是否在TaskTracker上啟動而且一直沒有報告狀态,如果沒有報告,則會将該Task标記為失敗),等待下一次被排程配置設定給TaskTracker去運作。
(2)根據Task的ID,擷取到它對應的JobInProgress資訊,如果沒有擷取到則将該Task對應的JobInProgress對象加入到cleanup清單Map<String, Set<JobID>> trackerToJobsToCleanup中,直接傳回繼續處理下一個TaskStatus報告;如果能夠擷取到對應的JobInProgress資訊,則檢查該JobInProgress中包含的Job是否設定初始化完成狀态,如果沒有設定,則直接将該Task加入到隊列Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup中,等待JobTracker排程Kill掉該Task,直接傳回繼續處理下一個TaskStatus報告。
(3)檢查該TaskStatus報告中對應的TaskAttemptID,是否在JobTracker端存在對應的TaskInProgress對象,很有可能JobTracker重新開機,記憶體中維護的Map<TaskAttemptID, TaskInProgress> taskidToTIPMap隊列中沒有TaskInProgress對象,這時JobInProgress對象一定存在,可以通過JobInProgress對象擷取到該Task對應的TaskInProgress對象(因為在JobTracker端建立Job的時候,會分别建立4類TIP:map、reduce、cleanup、setup),再将其加入到Map<TaskAttemptID, TaskInProgress> taskidToTIPMap隊列中,同時觸發已知的一組JobInProgressListener的jobUpdated方法,去更新Job狀态。
(4)根據TaskStatus能夠擷取到所有Fetch失敗的Task,查詢該Task對應的TaskInProgress對象,進而進一步通知JobInProgress對象,根據設定的允許Task Fetch失敗的最大次數限制,确定是否要讓該Task失敗,并更新TaskInProgress狀态。
更新Task狀态
當Task的狀态發生變化的情況下,可能需要更新Task的狀态,我們根據JobTracker定義的updateTaskStatus方法,方法聲明如下所示:
<code>public</code> <code>synchronized</code> <code>void</code> <code>updateTaskStatus(TaskInProgress tip, TaskStatus status)</code>
其中,tip是目前在JobTracker端維護的Task的狀态,status是TaskTracker彙報的Task狀态,更新JobTracker端Task狀态主要是根據心跳彙報的status來更新tip資料結構。更新Task狀态的具體流程,如下圖所示:
更新Task狀态,主要是更新每個Task對應的在JobTracker端維護的TaskInProgress結構,處理流程描述如下:
如果心跳彙報的status中,Task運作狀态為SUCCEEDED,當tip辨別已經完成或辨別被Kill掉,則統一修改status的運作狀态為KILLED;如果心跳彙報的status對應的TaskAttemptID不是cleanup task,當該TaskAttemptID 對應的JobInProgress表示Job已經完成,或失敗,或被Kill掉,那麼status運作狀态為FAILED_UNCLEAN則修改為FAILED,運作狀态為KILLED_UNCLEAN則修改為KILLED。
調用TaskInProgress的updateStatus方法,傳入目前TaskTracker彙報的status狀态對象,更新tip的狀态。TaskInProgress會維護每個Task對應的TaskStatus對象oldStatus,并根據彙報的status對更新替換oldStatus。有3種情況不需要更新:第一種是當status的運作狀态不等于RUNNING/COMMIT_PENDING/FAILED_UNCLEAN/KILLED_UNCLEAN/UNASSIGNED中的任何一種狀态;第二種是status的運作狀态為RUNNING、UNASSIGNED中的任意一種狀态,并且oldStatus的運作狀态為FAILED/KILLED/FAILED_UNCLEAN/KILLED_UNCLEAN/SUCCEEDED/COMMIT_PENDING中任意一種狀态;第三種是oldStatus的運作狀态為FAILED/KILLED中的任意一種狀态,這種情況會把該TaskAttemptID加入到隊列TreeMap<TaskAttemptID, Boolean> tasksToKill中辨別需要Kill掉該Task。
如果status的運作狀态為FAILED狀态,并且JobTracker在Safe模式下,則設定status的運作狀态為KILLED。
此時,如果oldStatus與status不相等,即TaskAttemptID的狀态已經發生變化,則會根據status的運作狀态建立不同的TaskCompletionEvent事件(SUCCEEDED/FAILED/KILLED),這些 TaskCompletionEvent事件會被加入到JobInProgress的taskCompletionEvents清單中,供JobClient查詢或供JobTracker檢索;或者執行相應的操作:如果運作狀态為FAILED_UNCLEAN/KILLED_UNCLEAN,則tip中該TaskAttemptID标記為失敗并更新相關結構,然後加入到mapCleanupTasks/reduceCleanupTasks清單中等待被清理,同時将該TaskAttemptID對應的資料從JobTracker的taskidToTIPMap、taskidToTrackerMap、trackerToTaskMap這3個隊列中删除。
根據構造的TaskCompletionEvent對象,并且如果status的運作狀态為SUCCEEDED,則更新其對應的JobInProgress的狀态為成功。