天天看點

MapReduce過程詳解及其性能優化

https://www.cnblogs.com/felixzh/p/8604188.html

MapReduce過程詳解及其性能優化

1. Map階段

1.1 從HDFS讀取資料

1.1.1 讀取資料産生多少個Mapper

Mapper資料過大的話,會産生大量的小檔案,由于Mapper是基于虛拟機的,過多的Mapper建立和初始化及關閉虛拟機都會消耗大量的硬體資源;Mapper數太小,并發度過小,Job執行時間過長,無法充分利用分布式硬體資源;

1.1.2 Mapper數量由什麼決定

  • 輸入檔案數目
  • 輸入檔案的大小
  • 配置參數

這三個因素決定的Mapper數量.涉及參數:

mapreduce.input.fileinputformat.split.minsize #啟動map最小的split size大小,預設0
mapreduce.input.fileinputformat.split.maxsize #啟動map最大的split size大小,預設256M
dfs.block.size                                #block塊大小,預設128M      

計算公式:

splitSize =  Math.max(minSize, Math.min(maxSize, blockSize));      

例1: 一個檔案800M,Block大小是128M,那麼Mapper數目就是7個。6個Mapper處理的資料是128M,1個Mapper處理的資料是32M;

例2:一個目錄下有三個檔案大小分别為:5M   10M 150M 這個時候其實會産生四個Mapper處理的資料分别是5M,10M,128M,22M。

Mapper是基于檔案自動産生的,如果想要自己控制Mapper的個數?

就如上面例2,5M,10M的資料很快處理完了,128M要很長時間;這個就需要通過參數的控制來調節Mapper的個數。減少Mapper的個數的話,就要合并小檔案,這種小檔案有可能是直接來自于資料源的小檔案,也可能是Reduce産生的小檔案。

設定合并器:(set都是在hive腳本,也可以配置Hadoop)

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;  --hive file merge input format which could controls the merge of small files
set mapreduce.input.fileinputformat.split.maxsize=268435456;         --大于這個值會切分,256MB
set mapreduce.input.fileinputformat.split.minsize=134217728;         --(default:0),小于這個值會合并
set mapreduce.input.fileinputformat.split.minsize.per.node=134217728;
set mapreduce.input.fileinputformat.split.minsize.per.rack=134217728;
--Merge small files at the end of a map-only job. When enabled, a map-only job is created to merge the files in the destination table/partitions
set hive.merge.mapFiles=true; 
--Merge small files at the end of a map-reduce job. When enabled, a map-only job is created to merge the files in the destination table/partitions
set hive.merge.mapredFiles=true;
--The desired file size after merging. This should be larger than hive.merge.smallfiles.avgsize
set hive.merge.size.per.task=268435456;         --每個Mapper要處理的資料,就把上面的5M  10M……合并成為一個
set mapred.max.split.size=268435456;            --mapred切分的大小
set mapred.min.split.size.per.node=134217728;   --低于128M就算小檔案,資料在一個節點會合并,在多個不同的節點會把資料抓過來進行合并。
--When the average output file size of a job is less than the value of this property, 
--Hive will start an additional map-only job to merge the output files into bigger files. 
--This is only done for map-only jobs if hive.merge.mapfiles is true, 
--for map-reduce jobs if hive.merge.mapredfiles is true, and for Spark jobs if hive.merge.sparkfiles is true
set hive.merge.smallfiles.avgsize=167772160;
--Size per reducer. If the input size is 10GiB and this is set to 1GiB, Hive will use 10 reducers
set hive.exec.reducers.bytes.per.reducer= 107374182400;      

1.2 處理資料

1.2.1 Partition說明

對于map輸出的每一個鍵值對,系統都會給定一個partition,partition值預設是通過計算key的hash值後對Reduce task的數量取模獲得。如果一個鍵值對的partition值為1,意味着這個鍵值對會交給第一個Reducer處理。

