MapReduce并行計算架構
- 基本知識
-
- 前言
- 核心概念
-
- 計算模型
- 系統架構
- 作業配置
- 計算流程與機制
-
- 作業送出和初始化
- Mapper
- Reducer
- 結構圖示
- 輸入/輸出格式(常用)
- 核心問題
-
- Map和Reduce數量
- 作業配置
- 作業排程
-
- 排程過程 :
- 有用的MapReduce特性
基本知識
前言
- MapReduce計算架構是Google提出的一種并行計算架構,是Google雲計算模型MapReduce的java開源實作,用于大規模資料集(通常1TB級以上)的并行計算。但其實,MR不僅是一種分布式的運算技術,也是簡化的分布式程式設計模式,是用于解決問題的程式開發模型。
核心概念
計算模型
- 計算模型的 核心概念 是”Map(映射)”和”Reduce(歸約)”。使用者需要指定一個Map函數,用來把一組鍵值對映射成一組新的鍵值對,并指定并發的Reduce函數用來合并所有的具有相同中間key值的中間的value值。作業的輸入和輸出都會被存儲在檔案系統中。整個架構負責 任務的排程和監控,以及重新執行已經失敗的任務 。
系統架構
-
在系統架構上,MapReduce架構是一種主從架構,由一個單獨的JobTracker節點和多個TaskTracker節點共同組成。
1)JobTracker是MapReduce的Master,負責排程構成一個作業的所有任務,這些任務分布在不同 的TaskTracker節點上,監控它們的執行,重新執行已經失敗的任務,同時提高狀态和診斷資訊給作業用戶端。
2)TaskTracker是MapReduce的Slave,僅負責運作由Master指派的任務執行。
作業配置
- 對于使用者來講,我們應該在應用程式中 指明輸入和輸出的位置路徑,并通過實作合适的接口或抽象類來提供Map和Reduce函數,再加上其他作業的參數,就構成了作業配置。
計算流程與機制
作業送出和初始化
-
(作業送出)指令行送出->作業上傳->産生切分檔案->送出作業到JobTracker->(作業初始化)->(Setup Task->Map Task->Reduce Task->Cleanup Task)
具體過程會在之後的文章介紹
Mapper
- Mapper是MapReduce架構給使用者暴露的Map程式設計接口,使用者在實作自己的Mapper類時需要繼承這個基類。執行Map Task任務:将輸入鍵值對(key/value pair)映射到一組中間格式的鍵值對集合。
處理流程如下:
- 通過InputFormat接口獲得InputSplit的實作,然後對輸入的資料切分。每一個Split分塊對應一個Mapper任務。
- 通過RecordReader對象讀取生成<k,v>鍵值對。Map函數接受資料并處理後輸出<k1,v1>鍵值對。
- 通過context.collect方法寫入context對象中。當鍵值對集中被收集後,會被Partition類中的partition()函數以指定方式區分并寫入輸出緩沖區(系統預設的是HashPartitioner),同時調用sort()進行排序。
- 如果使用者指定了Combiner,則會将鍵值對進行combine合并(相當于map端的reduce),輸出到reduce寫入檔案。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPn5EeBpnTysmaNBDOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL1ITM5EjNzIjM5IDMxgTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
Reducer
-
Reducer将與一個key關聯的一組中間數值集歸約為一個更小的數值集。
1.Shuffle階段。架構通過HTTP協定為每個Reducer獲得所有Mapper輸出中與之相關的分塊,這一階段也稱混洗階段,所做的大量操作就是資料複制,是以也可以稱為資料複制階段。
2.Sort階段。 架構按照key的值對Reducer的輸入進行分組(因為不同的Mapper輸出可能會有相同的key)。 Shuffle和Sort是同時進行的,Map的輸出也是一邊被取回一邊被合并的。 如果需要改變分組方式,則需要指定一個Compartor,實作二次排序(後面會介紹)。
3.Reduce階段。 調用Reduce()函數,對Shuffle和sort得到的<key,(list of values)>進行處理,輸出結果到DFS中。
結構圖示
輸入/輸出格式(常用)
- InputFormat
- 檢查作業輸入的有效性。
- 把輸入檔案切分成多個邏輯InputSplit執行個體,并把每個執行個體分發給一個Mapper(一對一);FileSplit是預設的InputSplit,通過write(DataOutput out)和readFields(DataInput in)兩種方法進行序列化和反序列化。
- 提供RecordReader實作。
- OutputFormat
- 檢驗作業的輸出。
- 驗證輸出結果類型是否如在Config中所配置的。
- 提供一個RecordWriter的實作,用來輸出作業結果。
核心問題
Map和Reduce數量
-
Map數量通常由 Hadoop叢集的DFS塊大小确定 ,也就是輸入檔案的總塊數。大緻是每一個Node是10~100個。
Reduce的數量有3種情況:0(特殊),1,多個。
- 單個Reduce:
【Hadoop】MapReduce并行計算架構基本知識計算流程與機制輸入/輸出格式(常用)核心問題 - 多個Reduce
【Hadoop】MapReduce并行計算架構基本知識計算流程與機制輸入/輸出格式(常用)核心問題 - 數量為0(适應于不需要歸約和處理的作業)
【Hadoop】MapReduce并行計算架構基本知識計算流程與機制輸入/輸出格式(常用)核心問題
作業配置
- 作業配置的相關設定方法
作業配置方法 | 功能說明 |
---|---|
setNumReduceTasks | 設定reduce數目 |
setNumMapTasks | 設定Map數目 |
setInputFormatClass | 設定輸入檔案格式類 |
setOutputFormatClass | 設定輸出檔案格式類 |
setMapperClass | 輸出Map類 |
setCombiner | 設定Combiner類 |
setReducerClass | 設定Reduce類 |
setPartitionerClass | 設定Partitioner類 |
setMapOutputKeyClass | 設定Map輸出的Key類 |
setMapOutputValueClass | 設定Map輸出的Value類 |
setOutputKeyClass | 設定輸出key類 |
setCompressMapOutput | 設定Map輸出是否壓縮 |
setOutputValueClass | 設定輸出value類 |
setJobName | 設定作業名字 |
setSpeculativeExecution | 設定是否開啟預防性執行 |
setMapSpeculativeExecution | 設定是否開啟Map任務的預防性執行 |
setReduceSpeculativeExecution | 設定是否開啟Reduce任務的預防性執行 |
作業排程
- 排程的功能是将各種類型的作業在排程算法作用下配置設定給Hadoop叢集中的計算節點,進而達到 分布式和并行計算 的目的。
- 排程算法子產品中至少涉及兩個重要流程:1.作業的選擇 2.任務的配置設定。
排程過程 :
-
1)MapReduce架構中作業通常是通過JobClient.runJob(job)方法送出到JobTracker,JobTracker接收到JobClient的請求後将其加入作業排程隊列中。
2)然後JobTracker一直等待JobClient通過RPC向其送出作業,而TaskTracker則一直通過RPC向JobTracker發送心跳信号詢問是否有任務可執行,有則請求JobTracker派發任務給它執行。
3)如果JobTracker的作業隊列不為空,則TaskTracker發送的心跳将會獲得JobTracker向它派發的任務。
這是一個主動請求的任務:slave的TaskTracker主動向master的JobTracker請求任務。
4)當TaskTracker接到任務後,通過自身排程在本slave建立起Task,執行任務。
常用排程器 主要包括:JobQueueTaskScheduler(FIFO排程器),CapacityScheduler(容量排程器),Fair Scheduler(公平排程器)等。
有用的MapReduce特性
- Counters 計數器
- DistributedCache 分布式緩存
- Tool 工具
- Compression 資料壓縮
(後面會做介紹)