一、MapReduce簡介
- MapReduce:2004年 Google paper中提出。
- 一個用于分布式資料處理的程式設計模型和運作環境。适合處理各種結構化和非結構化的資料。
- HDFS(hadoop 分布式檔案系統)是MapReduce的基礎。
- 分布式系統的設計原則
moving computation is more cheaper than moving data。(現場辦公)
- Map
示例:我們要數圖書館中的所有的書,你數1号書架,我數2号書架。這就是“Map”。我們人越多,數書就更快。
- Reduce
現在我們到一起,把各自統計的數報給管理者,管理者把所有人的統計數加在一起,這就是“Reduce”。
- Hadoop生态系統架構
- MapReduce 部署視圖
MR是和HDFS一樣的master/slave模式的星型結構,MR的Nodemanage必須和HDFS的datanode部署在同一節點上。
- Map Reduce運作視圖
MR的幾個重要對象:
(1)、MR client:和使用者互動,和MR叢集互動。
(2)、Application:MR任務,Map算法、Reduce算法。
(3)、App config:MR任務相關配置參數。
(1)、Resourcemanage:
Resource deamon,整個MR叢集的資源管理,主要是CPU、記憶體、網絡。
Applications deamon,整個叢集中MR任務(application)的排程監控,但隻排程app master。
(1)、Nodemanage:對應HDFS的data node,負責本節點計算資源的監控,和Resourcemanage保持心跳。
(2)、Application master:一個map reduce任務内部的管理,task的啟動,監控。
(3)、task:一個任務(app)被分解成若幹task來執行具體的map reduce算法。分為map task和reduce task兩類。通常,一個task運作在一個jvm内。
二、Map Reduce原理
Map、Shuffle和Reduce每一步都有key-value對作為輸入和輸出。
示例:處理一批有關天氣的資料,計算每年的最高溫度。
資料其格式如下:
1、按照ASCII碼存儲,每一行一條記錄
2、每一行字元從0開始計數,第15個到第18個字元為年
3、第25個到第29個字元為溫度,其中第25位是符号+/-
Map執行過程:
1、輸入資料拆分(一般按block拆分)
2、map算法運作
3、map結果輸出到本地硬碟
Map的輸入(key,value)如下:
第一個map:
(0,0067011990999991950051507+0000+)
(1,0043011990999991950051518-0011+)
第二個map:
(2,0043012650999991949032412+0111+)
(3,0043012650999991949032418+0078+)
(4,0067011990999991937051507+0001+)
第三個map:
(5,0043011990999991937051512-0002+)
(6,0043011990999991945051518+0001+)
由map任務進行計算,map過程中:通過對每一行字元串的解析,得到年-溫度的鍵值對(key-value)作為輸出:
Map輸出的(key,value)如下:
(1950,0)
(1950,-11)
(1949,111)
(1949,78)
(1937,1)
(1937,-2)
(1945,1)
Map的輸出就是Shuffle階段的輸入,shuffle階段分排序、合并兩個過程
1、排序(按年份,年份相同的随機排)
2、合并+分區(分區:與reduce對應)
Shuffle的輸出就是reduce階段的輸入,reduce會根據shuffle輸出的結果資料進行分區,reduce根據上述例子中的資料分為兩個區,分别給兩個reduce任務。
(1950,<0,-11>)
(1949,<111,78>)
(1945,1)
(1937,<1,-2>)
在reduce過程中,在清單中選擇出最大的溫度,将年-最大溫度的(key,value)作為輸出,輸出至HDFS的兩個檔案中:
(1950,0)
(1949,111)
(1945,1)
(1937,1)
Map Reduce ,就是<key,value> 轉換過程
Map task 運作分析
1、從輸入的split中讀取記錄(K1,V1)
2、執行使用者編寫的map算法,生成(K2,V2)
3、(K2、V2)輸出到本地磁盤
(1)、首先輸出到記憶體
(2)、通過sort和merge生成(K2,list<V2>)
(3)、輸出到本地硬碟(多個spill檔案)
4、spill檔案繼續sort和merge,生成map結果檔案及索引檔案
5、map結束
(1)、清理spill檔案
(2)、告訴Application master,map結束。
注意:
(1)、Without reduce task?? Output format write to HDFS
(2)、Map result file clear?? Job finished
為什麼搞這麼複雜?
目的:在盡量減少磁盤IO情況下,對map task的輸出進行partition 和sort ------這是Reduce的要求。
限制:記憶體buffer不能無限大。
優化:
1、map執行結果先輸出為split檔案,然後merge
2、用map輸出資料的key進行排序
3、combiner(local reduce),減少reduce shu資料量
4、map輸出結果檔案壓縮
reduce task 運作分析
很多階段都可以并行進行
1、向Application master 查詢結束的map task。
2、從結束的map task 節點上下載下傳屬于自己的資料(from map result file)下載下傳到本地檔案系統。(K2,list<V2>)格式。
3、(K2,list<V2>)格式資料的sort 和merge,一邊從多個map task 節點上下載下傳資料,一邊對這些資料進行sort 和merge,直道從所有map task 節點下載下傳完資料。
4、使用者reduce 算法執行,生成(K3,V3)格式資料。
5、reduce 執行結果寫入HDFS,(K3,V3)格式。
6、通知Application master ,reduce task結束。
reduce task 任務和map task 任務一樣複雜
目的:提高效率,在盡量減少磁盤IO情況下,對多個map task 的輸出進行merge和sort ------這個是reduce 的要求。
限制:記憶體buffer不能無限大。
處理小檔案問題:
1、小檔案問題:
(1)、meta-data太多,占namenode記憶體。
(2)、map task任務太多。
2、處理方法:
(1)、task JVM reuse(資源重用)
(2)、存檔檔案(隻能解決分布式檔案系統存儲小檔案問題,不能解決MapReduce處理小檔案問題)
HAR files: Hadoop Archives
(3)、分布式檔案系統提供了一種MapReduce處理小檔案的方法,MapReduce提供了一個名為CombineFileInputFormat的接口解決MapReduce處理小檔案的方法。CombineFileInputFormat接口允許一個MapReduce任務處理多個小檔案。
(4)、hadoop還提供了一種方法是 SequenceFile,SequenceFile可以存儲多個小檔案,像HAR文檔一樣。同時SequenceFile類型的檔案在通過MapReduce處理的時候,一個SequenceFile檔案可以啟動一個MapReduce任務進行處理。可避免因小檔案過多而啟動多個MapReduce任務問題。