自定義partitioner的情況:

  •  我們知道每一個Reduce的輸出都是有序的,但是将所有Reduce的輸出合并到一起卻并非是全局有序的,如果要做到全局有序,我們該怎麼做呢?最簡單的方式,隻設定一個Reduce task,但是這樣完全發揮不出叢集的優勢,而且能應對的資料量也很受限。最佳的方式是自己定義一個Partitioner,用輸入資料的最大值除以系統Reduce task數量的商作為分割邊界,也就是說分割資料的邊界為此商的1倍、2倍至numPartitions-1倍,這樣就能保證執行partition後的資料是整體有序的。
  •  解決資料傾斜:另一種需要我們自己定義一個Partitioner的情況是各個Reduce task處理的鍵值對數量極不平衡。對于某些資料集,由于很多不同的key的hash值都一樣,導緻這些鍵值對都被分給同一個Reducer處理,而其他的Reducer處理的鍵值對很少,進而拖延整個任務的進度。當然,編寫自己的Partitioner必須要保證具有相同key值的鍵值對分發到同一個Reducer。
  • 自定義的Key包含了好幾個字段,比如自定義key是一個對象,包括type1,type2,type3,隻需要根據type1去分發資料,其他字段用作二次排序.

1.2.2 環形緩沖區

   Map的輸出結果是由collector處理的,每個Map任務不斷地将鍵值對輸出到在記憶體中構造的一個環形資料結構中。使用環形資料結構是為了更有效地使用記憶體空間,在記憶體中放置盡可能多的資料。

   這個資料結構其實就是個位元組數組,叫Kvbuffer,名如其義,但是這裡面不光放置了資料,還放置了一些索引資料,給放置索引資料的區域起了一個Kvmeta的别名,在Kvbuffer的一塊區域上穿了一個IntBuffer(位元組序采用的是平台自身的位元組序)的馬甲。資料區域和索引資料區域在Kvbuffer中是相鄰不重疊的兩個區域,用一個分界點來劃分兩者,分界點不是亘古不變的,而是每次Spill之後都會更新一次。初始的分界點是0,資料的存儲方向是向上增長,索引資料的存儲方向是向下增長Kvbuffer的存放指針bufindex是一直悶着頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之後,bufindex增長為4,一個Int型的value寫完之後,bufindex增長為8。

    索引是對在kvbuffer中的鍵值對的索引,是個四元組,包括:value的起始位置、key的起始位置、partition值、value的長度,占用四個Int長度,Kvmeta的存放指針Kvindex每次都是向下跳四個“格子”,然後再向上一個格子一個格子地填充四元組的資料。比如Kvindex初始位置是-4,當第一個鍵值對寫完之後,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長度,然後Kvindex跳到-8位置,等第二個鍵值對和索引寫完之後,Kvindex跳到-12位置。

1.3 寫資料到磁盤

    Mapper中的Kvbuffer的大小預設100M,可以通過mapreduce.task.io.sort.mb(default:100)參數來調整。可以根據不同的硬體尤其是記憶體的大小來調整,調大的話,會減少磁盤spill的次數此時如果記憶體足夠的話,一般都會顯著提升性能。spill一般會在Buffer空間大小的80%開始進行spill(因為spill的時候還有可能别的線程在往裡寫資料,因為還預留白間,有可能有正在寫到Buffer中的資料),可以通過mapreduce.map.sort.spill.percent(default:0.80)進行調整,Map Task在計算的時候會不斷産生很多spill檔案,在Map Task結束前會對這些spill檔案進行合并,這個過程就是merge的過程。mapreduce.task.io.sort.factor(default:10),代表進行merge的時候最多能同時merge多少spill,如果有100個spill個檔案,此時就無法一次完成整個merge的過程,這個時候需要調大mapreduce.task.io.sort.factor(default:10)來減少merge的次數,進而減少磁盤的操作;

Spill這個重要的過程是由Spill線程承擔,Spill線程從Map任務接到“指令”之後就開始正式幹活,幹的活叫SortAndSpill,原來不僅僅是Spill,在Spill之前還有個頗具争議性的Sort。

    Combiner存在的時候,此時會根據Combiner定義的函數對map的結果進行合并,什麼時候進行Combiner操作呢???和Map在一個JVM中,是由min.num.spill.for.combine的參數決定的,預設是3,也就是說spill的檔案數在預設情況下由三個的時候就要進行combine操作,最終減少磁盤資料;

