前言
我們上一節講了關于
MapReduce
中的應用場景和架構分析,最後還使用了一個
CountWord
的
Demo
來進行示範,關于
MapReduce
的具體操作。如果還不了解的朋友可以看看上篇文章:初識MapReduce的應用場景(附JAVA和Python代碼)
接下來,我們會講解關于
MapReduce
的程式設計模型,這篇文章的主要目的就是講清楚
Mapreduce
的程式設計模型有多少種,它們之間是怎麼協調合作的,會盡量從源碼的角度來解析,最後就是講解不同的語言是如何調用
Hadoop
中的
Mapreduce
的
API
的。
目錄
- MapReduce 程式設計模型的架構
- 五種程式設計模型的詳解
- InputFormat
- OutPutFormat
- Mapper
- Reducer
- Partitioner
- Hadoop Streaming 的詳解
- 總結
MapReduce 程式設計模型的架構
我們先來看一張圖,關于
MapReduce
的程式設計模型
- 使用者程式層
使用者程式層是指使用者用編寫好的代碼來調用
MapReduce
的接口層。
- 工具層
- Job control 是為了監控
中的Hadoop
向叢集送出複雜的作業任務,送出了任務到叢集中後,形成的任務是一個有向圖。每一個任務都有兩個方法MapReduce
和submit()
,waitForCompletion(boolean)
方法是向叢集中送出作業,然後立即傳回,submit()
就是等待叢集中的作業是否已經完成了,如果完成了,得到的結果可以當作下個任務的輸入。waitForCompletion(boolean)
-
和chain Mapper
的這個子產品,是為了使用者編寫鍊式作業,形式類似于chain Reducer
,表達的意思就是隻有一個Map + Reduce Map *
,在Reduce
的前後可以有多個Reduce
Map
-
支援的是腳本語言,例Python、PHP等來調用Hadoop Streaming
的底層接口,Hadoop
支援的是Hadoop Pipes
來調用。C++
- Job control 是為了監控
- 程式設計接口層,這一層是全部由
語言來實作的,如果是Java
來開發的話,那麼可以直接使用這一層。Java
詳解五種程式設計模型
InputFormat
作用
對輸入進入
MapReduce
的檔案進行規範處理,主要包括
InputSplit
和
RecordReader
兩個部分。
TextOutputFormat
是預設的檔案輸入格式。
InputSplit
這個是指對輸入的檔案進行邏輯切割,切割成一對對
Key-Value
值。有兩個參數可以定義
InputSplit
的塊大小,分别是
mapred.max.split.size
(記為
minSize
)和
mapred.min.split.size
(記為
maxSize
)。
RecordReader
是指作業在
InputSplit
中切割完成後,輸出
Key-Value
對,再由
RecordReader
進行讀取到一個個
Mapper
檔案中。如果沒有特殊定義,一個
Mapper
檔案的大小就是由
Hadoop
的
block_size
決定的,
Hadoop 1.x
中的
block_size
是
64M
,在
Hadoop 2.x
中的
block_size
的大小就是
128M
。
切割塊的大小
在
Hadoop2.x
以上的版本中,一個
splitSize
的計算公式為
splitSize = max\{minSize,min\{maxSize, blockSize\}\}
OutputFormat
作用
對輸出的檔案進行規範處理,主要的工作有兩個部分,一個是檢查輸出的目錄是否已經存在,如果存在的話就會報錯,另一個是輸出最終結果的檔案到檔案系統中,
TextOutputFormat
是預設的輸出格式。
OutputCommiter
OutputCommiter
的作用有六點:
- 作業(
)的初始化job
//進行作業的初始化,建立臨時目錄。
//如果初始化成功,那麼作業就會進入到 Running 的狀态
public abstract void setupJob(JobContext var1) throws IOException;
- 作業運作結束後就删除作業
//如果這個job完成之後,就會删除掉這個job。
//例如删除掉臨時的目錄,然後會宣布這個job處于以下的三種狀态之一,SUCCEDED/FAILED/KILLED
@Deprecated
public void cleanupJob(JobContext jobContext) throws IOException {
}
- 初始化
Task
//初始化Task的操作有建立Task的臨時目錄
public abstract void setupTask(TaskAttemptContext var1) throws IOException;
- 檢查是否送出
的結果Task
//檢查是否需要送出Task,為的是Task不需要送出的時候送出出去
public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;
- 送出
Task
//任務結束的時候,需要送出任務
public abstract void commitTask(TaskAttemptContext var1) throws IOException;
- 回退
Task
//如果Task處于KILLED或者FAILED的狀态,這Task就會進行删除掉臨時的目錄
//如果這個目錄删除不了(例如出現了異常後,處于被鎖定的狀态),另一個同樣的Task會被執行
//然後使用同樣的attempt-id去把這個臨時目錄給删除掉,也就說,一定會把臨時目錄給删除幹淨
public abstract void abortTask(TaskAttemptContext var1) throws IOException;
處理Task Side-Effect File
在
Hadoop
中有一種特殊的檔案和特殊的操作,那就是
Side-Eddect File
,這個檔案的存在是為了解決某一個
Task
因為網絡或者是機器性能的原因導緻的運作時間過長,進而導緻拖慢了整體作業的進度,是以會為每一個任務在另一個節點上再運作一個子任務,然後選擇兩者中處理得到的結果最快的那個任務為最終結果,這個時候為了避免檔案都輸入在同一個檔案中,是以就把備胎任務輸出的檔案取作為
Side-Effect File
RecordWriter
這個是指輸出
KEY-VALUE
對到檔案中。
Mapper和Reducer
詳解Mapper
InputFormat
為每一個
InputSplit
生成一個
map
任務,
mapper
的實作是通過
job
中的
setMapperClass(Class)
方法來配置寫好的
map
類,如這樣
//設定要執行的mapper類
job.setMapperClass(WordMapper.class);
其内部是調用了
map(WritableComparable, Writable, Context)
這個方法來為每一個鍵值對寫入到
InputSplit
,程式會調用
cleanup(Context)
方法來執行清理任務,清理掉不需要使用到的中間值。
關于輸入的鍵值對類型不需要和輸出的鍵值對類型一樣,而且輸入的鍵值對可以映射到0個或者多個鍵值對。通過調用
context.write(WritableComparable, Writable)
來收集輸出的鍵值對。程式使用
Counter
來統計鍵值對的數量,
在
Mapper
中的輸出被排序後,就會被劃分到每個
Reducer
中,分塊的總數目和一個作業的
reduce
任務的數目是一樣的。
需要多少個Mapper任務
關于一個機器節點适合多少個
map
任務,官方的文檔的建議是,一個節點有
10
個到
100
個任務是最好的,如果是
cpu
低消耗的話,
300
個也是可以的,最合理的一個map任務是需要運作超過
1
分鐘。
詳解Reducer
Reducer
任務的話就是将
Mapper
中輸出的結果進行統計合并後,輸出到檔案系統中。
使用者可以自定義
Reducer
的數量,使用
Job.setNumReduceTasks(int)
這個方法。
在調用
Reducer
的話,使用的是
Job.setReducerClass(Class)
方法,内部調用的是
reduce(WritableComparable, Iterable<Writable>, Context)
這個方法,最後,程式會調用
cleanup(Context)
來進行清理工作。如這樣:
//設定要執行的reduce類
job.setReducerClass(WordReduce.class);
Reducer
實際上是分三個階段,分别是
Shuffle
、
Sort
和
Secondary Sort
。
shuffle
這個階段是指
Reducer
的輸入階段,系統會為每一個
Reduce
任務去擷取所有的分塊,通過的是
HTTP
的方式
sort
這個階段是指在輸入
Reducer
階段的值進行分組,
sort
和
shuffle
是同時進行的,可以這麼了解,一邊在輸入的時候,同時在一邊排序。
Secondary Sort
這個階段不是必需的,隻有在中間過程中對
key
的排序和在
reduce
的輸入之前對
key
的排序規則不同的時候,才會啟動這個過程,可以通過的是
Job.setSortComparatorClass(Class)
來指定一個
Comparator
進行排序,然後再結合
Job.setGroupingComparatorClass(Class)
來進行分組,最後可以實作二次排序。
在整個
reduce
中的輸出是沒有排序
需要多少個 Reducer 任務
建議是
0.95
或者是
1.75
*
mapred.tasktracker.reduce.tasks.maximum
。如果是
0.95
的話,那麼就可以在
mapper
任務結束時,立馬就可以啟動
Reducer
任務。如果是
1.75
的話,那麼運作的快的節點就可以在
map
任務完成的時候先計算一輪,然後等到其他的節點完成的時候就可以計算第二輪了。當然,
Reduce
任務的個數不是越多就越好的,個數多會增加系統的開銷,但是可以在提升負載均衡,進而降低由于失敗而帶來的負面影響。
Partitioner
這個子產品用來劃分鍵值空間,控制的是
map
任務中的
key
值分割的分區,預設使用的算法是哈希函數,
HashPartitioner
是預設的
Partitioner
。
總結
這篇文章主要就是講了
MapReduce
的架構模型,分别是分為使用者程式層、工具層、程式設計接口層這三層,在程式設計接口層主要有五種程式設計模型,分别是
InputFomat
、
MapperReduce
、
Partitioner
、
OnputFomat
和
Reducer
。主要是偏理論,代碼的參考例子可以參考官方的例子:WordCount_v2.0
這是
MapReduce
系列的第二篇,接下來的一篇會詳細寫關于
MapReduce
的作業配置和環境,結合一些面試題的彙總,是以接下來的這篇還是幹貨滿滿的,期待着就好了。
更多幹貨,歡迎關注我的公衆号:spacedong