轉載一篇文章,感覺很不錯,原文出處:http://www.linuxidc.com/Linux/2012-02/54485p2.htm
Hadoop的核心元件在一起工作時如下圖所示:
圖4.4高層MapReduce工作流水線
MapReduce的輸入一般來自HDFS中的檔案,這些檔案分布存儲在叢集内的節點上。運作一個MapReduce程式會在叢集的許多節點甚至所有節點上運作mapping任務,每一個mapping任務都是平等的:mappers沒有特定“辨別物”與其關聯。是以,任意的mapper都可以處理任意的輸入檔案。每一個mapper會加載一些存儲在運作節點本地的檔案集來進行處理(譯注:這是移動計算,把計算移動到資料所在節點,可以避免額外的資料傳輸開銷)。
當mapping階段完成後,這階段所生成的中間鍵值對資料必須在節點間進行交換,把具有相同鍵的數值發送到同一個reducer那裡。Reduce任務在叢集内的分布節點同mappers的一樣。這是MapReduce中唯一的任務節點間的通信過程。map任務間不會進行任何的資訊交換,也不會去關心别的map任務的存在。相似的,不同的reduce任務之間也不會有通信。使用者不能顯式的從一台機器封送資訊到另外一台機器;所有資料傳送都是由Hadoop MapReduce平台自身去做的,這些是通過關聯到數值上的不同鍵來隐式引導的。這是Hadoop MapReduce的可靠性的基礎元素。如果叢集中的節點失效了,任務必須可以被重新啟動。如果任務已經執行了有副作用(side-effect)的操作,比如說,跟外面進行通信,那共享狀态必須存在可以重新開機的任務上。消除了通信和副作用問題,那重新開機就可以做得更優雅些。
近距離觀察
在上一圖中,描述了Hadoop MapReduce的高層視圖。從那個圖你可以看到mapper和reducer元件是如何用到詞頻統計程式中的,它們是如何完成它們的目标的。接下來,我們要近距離的來來看看這個系統以擷取更多的細節。
圖4.5細節化的Hadoop MapReduce資料流
圖4.5展示了流線水中的更多機制。雖然隻有2個節點,但相同的流水線可以複制到跨越大量節點的系統上。下去的幾個段落會詳細講述MapReduce程式的各個階段。
1.輸入檔案:
檔案是MapReduce任務的資料的初始存儲地。正常情況下,輸入檔案一般是存在HDFS裡。這些檔案的格式可以是任意的;我們可以使用基于行的日志檔案,也可以使用二進制格式,多行輸入記錄或其它一些格式。這些檔案會很大—數十G或更大。
2. 輸入格式:
InputFormat類定義了如何分割和讀取輸入檔案,它提供有下面的幾個功能:
•選擇作為輸入的檔案或對象;
•定義把檔案劃分到任務的InputSplits;
•為RecordReader讀取檔案提供了一個工廠方法;
Hadoop自帶了好幾個輸入格式。其中有一個抽象類叫FileInputFormat,所有操作檔案的InputFormat類都是從它那裡繼承功能和屬性。當開啟Hadoop作業時,FileInputFormat會得到一個路徑參數,這個路徑内包含了所需要處理的檔案,FileInputFormat會讀取這個檔案夾内的所有檔案(譯注:預設不包括子檔案夾内的),然後它會把這些檔案拆分成一個或多個的InputSplit。你可以通過JobConf對象的setInputFormat()方法來設定應用到你的作業輸入檔案上的輸入格式。下表給出了一些标準的輸入格式:
輸入格式
描述
鍵
值
TextInputFormat
預設格式,讀取檔案的行
行的位元組偏移量
行的内容
KeyValueInputFormat
把行解析為鍵值對
第一個tab字元前的所有字元
行剩下的内容
SequenceFileInputFormat
Hadoop定義的高性能二進制格式
使用者自定義
使用者自定義
表4.1MapReduce提供的輸入格式
預設的輸入格式是TextInputFormat,它把輸入檔案每一行作為單獨的一個記錄,但不做解析處理。這對那些沒有被格式化的資料或是基于行的記錄來說是很有用的,比如日志檔案。更有趣的一個輸入格式是KeyValueInputFormat,這個格式也是把輸入檔案每一行作為單獨的一個記錄。然而不同的是TextInputFormat把整個檔案行當做值資料,KeyValueInputFormat則是通過搜尋tab字元來把行拆分為鍵值對。這在把一個MapReduce的作業輸出作為下一個作業的輸入時顯得特别有用,因為預設輸出格式(下面有更詳細的描述)正是按KeyValueInputFormat格式輸出資料。最後來講講SequenceFileInputFormat,它會讀取特殊的特定于Hadoop的二進制檔案,這些檔案包含了很多能讓Hadoop的mapper快速讀取資料的特性。Sequence檔案是塊壓縮的并提供了對幾種資料類型(不僅僅是文本類型)直接的序列化與反序列化操作。Squence檔案可以作為MapReduce任務的輸出資料,并且用它做一個MapReduce作業到另一個作業的中間資料是很高效的。
3. 資料片段(InputSplit):
一個輸入塊描述了構成MapReduce程式中單個map任務的一個單元。把一個MapReduce程式應用到一個資料集上,即是指一個作業,會由幾個(也可能幾百個)任務組成。Map任務可能會讀取整個檔案,但一般是讀取檔案的一部分。預設情況下,FileInputFormat及其子類會以64MB(與HDFS的Block預設大小相同,譯注:Hadoop建議Split大小與此相同)為基數來拆分檔案。你可以在hadoop-site.xml(譯注:0.20.*以後是在mapred-default.xml裡)檔案内設定mapred.min.split.size參數來控制具體劃分大小,或者在具體MapReduce作業的JobConf對象中重寫這個參數。通過以塊形式處理檔案,我們可以讓多個map任務并行的操作一個檔案。如果檔案非常大的話,這個特性可以通過并行處理大幅的提升性能。更重要的是,因為多個塊(Block)組成的檔案可能會分散在叢集内的好幾個節點上(譯注:事實上就是這樣),這樣就可以把任務排程在不同的節點上;是以所有的單個塊都是本地處理的,而不是把資料從一個節點傳輸到另外一個節點。當然,日志檔案可以以明智的塊處理方式進行處理,但是有些檔案格式不支援塊處理方式。針對這種情況,你可以寫一個自定義的InputFormat,這樣你就可以控制你檔案是如何被拆分(或不拆分)成檔案塊的。自定義的檔案格式在第五部分有描述。
輸入格式定義了組成mapping階段的map任務清單,每一個任務對應一個輸入塊。接着根據輸入檔案塊所在的實體位址,這些任務會被分派到對應的系統節點上,可能會有多個map任務被分派到同一個節點上。任務分派好後,節點開始運作任務,嘗試去最大并行化執行。節點上的最大任務并行數由mapred.tasktracker.map.tasks.maximum參數控制。
4. 記錄讀取器(RecordReader)
InputSplit定義了如何切分工作,但是沒有描述如何去通路它。 RecordReader類則是實際的用來加載資料并把資料轉換為适合mapper讀取的鍵值對。RecordReader執行個體是由輸入格式定義的,預設的輸入格式,TextInputFormat,提供了一個LineRecordReader,這個類的會把輸入檔案的每一行作為一個新的值,關聯到每一行的鍵則是該行在檔案中的位元組偏移量。RecordReader會在輸入塊上被重複的調用直到整個輸入塊被處理完畢,每一次調用RecordReader都會調用Mapper的map()方法。
5. Mapper:
Mapper執行了MapReduce程式第一階段中有趣的使用者定義的工作。給定一個鍵值對,map()方法會生成一個或多個鍵值對,這些鍵值對會被送到Reducer那裡。對于整個作業輸入部分的每一個map任務(輸入塊),每一個新的Mapper執行個體都會在單獨的Java程序中被初始化,mapper之間不能進行通信。這就使得每一個map任務的可靠性不受其它map任務的影響,隻由本地機器的可靠性來決定。map()方法除了鍵值對外還會接收額外的兩個參數(譯注:在0.20.×後的版本,接口已變化,由Context對象代替這兩個參數):
•OutputCollector對象有一個叫collect()的方法,它可以利用該方法把鍵值對送到作業的reduce階段。
•Reporter對象提供目前任務的資訊,它的getInputSplit()方法會傳回一個描述目前輸入塊的對象,并且還允許map任務提供關于系統執行進度的額外資訊。setStatus()方法允許你生成一個回報給使用者的狀态消息,incrCounter()方法允許你遞增共享的高性能計數器,除了預設的計數器外,你還可以定義更多的你想要的計數器。每一個mapper都可以遞增計數器,JobTracker會收集由不同處理得到的遞增資料并把它們聚集在一起以供作業結束後的讀取。
6. Partition & Shuffle:
當第一個map任務完成後,節點可能還要繼續執行更多的map任務,但這時候也開始把map任務的中間輸出交換到需要它們的reducer那裡去,這個移動map輸出到reducer的過程叫做shuffle。每一個reduce節點會分派到中間輸出的鍵集合中的一個不同的子集合,這些子集合(被稱為“partitions”)是reduce任務的輸入資料。每一個map任務生成的鍵值對可能會隸屬于任意的partition,有着相同鍵的數值總是在一起被reduce,不管它是來自那個mapper的。是以,所有的map節點必須就把不同的中間資料發往何處達成一緻。Partitioner類就是用來決定給定鍵值對的去向,預設的分類器(partitioner)會計算鍵的哈希值并基于這個結果來把鍵賦到相應的partition上,自定義的分類器在第五部分有較長的描述。
7. 排序:
每一個reduce任務負責歸約(reduceing)關聯到相同鍵上的所有數值,每一個節點收到的中間鍵集合在被送到具體的reducer那裡前就已經自動被Hadoop排序過了。
8. 歸約(Reduce):
每個reduce任務都會建立一個Reducer執行個體,這是一個使用者自定義代碼的執行個體,負責執行特定作業的第二個重要的階段。對于每一個已賦予到reducer的partition内的鍵來說,reducer的reduce()方法隻會調用一次,它會接收一個鍵和關聯到鍵的所有值的一個疊代器,疊代器會以一個未定義的順序傳回關聯到同一個鍵的值。reducer也要接收一個OutputCollector和Report對象,它們像在map()方法中那樣被使用。
9.輸出格式:
提供給OutputCollector的鍵值對會被寫到輸出檔案中,寫入的方式由輸出格式控制。OutputFormat的功能跟前面描述的InputFormat類很像,Hadoop提供的OutputFormat的執行個體會把檔案寫在本地磁盤或HDFS上,它們都是繼承自公共的FileInputFormat類。每一個reducer會把結果輸出寫在公共檔案夾中一個單獨的檔案内,這些檔案的命名一般是part-nnnnn,nnnnn是關聯到某個reduce任務的partition的id,輸出檔案夾通過FileOutputFormat.setOutputPath() 來設定。你可以通過具體MapReduce作業的JobConf對象的setOutputFormat()方法來設定具體用到的輸出格式。下表給出了已提供的輸出格式:
輸出格式
描述
TextOutputFormat
預設的輸出格式, 以 "key \t value" 的方式輸出行
SequenceFileOutputFormat
輸出二進制檔案,适合于讀取為子MapReduce作業的輸入
NullOutputFormat
忽略收到的資料,即不做輸出
表4.2: Hadoop提供的輸出格式
Hadoop提供了一些OutputFormat執行個體用于寫入檔案,基本的(預設的)執行個體是TextOutputFormat,它會以一行一個鍵值對的方式把資料寫入一個文本檔案裡。這樣後面的MapReduce任務就可以通過KeyValueInputFormat類簡單的重新讀取所需的輸入資料了,而且也适合于人的閱讀。還有一個更适合于在MapReduce作業間使用的中間格式,那就是SequenceFileOutputFormat,它可以快速的序列化任意的資料類型到檔案中,而對應SequenceFileInputFormat則會把檔案反序列化為相同的類型并送出為下一個Mapper的輸入資料,方式和前一個Reducer的生成方式一樣。NullOutputFormat不會生成輸出檔案并丢棄任何通過OutputCollector傳遞給它的鍵值對,如果你在要reduce()方法中顯式的寫你自己的輸出檔案并且不想Hadoop架構輸出額外的空輸出檔案,那這個類是很有用的。
RecordWriter:這個跟InputFormat中通過RecordReader讀取單個記錄的實作很相似,OutputFormat類是RecordWriter對象的工廠方法,用來把單個的記錄寫到檔案中,就像是OuputFormat直接寫入的一樣。
Reducer輸出的檔案會留在HDFS上供你的其它應用使用,比如另外一個MapReduce作業,或一個給人工檢查的單獨程式。