減少磁盤IO和網絡IO還可以進行:壓縮,對spill,merge檔案都可以進行壓縮。中間結果非常的大,IO成為瓶頸的時候壓縮就非常有用,可以通過mapreduce.map.output.compress(default:false)設定為true進行壓縮,資料會被壓縮寫入磁盤,讀資料讀的是壓縮資料需要解壓,在實際經驗中Hive在Hadoop的運作的瓶頸一般都是IO而不是CPU,壓縮一般可以10倍的減少IO操作,壓縮的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一種比較平衡選擇,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)參數設定。但這個過程會消耗CPU,适合IO瓶頸比較大。

2 Shuffle和Reduce階段

2.1 Copy

由于job的每一個map都會根據reduce(n)數将資料分成map 輸出結果分成n個partition,是以map的中間結果中是有可能包含每一個reduce需要處理的部分資料的。是以,為了優化reduce的執行時間,hadoop中是等job的第一個map結束後,所有的reduce就開始嘗試從完成的map中下載下傳該reduce對應的partition部分資料,是以map和reduce是交叉進行的,其實就是shuffle。Reduce任務通過HTTP向各個Map任務拖取(下載下傳)它所需要的資料(網絡傳輸),Reducer是如何知道要去哪些機器取資料呢?一旦map任務完成之後,就會通過正常心跳通知應用程式的Application Master。reduce的一個線程會周期性地向master詢問,直到提取完所有資料(如何知道提取完?)資料被reduce提走之後,map機器不會立刻删除資料,這是為了預防reduce任務失敗需要重做。是以map輸出資料是在整個作業完成之後才被删除掉的。

reduce程序啟動資料copy線程(Fetcher),通過HTTP方式請求maptask所在的TaskTracker擷取maptask的輸出檔案。由于map通常有許多個,是以對一個reduce來說,下載下傳也可以是并行的從多個map下載下傳,那到底同時到多少個Mapper下載下傳資料??這個并行度是可以通過mapreduce.reduce.shuffle.parallelcopies(default5)調整。預設情況下,每個Reducer隻會有5個map端并行的下載下傳線程在從map下資料,如果一個時間段内job完成的map有100個或者更多,那麼reduce也最多隻能同時下載下傳5個map的資料,是以這個參數比較适合map很多并且完成的比較快的job的情況下調大,有利于reduce更快的擷取屬于自己部分的資料。 在Reducer記憶體和網絡都比較好的情況下,可以調大該參數;

 reduce的每一個下載下傳線程在下載下傳某個map資料的時候,有可能因為那個map中間結果所在機器發生錯誤,或者中間結果的檔案丢失,或者網絡瞬斷等等情況,這樣reduce的下載下傳就有可能失敗,是以reduce的下載下傳線程并不會無休止的等待下去,當一定時間後下載下傳仍然失敗,那麼下載下傳線程就會放棄這次下載下傳,并在随後嘗試從另外的地方下載下傳(因為這段時間map可能重跑)。reduce下載下傳線程的這個最大的下載下傳時間段是可以通過mapreduce.reduce.shuffle.read.timeout(default180000秒)調整的。如果叢集環境的網絡本身是瓶頸,那麼使用者可以通過調大這個參數來避免reduce下載下傳線程被誤判為失敗的情況。一般情況下都會調大這個參數,這是企業級最佳實戰。

2.2 MergeSort

