天天看點

MapReduce深度分析(一)MapReduce深度分析(一)

MapReduce深度分析(一)MapReduce深度分析(一)

  圖為MapReduce資料流向示意圖

  步驟1、輸入檔案從HDFS流向到Mapper節點。在一般情況下,存儲資料的節點就是Mapper運作的節點,不需要在節點之間進行資料傳輸,也就是盡量讓存儲靠近計算。

  步驟2、mapper輸出到記憶體緩沖區。Mapper的輸入是解析後的鍵值對,輸出是經過處理後新的<key,value>鍵值對。mapper的輸出并不是直接寫到本地檔案系統,而是先寫入一個記憶體緩沖區,當緩沖區達到一定的門檻值後就将緩沖區中的資料以一個臨時檔案的形式寫入本地磁盤。partitioner就是發生在這個階段,也就是在寫入記憶體緩沖區的同時執行了partitioner對檔案進行分區,以便後續對Reduce進行處理。

  步驟3、從記憶體緩沖區到磁盤。當緩沖區達到100M,溢寫比例預設是0.8。從緩沖區寫到本地磁盤的過程就是spill。溢寫線程啟動同時會對這80M的記憶體資料依據key進行排序,如果使用者作業設定了Combiner,那麼在寫到磁盤之前,會對Map輸出的鍵值對調用Combiner類做規約操作。目的是減少溢寫到本地磁盤檔案的資料量。這個過程還涉及對多次臨時檔案的合并,排序後最終形成Region檔案。

  步驟4、從Mapper端的本地檔案系統流向Reduce端。也就是Shuffle階段。複制資料階段。

  步驟5、從Reduce端記憶體緩沖區流向Reduce端的本地磁盤。這個過程就是Reduce中的Merge和Sort階段。Merge分為三種情況,記憶體檔案合并,磁盤檔案合并,同時還會以key為鍵進行排序,最終會生成已經對相同key的value進行聚集并排序号的輸出檔案。

  步驟6、Merge和Sort之後直接流向Reduce函數進行規約操作。

  步驟7、Reduce處理完畢之後根據使用者指定的輸出類型寫入HDFS中,生成相應的part-*形式的輸出檔案。

  

MapReduce深度分析(一)MapReduce深度分析(一)

  步驟1、使用者的應用通過JobClient類送出到JobTracker,在JobClient類中,将使用者程式的Mapper類,Reducer類以及配置JobConf打包成一個JAR檔案并儲存在HDFS中,需要指定JobConf的輸入路徑(a)、輸出路徑,Mapper和Reducer類名的參數。JobClient在送出作業的時候,将作業jar檔案的路徑一起送出到JobTracker的master服務,也就是作業排程器。見序号1

  步驟2、在JobClient送出Job後,JobTracker會建立一個JobInProgress來跟蹤和排程這個作業,并将其添加到排程器的作業隊列中,見序号2

  步驟3、JobInProgress會根據送出作業jar檔案中定義的輸入資料集建立相應數量的TaskInProcess用于監控和排程MapTask和ReduceTask,序号3

  步驟4、JobTracker啟動任務時通過TaskInProgress來啟動作業任務序号4,這時會把Task對象(MapTask和ReduceTask)序列化寫入相應的TaskTracker服務中。序号5

  步驟5、TaskTracker收到後建立相應的TaskInProgress用于監控和排程運作該Task,序号6

  步驟6、啟動具體的Task程序,TaskTracker通過TaskInProgress管理的TaskRunner對象運作具體的Task,序号7

  步驟7、TaskRunner自動裝載作業jar檔案,并設定好環境變量後啟動一個獨立的Java子程序來執行Task,TaskRunner會首先調用執行MapTask,序号8

  步驟8、MapTask先調用Mapper,進行Mapper的相關操作。

  步驟9、MapTask的任務完成之後,TaskRunner調用ReduceTask程序來啟動Reducer,需要注意的是MapTask和ReduceTask不一定會運作在同一個TaskTracker中。

  步驟10、ReduceTask直接調用Reducer類處理Mapper的輸出結果,Reducer生成最終的結果鍵值對,寫到HDFS中。

  MapTask先被TaskRunner調用執行,然後調用執行使用者的Mapper類進而開始Map任務處理階段,是以MapTask是整個Map階段最核心的類。

