天天看點

shuffle(partitioner+combiner+sort)shuffle(partitioner+combiner+sort)

shuffle(partitioner+combiner+sort)

  1. 每一個map有一個環形記憶體緩沖區,用于存儲任務的輸出。預設大小100MB(io.sort.mb屬性),一旦達到閥值0.8(io.sort.spill.percent),一個背景線程把内容寫到(spill)磁盤的指定目錄(mapred.local.dir)下的建立的一個溢出寫檔案
  2. 寫磁盤前,要partition,sort。如果有combiner,combiner排序後資料。
  3. 等最後距離寫完,合并全部溢出寫檔案為一個分區且排序的檔案。
  4. reducer通過http方式得到輸出檔案的分區。
  5. TaskTracker為分區檔案運作Reduce任務。複制階段把Map輸出複制到Reducer的記憶體或磁盤。一個Map任務完畢,Reduce就開始複制輸出。
  6. 排序階段合并map輸出。然後走Reduce階段。

MR過程各個角色的作用

  1. jobClient:送出作業
  2. JobTracker:初始化作業,配置設定作業,TaskTracker與其進行通信,協調監控整個作業
  3. TaskTracker:定期與JobTracker通信,執行Map和Reduce任務
  4. HDFS:保留作業的資料、配置、jar包、結果

作業送出

  1. 送出作業之前,需要對作業進行配置   編寫自己的MR程式   配置作業,包括輸入輸出路徑等。。。
  2. 送出作業   配置完畢後,通過JobClient送出
  3. 具體功能:
  4. 與JobTracker通信得到一個jar的存儲路徑和Jobld
  5. 輸入輸出路徑檢查
  6. 将jobjar拷貝到HDFS
  7. 計算輸入分片,将分片資訊寫到job.split中
  8. 寫job.xml
  9. 真正送出作業

作業初始化

  1. 用戶端送出作業後,JobTracker會将作業加入到隊列,然後進行排程,預設是FIFO方式
  2. 具體功能:
  3. 作業初始化隻要是值JobInProgress中完成的
  4. 讀取分片資訊
  5. 任務建立task包扣Map和reduce任務
  6. 建立TaskInProgress執行task

任務配置設定

  1. tasktracker與jobtracker之間的通信和任務配置設定是通過心跳機制實作的
  2. tasktracker會主動定期向jobtracker發送心跳資訊,詢問是否有任務要做,如果有,就會申請到任務
  3. 心跳  定期   任務完成--領

任務執行

  1. 如果tasktracker拿到任務,會将所有的資訊拷貝到本地,包扣代碼、配置、分片資訊等
  2. tasktracker中的localizeJob()方法會被條用進行本地化,拷貝job .jar,jobconf,job.xml到本地
  3. tasktracker調用launchTaskForJob()方法加載啟動任務
  4. MapTaskRunner和ReduceTaskRunner分别啟動javachild程序來執行相應的任務

狀态更新

  1. Task會定期向TaskTraker回報執行情況
  2. TaskTracker會定期收集所在叢集上的所有Task的資訊,并向JobTracker回報
  3. JobTracker會根據所有TaskTracker回報上來的資訊進行彙總

作業完成

  1. JobTracker是在接收到最後一個任務完成後,才将任務标記為成功
  2. 将纾解寫入到HDFS中

錯誤處理

  1. JobTracker失敗  存在單點故障,hadoop2.0解決了這個問題
  2. TaskTracker失敗  tasktracker崩潰了會停止向Jobtracker發送心跳資訊,jobtracker會将tasktracker從等待的任務池中移除,并将該任務轉移到其他的地方執行,jobtracker将tasktracker加入到黑名單中
  3. Task失敗  任務失敗,會向TaskTracker抛出異常   任務挂起

JobTracker

  1. 負責接收使用者送出的作業,負責啟動、跟蹤任務執行
  2. JobSubmissionProtocol是JobClient與JobTracker通信的接口
  3. InterTrackerProtocol是TaskTracker與JobTracker通信的接口

TaskTracker

  1. 負責執行任務

JobClient

  1. 是使用者作業與JobTracker互動的主要接口
  2. 負責送出作業的,負責啟動、跟蹤任務執行、通路任務狀态和日志等。

Partitioner程式設計

  1. partitioner是partitioner的基類,如果需要定制partitioner也需要繼承該類
  2. HashPartitioner是mapreduce的預設partitioner。計算方法是which reduce=(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks,得到目前的目的reducer
  3. mapper ----擷取資料
  4. partitioner  ----分區  屬于shuffle
  5. reduce   ---計算
public class TCPartitioner extends Partitioner<Text, TelBean>{
    
    @Override
    public int getPartition(Text key, TelBean bean, int arg2) {
        // TODO Auto-generated method stub
    
    }
}
           

把partitioner加入到job裡面

  1. 把partitioner添加到job裡面
  2. job.setPartitionerClass(TCPartitioner.class);
  3. 設定reduceTasks的數量  有幾個分區設定幾個任務
  4. job.setNumReduceTasks(2);

sort程式設計

繼承WritableComparable<> -序列化并且排序

public class Bean implements WritableComparable<Bean>{
   
 //反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        // TODO Auto-generated method stub
    }
   
 //序列化
    @Override
    public void write(DataOutput out) throws IOException {
        // TODO Auto-generated method stub
    }
   
 @Override
    public int compareTo(Bean bean) {
        // TODO Auto-generated method stub
    }
}
           

combiner程式設計

  1. 每一個map可能會産生大量的輸出,combiner的作用就是在map端對輸出先做一次合并,以減少傳輸到reducer的資料量
  2. combiner最基本是實作本地key的歸并,combiner具有類似本地的reduce功能
  3. 如果不用combiner,那麼,所有的結果都是reduce完成,效率會相對低下。使用combiner,先完成的map會在本地聚合,提升速度
  4. 注意:combiner的輸出是reduce的輸入,如果combiner是可插拔的,添加combiner絕對不能改變最終的計算結果。是以combiner隻應該用于那種reduce的輸入kry/value類型完全一緻,且不影響最終結果的場景。比如累加,最大值等。。。
  5. combiner就是map端的educer
  6. job.setCombinerClass();
shuffle(partitioner+combiner+sort)shuffle(partitioner+combiner+sort)
shuffle(partitioner+combiner+sort)shuffle(partitioner+combiner+sort)