這裡的merge和map端的merge動作類似,隻是數組中存放的是不同map端copy來的數值。Copy過來的資料會先放入記憶體緩沖區中,然後當使用記憶體達到一定量的時候才spill磁盤。這裡的緩沖區大小要比map端的更為靈活,它基于JVM的heap size設定。這個記憶體大小的控制就不像map一樣可以通過io.sort.mb來設定了,而是通過另外一個參數 mapreduce.reduce.shuffle.input.buffer.percent(default 0.7f 源碼裡面寫死了) 來設定,這個參數其實是一個百分比,意思是說,shuffile在reduce記憶體中的資料最多使用記憶體量為:0.7 × maxHeap of reduce task。JVM的heapsize的70%。記憶體到磁盤merge的啟動門限可以通過mapreduce.reduce.shuffle.merge.percent(default0.66)配置。也就是說,如果該reduce task的最大heap使用量(通常通過mapreduce.admin.reduce.child.java.opts來設定,比如設定為-Xmx1024m)的一定比例用來緩存資料。預設情況下,reduce會使用其heapsize的70%來在記憶體中緩存資料。假設 mapreduce.reduce.shuffle.input.buffer.percent 為0.7,reducetask的max heapsize為1G,那麼用來做下載下傳資料緩存的記憶體就為大概700MB左右。這700M的記憶體,跟map端一樣,也不是要等到全部寫滿才會往磁盤刷的,而是當這700M中被使用到了一定的限度(通常是一個百分比),就會開始往磁盤刷(刷磁盤前會先做sortMerge)。這個限度門檻值也是可以通過參數 mapreduce.reduce.shuffle.merge.percent(default0.66)來設定。與map 端類似,這也是溢寫的過程,這個過程中如果你設定有Combiner,也是會啟用的,然後在磁盤中生成了衆多的溢寫檔案。這種merge方式一直在運作,直到沒有map端的資料時才結束,然後啟動磁盤到磁盤的merge方式生成最終的那個檔案。

    這裡需要強調的是,merge有三種形式:1)記憶體到記憶體(memToMemMerger)2)記憶體中Merge(inMemoryMerger)3)磁盤上的Merge(onDiskMerger)具體包括兩個:(一)Copy過程中磁盤合并(二)磁盤到磁盤。

    (1)記憶體到記憶體Merge(memToMemMerger)    Hadoop定義了一種MemToMem合并,這種合并将記憶體中的map輸出合并,然後再寫入記憶體。這種合并預設關閉,可以通過mapreduce.reduce.merge.memtomem.enabled(default:false)

打開,當map輸出檔案達到mapreduce.reduce.merge.memtomem.threshold時,觸發這種合并。

    (2)記憶體中Merge(inMemoryMerger):當緩沖中資料達到配置的門檻值時,這些資料在記憶體中被合并、寫入機器磁盤。門檻值有2種配置方式:

        配置記憶體比例:前面提到reduceJVM堆記憶體的一部分用于存放來自map任務的輸入,在這基礎之上配置一個開始合并資料的比例。假設用于存放map輸出的記憶體為500M,mapreduce.reduce.shuffle.merge.percent配置為0.66,則當記憶體中的資料達到330M的時候,會觸發合并寫入。

   配置map輸出數量: 通過mapreduce.reduce.merge.inmem.threshold配置。在合并的過程中,會對被合并的檔案做全局的排序。如果作業配置了Combiner,則會運作combine函數,減少寫入磁盤的資料量。

    (3)磁盤上的Merge(onDiskMerger):

            (3.1)Copy過程中磁盤Merge:在copy過來的資料不斷寫入磁盤的過程中,一個背景線程會把這些檔案合并為更大的、有序的檔案。如果map的輸出結果進行了壓縮,則在合并過程中,需要在記憶體中解壓後才能給進行合并。這裡的合并隻是為了減少最終合并的工作量,也就是在map輸出還在拷貝時,就開始進行一部分合并工作。合并的過程一樣會進行全局排序。

            (3.2)最終磁盤中Merge:當所有map輸出都拷貝完畢之後,所有資料被最後合并成一個整體有序的檔案,作為reduce任務的輸入。這個合并過程是一輪一輪進行的,最後一輪的合并結果直接推送給reduce作為輸入,節省了磁盤操作的一個來回。最後(是以map輸出都拷貝到reduce之後)進行合并的map輸出可能來自合并後寫入磁盤的檔案,也可能來及記憶體緩沖,在最後寫入記憶體的map輸出可能沒有達到門檻值觸發合并,是以還留在記憶體中。

   每一輪合并不一定合并平均數量的檔案數,指導原則是使用整個合并過程中寫入磁盤的資料量最小,為了達到這個目的,則需要最終的一輪合并中合并盡可能多的資料,因為最後一輪的資料直接作為reduce的輸入,無需寫入磁盤再讀出。是以我們讓最終的一輪合并的檔案數達到最大,即合并因子的值,通過mapreduce.task.io.sort.factor(default:10)來配置。