MapReduce深度分析(一)MapReduce深度分析(一)

  圖為MapTask總執行邏輯圖。通過調用initialize()方法執行初始化工作。Map的初始化任務包括JobContext、TaskContext、輸出路徑等,調用Task.initialize()方法。初始化完成之後會依次判斷作業類型,MapTask中三種特殊的作業:cleanupJobTask(清理Map任務)、setupJobTask(初始化Map任務)和TaskCleanupTask(清理作業任務),這三種類型的作業會根據需要進行判斷調用。

  處理流程:

    步驟1、建立執行Mapper所需要的對象。首先建立的對象有:TaskAttemptContext、Mapper、InputFormat、InputSplit、RecordReader。這些類的對象主要為Mapper要讀取的輸入資料及切分準備。

    步驟2、建立所需要的輸出收集器OutputCollector。

    步驟3、初始化RecordReader,通過input.initialize(split,mapperContext)來對輸入資料進行初始化。

    步驟4、執行Mapper,通過mapper.run(mapperContext)最終調用使用者指定的Mapper類對資料進行Map操作。

  以上就是RunNewMapper總的處理步驟。

MapReduce深度分析(一)MapReduce深度分析(一)

  對于以上步驟可以分為以下幾個階段:

    Read階段:通過RecordReader對象,對HDFS上的檔案進行split切分,調用使用者指定的輸入檔案格式類解析每一個split檔案,輸出<key,value>鍵值對。

    Map階段:對輸入的鍵值對調用使用者編寫的Map函數進行處理,并輸出<key,value>鍵值對

    Collector和Partitioner階段:收集Mapper的輸出,并在OutputCollector函數内部對鍵值對進行Partitioner分區,以便确定相應的Reducer處理,這個階段将最終的鍵值對集合輸出到記憶體緩沖區。

    Spill階段:包含Sort和Combiner階段,從記憶體緩沖區寫到磁盤的過程。

    Merge階段:對Spill階段在本地磁盤生成的小檔案進行多次合并,生成一個大檔案。

  下面具體分析每個階段的具體任務:

    Read階段:首先通過taskContext.getInputFormatClass()得到使用者指定的InputFormatClass來建立InputFormat對象執行個體;其次建立InputSplit對象,這個對象負責對檔案進行資料塊的邏輯切分;最後,建立RecordReader對象。InputFormat對象會提供getSplit()重要方法,通過getSplit将輸入檔案切分成多個邏輯InputSplit執行個體并傳回,每一個InputSplit執行個體就由對應的一個Mapper處理,通過RecordReader對象把InputSplit提供的輸入檔案轉化為Mapper所需要的keys/values鍵值對集合。

    Map階段:Mapper類中有setup、map、cleanup、run四個核心方法,通過調用run方法,依次執行setup(context)-->map()-->cleanup(context)

    Collector和Partitioner階段:map函數處理的結果不直接寫到記憶體緩沖區,而有一個Collector對象進行收集。partition是對應的Reduce分區号,是Partitioner的傳回值,也就是傳到Reduce節點處理。自定義的MapperOutputCollector接口,并實作collect、close、flush方法。

    Spill階段:首先建立spill檔案,2、按照partition的順序對記憶體緩沖區中的資料進行排序。3、循環依次将每個partition寫入磁盤檔案。4、建立spill index檔案。

    Merge階段:經過spill階段會生成多個spill.out檔案和相應的索引檔案spill.out.index,MapTask最終需要将這些形式的臨時檔案經過多次合并成一個大的輸出檔案。

   和MapTask類似,ReduceTask也繼承了Task類,重寫了run方法,ReduceTask就是調用run方法來執行Reduce任務的。

MapReduce深度分析(一)MapReduce深度分析(一)

  圖為ReduceTask整體的處理邏輯,首先是ReduceTask的初始化工作,包括添加Reduce過程需要經過的copy、sort和Reduce階段,以便通知JobTracker目前運作的情況,設定并啟動reporter程序以便和JobTracker進行交流,最後進行一些和任務輸出相關的設定。比如建立commiter,設定工作目錄等。ReduceTask階段分為四個階段:

  shuffle階段:這個階段就是Reduce中的copy階段,運作Reduce的TaskTracker需要從各個Mapper節點遠端複制屬于自己處理的一段資料。

  Merge階段:進行多次合并。

  sort階段:雖然每個Mapper的輸出是按照key排序好的,但是經過Shuffle和Merge階段後并不是統一有序的。

  Reduce階段:在Sort階段完成後,執行Reduce類進行規約。

當神已無能為力,那便是魔渡衆生