天天看點

詳解MapReduce中的五大程式設計模型

前言

我們上一節講了關于

MapReduce

中的應用場景和架構分析,最後還使用了一個

CountWord

Demo

來進行示範,關于

MapReduce

的具體操作。如果還不了解的朋友可以看看上篇文章:初識MapReduce的應用場景(附JAVA和Python代碼)

接下來,我們會講解關于

MapReduce

的程式設計模型,這篇文章的主要目的就是講清楚

Mapreduce

的程式設計模型有多少種,它們之間是怎麼協調合作的,會盡量從源碼的角度來解析,最後就是講解不同的語言是如何調用

Hadoop

中的

Mapreduce

API

的。

目錄

  • MapReduce 程式設計模型的架構
  • 五種程式設計模型的詳解
    • InputFormat
    • OutPutFormat
    • Mapper
    • Reducer
    • Partitioner
  • Hadoop Streaming 的詳解
  • 總結

MapReduce 程式設計模型的架構

我們先來看一張圖,關于

MapReduce

的程式設計模型

詳解MapReduce中的五大程式設計模型
  • 使用者程式層

使用者程式層是指使用者用編寫好的代碼來調用

MapReduce

的接口層。

  • 工具層
    • Job control 是為了監控

      Hadoop

      中的

      MapReduce

      向叢集送出複雜的作業任務,送出了任務到叢集中後,形成的任務是一個有向圖。每一個任務都有兩個方法

      submit()

      waitForCompletion(boolean)

      submit()

      方法是向叢集中送出作業,然後立即傳回,

      waitForCompletion(boolean)

      就是等待叢集中的作業是否已經完成了,如果完成了,得到的結果可以當作下個任務的輸入。
    • chain Mapper

      chain Reducer

      的這個子產品,是為了使用者編寫鍊式作業,形式類似于

      Map + Reduce Map *

      ,表達的意思就是隻有一個

      Reduce

      ,在

      Reduce

      的前後可以有多個

      Map

    • Hadoop Streaming

      支援的是腳本語言,例Python、PHP等來調用

      Hadoop

      的底層接口,

      Hadoop Pipes

      支援的是

      C++

      來調用。
  • 程式設計接口層,這一層是全部由

    Java

    語言來實作的,如果是

    Java

    來開發的話,那麼可以直接使用這一層。

詳解五種程式設計模型

InputFormat

作用

對輸入進入

MapReduce

的檔案進行規範處理,主要包括

InputSplit

RecordReader

兩個部分。

TextOutputFormat

是預設的檔案輸入格式。

詳解MapReduce中的五大程式設計模型

InputSplit

這個是指對輸入的檔案進行邏輯切割,切割成一對對

Key-Value

值。有兩個參數可以定義

InputSplit

的塊大小,分别是

mapred.max.split.size

(記為

minSize

)和

mapred.min.split.size

(記為

maxSize

)。

RecordReader

是指作業在

InputSplit

中切割完成後,輸出

Key-Value

對,再由

RecordReader

進行讀取到一個個

Mapper

檔案中。如果沒有特殊定義,一個

Mapper

檔案的大小就是由

Hadoop

block_size

決定的,

Hadoop 1.x

中的

block_size

64M

,在

Hadoop 2.x

中的

block_size

的大小就是

128M

切割塊的大小

Hadoop2.x

以上的版本中,一個

splitSize

的計算公式為

splitSize = max\{minSize,min\{maxSize, blockSize\}\}
           

OutputFormat

作用

對輸出的檔案進行規範處理,主要的工作有兩個部分,一個是檢查輸出的目錄是否已經存在,如果存在的話就會報錯,另一個是輸出最終結果的檔案到檔案系統中,

TextOutputFormat

是預設的輸出格式。

詳解MapReduce中的五大程式設計模型

OutputCommiter

OutputCommiter

的作用有六點:

  • 作業(

    job

    )的初始化
//進行作業的初始化,建立臨時目錄。
//如果初始化成功,那麼作業就會進入到 Running 的狀态
public abstract void setupJob(JobContext var1) throws IOException;
           
  • 作業運作結束後就删除作業
//如果這個job完成之後,就會删除掉這個job。
//例如删除掉臨時的目錄,然後會宣布這個job處于以下的三種狀态之一,SUCCEDED/FAILED/KILLED
@Deprecated
    public void cleanupJob(JobContext jobContext) throws IOException {
    }
           
  • 初始化

    Task

//初始化Task的操作有建立Task的臨時目錄
public abstract void setupTask(TaskAttemptContext var1) throws IOException;
           
  • 檢查是否送出

    Task

    的結果
//檢查是否需要送出Task,為的是Task不需要送出的時候送出出去
public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;
           
  • 送出

    Task

//任務結束的時候,需要送出任務
public abstract void commitTask(TaskAttemptContext var1) throws IOException;
           
  • 回退

    Task

//如果Task處于KILLED或者FAILED的狀态,這Task就會進行删除掉臨時的目錄
//如果這個目錄删除不了(例如出現了異常後,處于被鎖定的狀态),另一個同樣的Task會被執行
//然後使用同樣的attempt-id去把這個臨時目錄給删除掉,也就說,一定會把臨時目錄給删除幹淨
 public abstract void abortTask(TaskAttemptContext var1) throws IOException;

           