如上圖:Reduce階段中一個Reduce過程 可能的合并方式為:假設現在有20個map輸出檔案,合并因子配置為5,則需要4輪的合并。最終的一輪確定合并5個檔案,其中包括2個來自前2輪的合并結果,是以原始的20個中,再留出3個給最終一輪。

2.3 Reduce函數調用(使用者自定義業務邏輯)

    1、當reduce将所有的map上對應自己partition的資料下載下傳完成後,就會開始真正的reduce計算階段。reducetask真正進入reduce函數的計算階段,由于reduce計算時肯定也是需要消耗記憶體的,而在讀取reduce需要的資料時,同樣是需要記憶體作為buffer,這個參數是控制,reducer需要多少的記憶體百分比來作為reduce讀已經sort好的資料的buffer大小??預設用多大記憶體呢??預設情況下為0,也就是說,預設情況下,reduce是全部從磁盤開始讀處理資料。可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代碼MergeManagerImpl.java:674行)來設定reduce的緩存。如果這個參數大于0,那麼就會有一定量的資料被緩存在記憶體并輸送給reduce,當reduce計算邏輯消耗記憶體很小時,可以分一部分記憶體用來緩存資料,可以提升計算的速度。是以預設情況下都是從磁盤讀取資料,如果記憶體足夠大的話,務必設定該參數讓reduce直接從緩存讀資料,這樣做就有點Spark Cache的感覺;

    2、Reduce在這個階段,架構為已分組的輸入資料中的每個 <key, (list of values)>對調用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任務的輸出通常是通過調用 OutputCollector.collect(WritableComparable,Writable)寫入檔案系統的。Reducer的輸出是沒有排序的。

性能調優

    如果能夠根據情況對shuffle過程進行調優,對于提供MapReduce性能很有幫助。相關的參數配置列在後面的表格中。

一個通用的原則是給shuffle過程配置設定盡可能大的記憶體,當然你需要確定map和reduce有足夠的記憶體來運作業務邏輯。是以在實作Mapper和Reducer時,應該盡量減少記憶體的使用,例如避免在Map中不斷地疊加。

運作map和reduce任務的JVM,記憶體通過mapred.child.java.opts屬性來設定,盡可能設大記憶體。容器的記憶體大小通過mapreduce.map.memory.mb和mapreduce.reduce.memory.mb來設定,預設都是1024M。

map優化

    在map端,避免寫入多個spill檔案可能達到最好的性能,一個spill檔案是最好的。通過估計map的輸出大小,設定合理的mapreduce.task.io.sort.*屬性,使得spill檔案數量最小。例如盡可能調大mapreduce.task.io.sort.mb。

map端相關的屬性如下表:

reduce優化

    在reduce端,如果能夠讓所有資料都儲存在記憶體中,可以達到最佳的性能。通常情況下,記憶體都保留給reduce函數,但是如果reduce函數對記憶體需求不是很高,将mapreduce.reduce.merge.inmem.threshold(觸發合并的map輸出檔案數)設為0,mapreduce.reduce.input.buffer.percent(用于儲存map輸出檔案的堆記憶體比例)設為1.0,可以達到很好的性能提升。在2008年的TB級别資料排序性能測試中,Hadoop就是通過将reduce的中間資料都儲存在記憶體中勝利的。

reduce端相關屬性:

通用優化

Hadoop預設使用4KB作為緩沖,這個算是很小的,可以通過io.file.buffer.size來調高緩沖池大小。

MapReduce過程詳解及其性能優化