天天看點

mapreduce複制連接配接的代碼_MapReduce研究(一)

mapreduce複制連接配接的代碼_MapReduce研究(一)

前言

分布式系統領域裡總共有三大塊:存儲、計算和通信。之前的Redis的系列相當于分布式存儲的一個案例研究,研究了兩個月之後自己從中受益匪淺。是以也想從這篇文章開始再開一個分布式計算架構的研究系列。剛好最近在同時上我校的分布式系統課程和MIT6.824, 兩個課程的Lab1都是MapReduce實作,是以借此機會也嘗試研究一下MapReduce。

這篇文章假定讀者對MapReduce有基本的認識,不會去介紹MR的基礎概念,同時本篇不研究如何使用Hadoop來進行程式設計, 更專注于如何開發一個類似于Hadoop的分布式計算系統。

當然了,這麼大的問題不可能被我一周研究明白,也不可能在一篇文章中說清楚。本次系列地文章大概會寫三篇。第一篇介紹MR中地執行地整體流程。第二篇介紹MRAppMaster、MapTask和ReduceTask的執行細節。第三章介紹FalutTolerence以及其他的一些問題。本篇文章主要是源碼閱讀了,Hadoop的源碼本身比較龐大。希望這篇部落格對大家讀MR相關部分的代碼能有些幫助。

同時還是比較推薦MIT6.824課程的, 我看B站上有人搬運了課程視訊。這裡放給大家連接配接,大家可以去學習。

https://www.bilibili.com/video/av87684880?p=4​www.bilibili.com

MapReduce總覽介紹

一、《MapReduce: Simplified Data Processing on Large Clusters》概要介紹

MapReduce把

并行、容錯、負載均衡以及資料分布

的細節隐藏了起來,提升了使用者的使用友好性,進而得到了廣泛的使用。下面這張圖整體描述了MapReduce的流程,想必大家都不陌生。我們首先引用MR的原文來概述每一步都發生了什麼。強烈建議大家仔細讀一下下面的内容,對整體了解MR的工作流程非常有好處。

mapreduce複制連接配接的代碼_MapReduce研究(一)

MapReduce執行流程

1. The MapReduce library in the user program first splits the input files into M pieces of typically 16 megabytes to 64 megabytes (MB) per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.

2. One of the copies of the program is special – the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.

3. A worker who is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined Map function. The intermediate key/value pairs produced by the Map function are buffered in memory.

4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, who is responsible for forwarding these locations to the reduce workers.

5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all intermediate data, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.

6. The reduce worker iterates over the sorted intermediate data and for each unique intermediate key encountered, it passes the key and the corresponding set of intermediate values to the user’s Reduce function. The output of the Reduce function is appended to a final output file for this reduce partition.

7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns back to the user code.

二、MapReduce整體架構介紹

在Yarn引入之後,MR的架構變得複雜了許多,整體結構即工作流程如下圖。原本1.0中JobTracker的功能被分成了資源管理和任務管理兩部分。資源管理由ResourceManager管理,任務管理由MRAppMaster負責。MapTask/ReduceTask 和MRAppMaster都運作在容器(Container)中,而Containter的啟動和管理都由NodeManager來負責。

mapreduce複制連接配接的代碼_MapReduce研究(一)

圖中描述了啟動一個Job流程的11步。我們重點介紹作業初始化部分,即從第五步開始的過程。在2.0版本之後,作業的初始化細化為了兩個部分,分别為過程5a和5b。由上面所說的任務送出申請資源調用,将其請求給排程器處理,可以看到,排程器配置設定Container,同時在管理節點上啟動一個ApplicationMaster程序。Application Master的主要執行者 MRAppMaster會初始化一定數量的記錄對象來跟蹤Job的運作進度,并收取task的進度和完成情況(過程6)。此時Master會決定如何組織運作MapReduce Job。如果Job很小,能在同一個JVM和同一個Node運作的話,則用uber模式運作。

對于運作任務的配置設定,如果不再uber模式下運作,則AppMaster會為所有的Map和Reduce Task向RM請求Container。 所有的請求都通過heartbeat傳遞。心跳中也傳遞其他資訊,例如關于Map資料本地化的資訊、分片所在的主機和機架位址資訊等。這些資訊幫助排程器做出排程的決策,排程器盡可能地遵循資料本地化或者機架本地化地原則配置設定Container。

而對于任務地執行來說,配置設定給Task任務Container後,NodeManager上地Application Master就啟動Container, 最後Task被一個稱為“YarnChild”地main類執行。不過在此之前,各個資源檔案已經從分布式緩存拷貝下來,這樣才能運作MapTask或者ReduceTask。

對于程序和狀态地更新,當Yarn運作時,Task和Container會将他們地進度和狀态報告給AppMaster。如下圖所示:

mapreduce複制連接配接的代碼_MapReduce研究(一)

