天天看點

MapReduce架構

一.MapReduce 的思想核心是 分 而治之 , 充分利用了并行處理的優勢。

  1. Mapper map()方法是對輸入的一個KV對調用一次!!
  2. Reduce Reduce()方法是對相同K的一組KV對調用執行一次
  3. Drive

二. MapReduce 原理分析

  1.  MapTask運作機制詳解:
MapReduce架構

1. 首先,讀取資料元件 InputFormat (預設 TextInputFormat )會通過 getSplits 方法對輸入目錄中檔案進行邏輯切片規劃得到splits ,有多少個 split 就對應啟動多少個 MapTask 。 split 與 block 的對應關系預設是一對一。 2. 将輸入檔案切分為 splits 之後,由 RecordReader 對象(預設 LineRecordReader )進行讀取,以 \n 作為分隔符,讀取一行資料,傳回<key , value> 。 Key 表示每行首字元偏移值, value 表示這一行 文本内容。 3. 讀取 split 傳回 <key,value> ,進入使用者自己繼承的 Mapper 類中,執行使用者重寫的 map 函數,RecordReader讀取一行這裡調用一次。 4. map 邏輯完之後,将 map 的每條結果通過 context.write 進行 collect 資料收集。在 collect 中,會先對其進行分區處理,預設使用HashPartitioner 。 MapReduce 提供 Partitioner 接口,它的作用就是根據 key 或 value 及 reduce 的數量來決定目前的這對 輸出資料最終應該交由哪個 reduce task 處理。預設對 key hash 後再以 reduce task 數量取模。預設的 取模方式隻是為了平均 reduce 的處理能力,如果使用者自己對 Partitioner 有需求,可以訂制并設定到 job 上。 5. 接下來,會将資料寫入記憶體,記憶體中這片區域叫做環形緩沖區,緩沖區的作用是批量收集 map 結果,減少磁盤IO 的影響。我們的 key/value 對以及 Partition 的結果都會被寫入緩沖區。當然寫入之前,key 與 value 值都會被序列化成位元組數組。     環形緩沖區其實是一個數組,數組中存放着 key 、 value 的序列化資料和 key 、 value 的中繼資料資訊,包括partition 、 key 的起始位置、 value 的起始位置以及 value 的長度。環形結構是一個抽象概念。      緩沖區是有大小限制,預設是100MB 。當 map task 的輸出結果很多時,就可能會撐爆記憶體,是以需要在一定條件下将緩沖區中的資料臨時寫入磁盤,然後重新利用這塊緩沖區。這個從記憶體往磁盤寫資料的過程被稱為Spill ,中文可譯為溢寫。這個溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不應該阻止 map 的結果輸出,是以整個緩沖區有個溢寫的比例spill.percent。這個比例預設是 0.8 ,也就是當緩沖區的資料已經達到門檻值( buffer size * spillpercent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這 80MB 的記憶體,執行溢寫過程。 Maptask的輸出結果還可以往剩下的 20MB 記憶體中寫,互不影響。 6 、當溢寫線程啟動後,需要對這 80MB 空間内的 key 做排序 (Sort) 。排序是 MapReduce 模型預設的行為 !      如果 job 設定過 Combiner ,那麼現在就是使用 Combiner 的時候了。将有相同 key 的 key/value 對的value加起來,減少溢寫到磁盤的資料量。 Combiner 會優化 MapReduce 的中間結果,是以它在整個模型中會多次使用。      那哪些場景才能使用 Combiner 呢?從這裡分析, Combiner 的輸出是 Reducer 的輸入, Combiner絕不能改變最終的計算結果。Combiner 隻應該用于那種 Reduce 的輸入 key/value 與輸出 key/value類型完全一緻,且不影響最終結果的場景。比如累加,最大值等。Combiner 的使用一定得慎重,如果用好,它對job 執行效率有幫助,反之會影響 reduce 的最終結果。 7. 合并溢寫檔案:每次溢寫會在磁盤上生成一個臨時檔案(寫之前判斷是否有 combiner ),如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁盤上相應的就會有多個臨時檔案存在。當整個資料處理結束之後開始對磁盤中的臨時檔案進行merge 合并,因為最終的檔案隻有一個,寫入磁盤,并且為這個檔案提供了一個索引檔案,以記錄每個reduce 對應資料的偏移量。 -----------------------------------------------------------------------------------------------------------------------------------------------------------------

maptask的并行度(數量)

注意:預設spitsize == blocksize ==128m  這樣的好處是為了所謂的資料本地化或hdfs的短路讀取,減少一些沒必要的網絡資源。

MapReduce架構
MapReduce架構

MapTask 并行度是不是越多越好呢?(源碼中的split_slop=1.1) 答案不是,如果一個檔案僅僅比 128M 大一點點也被當成一個 split 來對待,而不是多個 split. MR 架構在并行運算的同時也會消耗更多資源,并行度越高資源消耗也越高,假設 129M 檔案分為兩個分片,一個是128M ,一個是 1M ;對于1M 的切片的 Maptask 來說,太浪費資源。  

   2.ReduceTask 工作機制:

MapReduce架構

Reduce 大緻分為 copy 、 sort 、 reduce 三個階段,重點在前兩個階段。 copy 階段包含一個eventFetcher來擷取已完成的 map 清單,由 Fetcher 線程去 copy 資料,在此過程中會啟動兩個 merge 線程,分别為inMemoryMerger 和 onDiskMerger ,分别将記憶體中資料 merge 到磁盤和将磁盤中的資料 進行merge 。待資料 copy 完成之後, copy 階段就完成了,開始進行 sort 階段, sort 階段主要是執行 finalMerge操作,純粹的 sort 階段,完成之後就是 reduce 階段,調用使用者定義的 reduce 函數進行處理。 詳細步驟

  1. Copy階段,簡單地拉取資料。Reduce程序啟動一些資料copy線程(Fetcher),通過HTTP方式請求 maptask擷取屬于自己的檔案。
  2. Merge階段。這裡的merge如map端的merge動作,隻是數組中存放的是不同map端copy來的數 值。Copy過來的資料會先放入記憶體緩沖區中,這裡的緩沖區大小要比map端的更為靈活。merge 有三種形式:記憶體到記憶體;記憶體到磁盤;磁盤到磁盤。預設情況下第一種形式不啟用。當記憶體中的 資料量到達一定門檻值,就啟動記憶體到磁盤的merge。與map 端類似,這也是溢寫的過程,這個過 程中如果你設定有Combiner,也是會啟用的,然後在磁盤中生成了衆多的溢寫檔案。第二種 merge方式一直在運作,直到沒有map端的資料時才結束,然後啟動第三種磁盤到磁盤的merge 方式生成最終的檔案。
  3. 合并排序。把分散的資料合并成一個大的資料後,還會再對合并後的資料排序。對排序後的鍵值對調用reduce方法,鍵相等的鍵值對調用一次reduce方法,每次調用會産生零個或者多個鍵值對,最後把這些輸出的鍵值對寫入到HDFS檔案中。

ReduceTask并行度

MapTask 的并發數由切片數決定,ReduceTask 數量的決定是可以直接手動設定: 注意事項 1. ReduceTask=0 ,表示沒有 Reduce 階段,輸出檔案數和 MapTask 數量保持一緻; 2. ReduceTask 數量不設定預設就是一個,輸出檔案數量為 1 個; 3. 如果資料分布不均勻,可能在 Reduce 階段産生傾斜;   三.shuffle機制  

MapReduce架構

map輸入到reduce這個階段的過程稱為shuffle;

job對象進入後會有分區:

MapReduce架構

自定義分區小結:

分區是在map方法輸出後在緩沖區内做的,他的參數就是map的輸出的參數。

1. 自定義分區器時最好保證分區數量與 reduceTask 數量保持一緻; 2. 如果分區數量不止 1 個,但是 reduceTask 數量 1 個,此時隻會輸出一個檔案。 3. 如果 reduceTask 數量大于分區數量,但是輸出多個空檔案 4. 如果 reduceTask 數量小于分區數量,有可能會報錯 combiner合并小結

MapReduce架構

1. Combiner 是 MR 程式中 Mapper 和 Reducer 之外的一種元件 2. Combiner 元件的父類就是 Reducer 3. Combiner 和 reducer 的差別在于運作的位置 4. Combiner 是在每一個 maptask 所在的節點運作 ; 5. Combiner 的意義就是對每一個 maptask 的輸出進行局部彙總,以減小 網絡傳輸量 。 6. Combiner 能夠應用的前提是不能影響最終的業務邏輯,此外, Combiner 的輸出 kv 應該跟 reducer 的輸入kv 類型要對應起來。

排序

MapTask       它會将處理的結果暫時放到環形緩沖區中,當環形緩沖區使用率達到一定門檻值後,再對緩沖區中的資料進行一次快速排序,并将這些有序資料溢寫到磁盤上,溢寫完畢後,它會對磁盤上所有檔案進行歸并排序。       ReduceTask 當所有資料拷貝完畢後, ReduceTask 統 - 對記憶體和磁盤上的所有資料進行一次歸并排序。 1. 部分排序 . MapReduce 根據輸入記錄的鍵對資料集排序。保證輸出的每個 檔案内部有序 。 2. 全排序 最終輸出結果隻有 一個檔案 ,且檔案内部有序。實作方式是隻設定 - - 個 ReduceTask 。但該方法在處理大型檔案時效率極低,因為- - 台機器處理所有檔案,完全喪失了 MapReduce 所提供的并行架構。 3. 輔助排序 : ( GroupingComparator 分組 ) 在 Reduce 端對 key 進行分組。應用于 : 在接收的 key 為 bean 對象時,想讓一個或幾個字段相同 ( 全部字段比較不相同) 的 key 進入到同一個 reduce 方法時,可以采用分組排序。 4. 二次排序 . 在自定義排序過程中,如果 compareTo 中的判斷條件為兩個即為二次排序。   GroupingComparator 是reduce端的元件:       GroupingComparator 是mapreduce當中reduce端的一個功能元件,主要的作用是決定哪些資料作為一組,調用一次reduce的邏輯,預設是每個不同的key,作為多個不同的組,每個組調用一次reduce邏輯,我們可以自定義GroupingComparator實作不同的key作為同一個組,調用一次reduce邏輯。   MapReduce 讀取和輸出資料:    InputFormat:InputFormat 是 MapReduce 架構用來讀取資料的類。    分類:

  1. TextInputFormat (普通文本檔案,MR架構預設的讀取實作類型)
  2. KeyValueTextInputFormat(讀取一行文本資料按照指定分隔符,把資料封裝為kv類型)
  3. NLineInputF ormat(讀取資料按照行數進行劃分分片)
  4. CombineTextInputFormat(合并小檔案,避免啟動過多MapTask任務)
  5. 自定義InputFormat

CombineTextInputFormat:MR架構預設的TextInputFormat切片機制按檔案劃分切片,檔案無論多小,都是單獨一個切片, 然後由一個MapTask處理,如果有大量小檔案,就對應的會生成并啟動大量的 MapTask,而每個 MapTask處理的資料量很小大量時間浪費在初始化資源啟動收回等階段,這種方式導緻資源利用 率不高。

CombineTextInputForma t 用于小檔案過多的場景,它可以将 多個小檔案從邏輯上劃分成一個切 片 ,這樣多個小檔案就可以交給一個 MapTask 處理,提高資源使用率。 使用代碼:             // 如果不設定 InputFormat ,它預設用的是 TextInputFormat.class             job . setInputFormatClass ( CombineTextInputFormat . class );             // 虛拟存儲切片最大值設定 4m             CombineTextInputFormat . setMaxInputSplitSize ( job , 4194304 );  

CombineTextInputFormat的原理:切片生成過程分為兩部分:虛拟存儲過程和切片過程

假設設定 setMaxInputSplitSize 值為 4M 四個小檔案: 1.txt -->2M ;2.txt-->7M;3.txt-->0.3M;4.txt--->8.2M 虛拟存儲過程:把輸入目錄下所有檔案大小,依次和設定的 setMaxInputSplitSize 值進行比較,如果不大于設定的最大值,邏輯上劃分一個塊 。如果輸入檔案大于設定的最大值且大于兩倍,那麼以最大值切割一塊;當剩餘資料大小超過設定的最大值且不大于最大值2 倍,此将檔案均分成2 個虛拟存儲塊(防止出現太小切片)。 比如如setMaxInputSplitSize 值為 4M ,輸入檔案大小為 8.02M ,則先邏輯上分出一個 4M 的塊。剩餘的大小為4.02M ,如果按照 4M 邏輯劃分,就會出現 0.02M 的非常小的虛拟存儲檔案,是以将剩餘的4.02M 檔案切分成( 2.01M 和 2.01M )兩個檔案。 1.txt-->2M;2M<4M; 一個塊; 2.txt-->7M;7M>4M, 但是不大于兩倍,均勻分成兩塊;兩塊:每塊 3.5M ; 3.txt-->0.3M;0.3<4M ,0.3M<4M , 一個塊 4.txt-->8.2M; 大于最大值且大于兩倍;一個 4M 的塊,剩餘 4.2M 分成兩塊,每塊 2.1M 所有塊資訊: 2M , 3.5M , 3.5M , 0.3M , 4M , 2.1M , 2.1M 共 7 個虛拟存儲塊。 切片過程 判斷虛拟存儲的檔案大小是否大于 setMaxInputSplitSize 值,大于等于則單獨形成一個 切片。 如果不大于則跟下一個虛拟存儲檔案進行合并,共同形成一個切片。 按照之前輸入檔案:有 4 個小檔案大小分别為 2M 、 7M 、 0.3M 以及 8.2M 這四個小檔案, 則虛拟存儲之後形成 7 個檔案塊,大小分别為: 2M , 3.5M , 3.5M , 0.3M , 4M , 2.1M , 2.1M 最終會形成 3 個切片,大小分别為: ( 2+3.5 ) M ,( 3.5+0.3+4 ) M ,( 2.1+2.1 ) M 注意:虛拟存儲切片最大值設定最好根據實際的小檔案大小情況來設定具體的值。 自定義inputformat:     方案: 将多個小檔案合并成一個 SequenceFile 檔案( SequenceFile 檔案是 Hadoop 用來存儲二進制形式的 key-value 對的檔案格式), SequenceFile 裡面存儲着多個檔案,存儲的形式為檔案路徑 + 名稱為key,檔案内容為 value 。     outputformat: OutputFormat: 是 MapReduce 輸出資料的基類,所有 MapReduce 的資料輸出都實作了 OutputFormat抽象類。下面我們介紹幾種常見的OutputFormat 子類 TextOutputFormat      預設的輸出格式是 TextOutputFormat ,它把每條記錄寫為文本行。它的鍵和值可以是任意類型,因為TextOutputFormat 調用 toString() 方 法把它們轉換為字元串。 SequenceFileOutputFormat       将 SequenceFileOutputFormat 輸出作為後續 MapReduce 任務的輸入,這是一種好的輸出格式,因為它的格式緊湊,很容易被壓縮。

四 資料壓縮機制:

MapReduce架構
MapReduce架構
MapReduce架構

需要壓縮的地方:

Map輸入端壓縮  Map輸出端壓縮   Reduce端輸出壓縮   壓縮配置: 設定 map 階段壓縮 Configuration configuration = new Configuration(); configuration.set("mapreduce.map.output.compress","true"); configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec"); 設定 reduce 階段的壓縮 configuration.set("mapreduce.output.fileoutputformat.compress","true"); configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD" ); configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.ap ache.hadoop.io.compress.SnappyCodec");   配置檔案壓縮: <property> <name>mapreduce.output.fileoutputformat.compress</name> <value>true</value> </property> <property> <name>mapreduce.output.fileoutputformat.compress.type</name> <value>RECORD</value> </property> <property> <name>mapreduce.output.fileoutputformat.compress.codec</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> </property>  

繼續閱讀