處理Task Side-Effect File

Hadoop

中有一種特殊的檔案和特殊的操作,那就是

Side-Eddect File

,這個檔案的存在是為了解決某一個

Task

因為網絡或者是機器性能的原因導緻的運作時間過長,進而導緻拖慢了整體作業的進度,是以會為每一個任務在另一個節點上再運作一個子任務,然後選擇兩者中處理得到的結果最快的那個任務為最終結果,這個時候為了避免檔案都輸入在同一個檔案中,是以就把備胎任務輸出的檔案取作為

Side-Effect File

RecordWriter

這個是指輸出

KEY-VALUE

對到檔案中。

Mapper和Reducer

詳解Mapper

InputFormat

為每一個

InputSplit

生成一個

map

任務,

mapper

的實作是通過

job

中的

setMapperClass(Class)

方法來配置寫好的

map

類,如這樣

//設定要執行的mapper類
job.setMapperClass(WordMapper.class);
           

其内部是調用了

map(WritableComparable, Writable, Context)

這個方法來為每一個鍵值對寫入到

InputSplit

,程式會調用

cleanup(Context)

方法來執行清理任務,清理掉不需要使用到的中間值。

關于輸入的鍵值對類型不需要和輸出的鍵值對類型一樣,而且輸入的鍵值對可以映射到0個或者多個鍵值對。通過調用

context.write(WritableComparable, Writable)

來收集輸出的鍵值對。程式使用

Counter

來統計鍵值對的數量,

Mapper

中的輸出被排序後,就會被劃分到每個

Reducer

中,分塊的總數目和一個作業的

reduce

任務的數目是一樣的。

需要多少個Mapper任務

關于一個機器節點适合多少個

map

任務,官方的文檔的建議是,一個節點有

10

個到

100

個任務是最好的,如果是

cpu

低消耗的話,

300

個也是可以的,最合理的一個map任務是需要運作超過

1

分鐘。

詳解Reducer

Reducer

任務的話就是将

Mapper

中輸出的結果進行統計合并後,輸出到檔案系統中。

使用者可以自定義

Reducer

的數量,使用

Job.setNumReduceTasks(int)

這個方法。

在調用

Reducer

的話,使用的是

Job.setReducerClass(Class)

方法,内部調用的是

reduce(WritableComparable, Iterable<Writable>, Context)

這個方法,最後,程式會調用

cleanup(Context)

來進行清理工作。如這樣:

//設定要執行的reduce類
job.setReducerClass(WordReduce.class);
           

Reducer

實際上是分三個階段,分别是

Shuffle

Sort

Secondary Sort

shuffle

這個階段是指

Reducer

的輸入階段,系統會為每一個

Reduce

任務去擷取所有的分塊,通過的是

HTTP

的方式

sort

這個階段是指在輸入

Reducer

階段的值進行分組,

sort

shuffle

是同時進行的,可以這麼了解,一邊在輸入的時候,同時在一邊排序。

Secondary Sort

這個階段不是必需的,隻有在中間過程中對

key

的排序和在

reduce

的輸入之前對

key

的排序規則不同的時候,才會啟動這個過程,可以通過的是

Job.setSortComparatorClass(Class)

來指定一個

Comparator

進行排序,然後再結合

Job.setGroupingComparatorClass(Class)

來進行分組,最後可以實作二次排序。

在整個

reduce

中的輸出是沒有排序

需要多少個 Reducer 任務

建議是

0.95

或者是

1.75

*

mapred.tasktracker.reduce.tasks.maximum

。如果是

0.95

的話,那麼就可以在

mapper

任務結束時,立馬就可以啟動

Reducer

任務。如果是

1.75

的話,那麼運作的快的節點就可以在

map

任務完成的時候先計算一輪,然後等到其他的節點完成的時候就可以計算第二輪了。當然,

Reduce

任務的個數不是越多就越好的,個數多會增加系統的開銷,但是可以在提升負載均衡,進而降低由于失敗而帶來的負面影響。

Partitioner

這個子產品用來劃分鍵值空間,控制的是

map

任務中的

key

值分割的分區,預設使用的算法是哈希函數,

HashPartitioner

是預設的

Partitioner

總結

這篇文章主要就是講了

MapReduce

的架構模型,分别是分為使用者程式層、工具層、程式設計接口層這三層,在程式設計接口層主要有五種程式設計模型,分别是

InputFomat

MapperReduce

Partitioner

OnputFomat

Reducer

。主要是偏理論,代碼的參考例子可以參考官方的例子:WordCount_v2.0

這是

MapReduce

系列的第二篇,接下來的一篇會詳細寫關于

MapReduce

的作業配置和環境,結合一些面試題的彙總,是以接下來的這篇還是幹貨滿滿的,期待着就好了。

更多幹貨,歡迎關注我的公衆号:spacedong

詳解MapReduce中的五大程式設計模型

繼續閱讀