任務運作進度報告

接下來我們按照執行的流程去看每一步都發生了什麼。

Split資料

我們知道MapReduce是依賴分布式檔案系統的(HDFS或GFS)的。當我們送出任務的時候,MR首先把整個資料

邏輯切分

成一個個的split。一定注意這裡是邏輯切分,并不是真正的實體切分。切分後的split是什麼樣呢?我們以FileSplit為例進行展示。

// 一個FileSplit主要包含檔案路徑、該Split的資料開始部分在該檔案中的偏移以及該Split的長度
           

假如我們把一個64G的檔案word_count.txt分割為1024個split,即每個split大小是64M。那麼第1個Split和第2個Split的參數如下:

// 第一個Split
           

當一個worker拿到這樣一個Split,就可以根據Split中的資訊從HDFS中去直接讀取資料了。Split懂了之後,Split的分割就很容易懂了,話不多說,直接放代碼。需要提一句的是splitSize大小的計算。從計算公式中我們也可以看出splitSize最小為minSize(一般16M),最大為blockSize(一般為64M)。是以這也是為什麼原文說每個splitSize在16-64M之間了。

// 分布式檔案系統中的blocksize, 如GFS中為64M
           

YarnChild

當NodeManager啟動一個Container運作YarnChild時,首先不斷嘗試從AppMaster中擷取一個Task直到擷取成功。擷取後再進行一些Task的環境資訊的設定後,調用Task.run()函數開始執行該Task。

class 
           

Task

Task主要有兩種, ReduceTask和MapTask,這兩種Task都繼承了Task基類。Task中儲存了任務的狀态資訊,同時還實作了一個YarnChild中調用的run()方法來啟動執行task。我們接下來以MapTask為例進行介紹。

初始化一個MapTask,需要JobFile路徑、 這個Task的ID、reduceTask的數目以及配置設定給這個mapTask的Split資料相關資訊。

public 
           

splitIndex裡面包含的是該MapTask執行所需要的資料資訊。我們看一下源碼,splitIndex中包含了split再整個檔案系統中的位置,以及該split的開始讀取的起始偏移位置。(

但是為啥這裡沒有這個split的資料長度資訊呢?)
public 
           

好,言歸正傳,我們繼續回到task.run()的話題。MapTask整個過程實際上分為兩個階段(phase): MapPhase和SortPhase,前者就是調用我們在用Hadoop的時候寫的map函數,後者是對MapTask産生的中間檔案進行排序,把歸屬于同一個Reducer的KV排到一起。這樣等MapTask執行結束之後,Task需要向AppMaster傳回R個類似于(intermeidateFile, startOffset, length)這樣的三元組,辨別每個partition的資料的存儲位置。task.run()的代碼如下,代碼不長,就直接全部放過來:

@Override
  
           

在上面的代碼中,根據是否使用新的API選擇調用NewMapper還是OldMapper。我們以調用NewMapper為例來看裡面發生了什麼。

private 
           

我們發現在NewMapper裡面擷取了split的資訊, 擷取了讀取split的RecorderReader , 同時擷取了mapper輸出的結果收集器OutputCollector,同時擷取了一個向AppMaster報告任務資訊的TaskReporter,建立了一個mapper。擷取這些之後建立一個mapper的執行環境mapperContext,最終調用了mapper.run(mapperContext)

接着我們來看看mapper.run()中又發生了什麼。勝利就在前方了同志們,堅持住。

通過下面的代碼我們發現run()方法裡很簡單,就是不停的擷取Key,Value,然後把這些Key,Value傳給使用者自定義的map()函數進行處理。

public 
           

最後的最後,我們來看看map函數又發生了什麼。map就是我們使用hadoop的時候編寫的map函數,在函數的最後我們把處理的結果通過context類寫入到我們剛才在MapTask中定義的輸出資料收集器中。

/**
           

至此,我們走完了從YarnChild程序啟動,到最後我們定義的map/reduce函數的執行流程。我們看到實作中一步步的封裝。

總結

本篇文章主要是代碼閱讀了。由于我也才是開始讀Hadoop的代碼, 上周寫完作業讀了一下,然後就是今天為了寫部落格認真讀了一遍,我自己對這個也比較陌生,是以寫起來可能沒法舉重若輕。我今天上午讀MR的代碼的時候都快崩了,感覺找不到頭,很亂。直到下午才開始找到主線,最後能一路順藤摸瓜似地找到整個鍊條上地調用關系。也希望大家能讀完這篇文章後,對讀Hadoop地代碼能有所幫助。這篇文章沒有講MapTask執行的過程中的中間檔案生成、ReduceTask執行過程中的資料擷取(Shuffle)、資料排序以及資料計算過程。這些内容放在下周的部落格裡面研究吧。

後記

終于寫完了,太不容易了。