shuffle(partitioner+combiner+sort)
- 每一個map有一個環形記憶體緩沖區,用于存儲任務的輸出。預設大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spill.percent),一個背景線程把内容寫到(spill)磁盤的指定目錄(mapred.local.dir)下的建立的一個溢出寫檔案
- 寫磁盤前,要partition,sort。如果有combiner,combiner排序後資料。
- 等最後距離寫完,合并全部溢出寫檔案為一個分區且排序的檔案。
- reducer通過http方式得到輸出檔案的分區。
- TaskTracker為分區檔案運作Reduce任務。複制階段把Map輸出複制到Reducer的記憶體或磁盤。一個Map任務完畢,Reduce就開始複制輸出。
- 排序階段合并map輸出。然後走Reduce階段。
MR過程各個角色的作用
- jobClient:送出作業
- JobTracker:初始化作業,配置設定作業,TaskTracker與其進行通信,協調監控整個作業
- TaskTracker:定期與JobTracker通信,執行Map和Reduce任務
- HDFS:保留作業的資料、配置、jar包、結果
作業送出
- 送出作業之前,需要對作業進行配置 編寫自己的MR程式 配置作業,包括輸入輸出路徑等。。。
- 送出作業 配置完畢後,通過JobClient送出
- 具體功能:
- 與JobTracker通信得到一個jar的存儲路徑和Jobld
- 輸入輸出路徑檢查
- 将jobjar拷貝到HDFS
- 計算輸入分片,将分片資訊寫到job.split中
- 寫job.xml
- 真正送出作業
作業初始化
- 用戶端送出作業後,JobTracker會将作業加入到隊列,然後進行排程,預設是FIFO方式
- 具體功能:
- 作業初始化隻要是值JobInProgress中完成的
- 讀取分片資訊
- 任務建立task包扣Map和reduce任務
- 建立TaskInProgress執行task
任務配置設定
- tasktracker與jobtracker之間的通信和任務配置設定是通過心跳機制實作的
- tasktracker會主動定期向jobtracker發送心跳資訊,詢問是否有任務要做,如果有,就會申請到任務
- 心跳 定期 任務完成--領
任務執行
- 如果tasktracker拿到任務,會将所有的資訊拷貝到本地,包扣代碼、配置、分片資訊等
- tasktracker中的localizeJob()方法會被條用進行本地化,拷貝job .jar,jobconf,job.xml到本地
- tasktracker調用launchTaskForJob()方法加載啟動任務
- MapTaskRunner和ReduceTaskRunner分别啟動javachild程序來執行相應的任務
狀态更新
- Task會定期向TaskTraker回報執行情況
- TaskTracker會定期收集所在叢集上的所有Task的資訊,并向JobTracker回報
- JobTracker會根據所有TaskTracker回報上來的資訊進行彙總
作業完成
- JobTracker是在接收到最後一個任務完成後,才将任務标記為成功
- 将纾解寫入到HDFS中
錯誤處理
- JobTracker失敗 存在單點故障,hadoop2.0解決了這個問題
- TaskTracker失敗 tasktracker崩潰了會停止向Jobtracker發送心跳資訊,jobtracker會将tasktracker從等待的任務池中移除,并将該任務轉移到其他的地方執行,jobtracker将tasktracker加入到黑名單中
- Task失敗 任務失敗,會向TaskTracker抛出異常 任務挂起
JobTracker
- 負責接收使用者送出的作業,負責啟動、跟蹤任務執行
- JobSubmissionProtocol是JobClient與JobTracker通信的接口
- InterTrackerProtocol是TaskTracker與JobTracker通信的接口
TaskTracker
- 負責執行任務
JobClient
- 是使用者作業與JobTracker互動的主要接口
- 負責送出作業的,負責啟動、跟蹤任務執行、通路任務狀态和日志等。
Partitioner程式設計
- partitioner是partitioner的基類,如果需要定制partitioner也需要繼承該類
- HashPartitioner是mapreduce的預設partitioner。計算方法是which reduce=(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks,得到目前的目的reducer
- mapper ----擷取資料
- partitioner ----分區 屬于shuffle
- reduce ---計算
public class TCPartitioner extends Partitioner<Text, TelBean>{
@Override
public int getPartition(Text key, TelBean bean, int arg2) {
// TODO Auto-generated method stub
}
}
把partitioner加入到job裡面
- 把partitioner添加到job裡面
- job.setPartitionerClass(TCPartitioner.class);
- 設定reduceTasks的數量 有幾個分區設定幾個任務
- job.setNumReduceTasks(2);
sort程式設計
繼承WritableComparable<> -序列化并且排序
public class Bean implements WritableComparable<Bean>{
//反序列化
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
}
//序列化
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
}
@Override
public int compareTo(Bean bean) {
// TODO Auto-generated method stub
}
}
combiner程式設計
- 每一個map可能會産生大量的輸出,combiner的作用就是在map端對輸出先做一次合并,以減少傳輸到reducer的資料量
- combiner最基本是實作本地key的歸并,combiner具有類似本地的reduce功能
- 如果不用combiner,那麼,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度
- 注意:combiner的輸出是reduce的輸入,如果combiner是可插拔的,添加combiner絕對不能改變最終的計算結果。是以combiner隻應該用于那種reduce的輸入kry/value類型完全一緻,且不影響最終結果的場景。比如累加,最大值等。。。
- combiner就是map端的educer
- job.setCombinerClass();