mapreduce優化總結
2016-03-03 23:21
Mandylover
閱讀(7357)
評論(0)
編輯
收藏
舉報
叢集的優化
1、合理配置設定map和reduce任務的數量(單個節點上map任務、reduce任務的最大數量)
2、其他配置
io.file.buffer.size
hadoop通路檔案的IO操作都需要通過代碼庫。是以,在很多情況下,io.file.buffer.size都被用來設定緩存的大小不論是對硬碟或者是網絡操作來講,較大的緩存都可以提供更高的資料傳輸,但這也就意味着更大的記憶體消耗和延遲
這個參數要設定為系統頁面大小的倍數,以byte為機關,預設值是4KB,一般情況下,可以設定為64KB(65536byte)
dfs.balance.bandwidthPerSec
HDFS平衡器檢測叢集中使用過度或者使用不足的DataNode,并在這些DataNode之間移動資料塊來保證負載均衡,如果不對平衡操作進行帶寬限制,那麼它會很快就會搶占所有的網絡資源,不會為Mapreduce作業或者資料輸入預留資源
參數dfs.balance.bandwidthPerSec定義了每個DataNode平衡操作所允許的最大使用帶寬,這個值的機關是byte,這是很不直覺的,因為網絡帶寬一般都是用bit來描述的。是以,在設定的時候,要先計算好。DataNode使用這個參數來控制網絡帶寬的使用,但不幸的是,這個參數在守護程序啟動的時候就讀入,導緻管理者沒辦法在平衡運作時來修改這個值
dfs.block.size 預設128M
dfs.block.size的機關是byte,預設值是67108864 (64MB)。對于很多情況來說,134217728 (128MB)更加合适
對于一個Mapreduce作業(尤其是用子類FileInputFormat定義輸入格式的作業),對檔案的每個資料塊會啟用一個map任務來處理
這就意味這資料塊的大小顯著地影響Mapreduce作業的效率
dfs.datanode.du.reserved 保留白間 用于mapreduce使用 預設為0
當DataNode向NameNode彙報可用的硬碟大小的時候,它會把所有dfs.data.dir所列出的可用的硬碟大小總和發給NameNode
由于mapred.local.dir經常會跟DataNode共享可用的硬碟資源,因為我們需要為Mapreduce任務保留一些硬碟資源。dfs.datanode.du.reserved
定義了每個dfs.data.dir所定義的硬碟空間需要保留的大小,以byte為機關。預設情況下,該值為0.也就意味着HDFS可以使用每個資料硬碟的所有空間
節點硬碟資源耗盡時就會進入讀模式。是以,建議每個硬碟都為map任務保留最少10GB的空間,如果每個Mapreduce作業都會産生大量的中間結果
或者每個硬碟空間都比較大(超過2TB),那麼建議相應的增大保留的硬碟空間
dfs.namenode.handler.count
NameNode有一個工作線程池用來處理用戶端的遠端過程調用及叢集守護程序的調用。處理程式數量越多意味着要更大的池來處理來自不同DataNode的并發心跳以及用戶端并發的中繼資料操作。對于大叢集或者有大量用戶端的叢集來說,通常需要增大參數dfs.namenode.handler.count的預設值10。設定該值的一般原則是将其設定為叢集大小的自然對數乘以20,即20logN,N為叢集大小
如果該值設的太小,明顯的狀況就是DataNode在連接配接NameNode的時候總是逾時或者連接配接被拒絕,但NameNode的遠端過程調用隊列很大時遠端過程調用延時就會加大。症狀之間是互相影響的,很難說修改dfs.namenode.handler.count就能解決問題,但是在查找故障時,檢查一下該值的設定是必要的
dfs.datanode.failed.volumes.tolerated
當DataNode的任何一個本地磁盤出故障時,它的預設行為認定整個DataNode失效。在一個中到大型的叢集中,硬碟故障是相當常見的,是以這種行為不是最優的。一個 DataNode的丢失會導緻一些資料塊備份數下降,是以,NameNode會指令其他DataNode複制這些丢失的資料塊來增加被附屬。參數dfs.datanode.failed.volumes.tolerated 定義整個DataNode聲明失敗前允許多少個硬碟出現故障
很多人會問,為什麼不能容忍所有磁盤失效的情況,這樣就可以把整個DataNode的失效推遲到沒有任何可工作的硬碟為止
對于一個永久的時間視窗來說,這看上去很合理的,但是實際上,對于是以的磁盤來說,管理者對于磁盤故障的處理都會早于由于正常磨損而出現的故障。隻有一種情況例外,是以的硬碟在一個極短的時間内全部出現故障,這種異常情況需要立即調查。在實踐中快速的磁盤故障通常意味着驅動控制器或者某些部件故障。正因為罕見,但如果磁盤在短時間内開始出現一連串故障,最好的辦法就是立即隔離。先把整個叢集設定為不可用,直到找到失敗的原因為止。參數dfs.datanode.failed.volumes.tolerated預設值為0,也就意味着隻要有一個磁盤出現故障就會導緻整個DataNode不可用,管理者可以增大該值來保證在出現部分磁盤故障時,DataNode仍能持續運作,但是需要保持謹慎的是,在極短的時間範圍内出現一個或者兩個磁盤故障表明一個更大的問題存在
fs.trash.interval
使用者經常會意外删除檔案。HDFS支援資源回收筒功能,這類似于大多數作業系統的資源回收筒,當這個功能被啟用檔案被移到使用者的HDFS主目錄中一個名為.Trash目錄中,來保留被删除的檔案,而不是立即徹底删除。fs.trash.interval定義.Trash目錄下檔案被永久删除前保留的時間。在檔案被從HDFS永久删除前,使用者可以自由地把檔案從該目錄下移出來并立即還原。預設值是0說明垃圾資源回收筒功能是關閉的。要清楚,資源回收筒功能不是萬能的,推遲删除意味着要檔案所占據的空間仍不可用,除非它被永久删除。使用者可以通過運作hadoop fs -expunge指令。或者幹脆等待指定的時間來明确資源回收筒清空。可以在hadoop fs -rm指令通過指定-skipTrash參數來跳過資源回收筒進而立即删除檔案
3、map的優化
io.sort.mb 預設為100M
map任務産生中間資料預設會先放到記憶體中 當大小達到上面設定的值會進行spill根據需要調整改參數 适當增大 減少spill的次數
io.sort.spill.percent 一般不做調整
spill時占比 預設為0.8
io.sort.factor 預設為10
在map任務計算完成後會有一個merge操作 預設為10 即最多10個spill檔案合并,如果spill大于10個會分多次merge,合理的設定該參數可以減少merge次數
min.num.spill.for.combine(default 3)
當job中設定了combiner,并且spill數最少有3個的時候,那麼combiner函數就會在merge産生結果檔案之前運作
如果spill過多的時候 合适的設定該值 可以減少spill前的資料,減少合并時的資料
mapred.compress.map.output(default false)
也就是說map的中間,無論是spill的時候,還是最後merge産生的結果檔案,都是可以壓縮的
壓縮的好處在于,通過壓縮減少寫入讀出磁盤的資料量,中間結果非常大,磁盤速度成為map執行瓶頸的job,尤其有用
該參數采用的是cpu換io(解壓縮) ,一般來說都會是磁盤io産生瓶頸 ,是以通常可以設定為true
mapred.map.output.compression.codec( default org.apache.hadoop.io.compress.DefaultCodec)
采用map中間結果壓縮的情況下,使用者還可以選擇壓縮時采用哪種壓縮格式進行壓縮
現在hadoop支援的壓縮格式有:GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式
通常來說,想要達到比較平衡的cpu和磁盤壓縮比,LzoCodec比較适合。但也要取決于job的具體情況
4、reduce的優化
reduce 分成三個階段的 分别為copy->sort->reduce
由于job的每一個map都會根據reduce(n)數将資料分成map 輸出結果分成n個partition,是以map的中間結果中是有可能包含每一個reduce需要處理的部分資料的,是以,為了優化reduce的執行時間,hadoop中是等job的第一個map結束後,所有的reduce就開始嘗試從完成的map中下載下傳該reduce對應的partition部分資料,這個過程就是通常所說的shuffle,也就是copy過程
mapred.reduce.parallel.copies(default 5) 每個reduce并行下載下傳map結果的最大線程數
每個隻會有5個并行的下載下傳線程在從map下資料,如果一個時間段内job完成的map有100個或者更多,那麼reduce也最多隻能同時下載下傳5個map的資料
是以這個參數比較适合map很多并且完成的比較快的job的情況下調大,有利于reduce更快的擷取屬于自己部分的資料
mapred.reduce.copy.backoff(default 300秒) reduce下載下傳線程最大等待時間(秒)
reduce的每一個下載下傳線程在下載下傳某個map資料的時候,有可能因為那個map中間結果所在機器發生錯誤或者中間結果的檔案丢失,或者網絡瞬斷等等情況,這樣reduce的下載下傳就有可能失敗,是以reduce的下載下傳線程并不會無休止的等待下去,當一定時間後下載下傳仍然失敗,那麼下載下傳線程就會放棄這次下載下傳,并在随後嘗試從另外的地方下載下傳,如果叢集環境的網絡本身是瓶頸,那麼使用者可以通過調大這個參數來避免reduce下載下傳線程被誤判為失敗的情況。不過在網絡環境比較好的情況下,沒有必要調整。通常來說專業的叢集網絡不應該有太大問題,是以這個參數需要調整的情況不多
io.sort.factor
Reduce将map結果下載下傳到本地時,同樣也是需要進行merge的,是以io.sort.factor的配置選項同樣會影響reduce
進行merge時的行為,當發現reduce在shuffle階段iowait非常的高的時候,就有可能通過調大這個參數來加大一次merge時
的并發吞吐,優化reduce效率
mapred.job.shuffle.input.buffer.percent(default 0.7) 用來緩存shuffle資料的reduce task heap百分比
Reduce在shuffle階段對下載下傳來的map資料,并不是立刻就寫入磁盤的,而是會先緩存在記憶體中,然後當使用記憶體達到一定量的時候才刷入磁盤,這個記憶體大小的控制就不像map一樣可以通過io.sort.mb來設定了,而是通過另外一個參數來設定,也就是說,如這個參數其實是一個百分比,意思是說,shuffile在reduce記憶體中的資料最多使用記憶體量為:0.7 × maxHeap of reduce task果該reduce task的最大heap使用量(通常通過mapred.child.java.opts來設定,比如設定為-Xmx1024m)的一定比例用來緩存資料,如果reduce的heap由于業務原因調整的比較大,相應的緩存大小也會變大,這也是為什麼reduce用來做緩存的參數是一個百分比,而不是一個固定的值了
mapred.job.shuffle.merge.percent(default 0.66) 緩存的記憶體中多少百分比後開始做spill操作
如果下載下傳速度很快,很容易就把記憶體緩存撐大,那麼調整一下這個參數有可能會對reduce的性能有所幫助
mapred.job.reduce.input.buffer.percent(default 0.0) sort完成後reduce計算階段用來緩解資料的百分比
當reduce将所有的map上對應自己partition的資料下載下傳完成後,就會開始真正的reduce計算階段(中間有個sort階段通常時間非常短,幾秒鐘就完成了,因為整個下載下傳階段就已經是邊下載下傳邊sort,然後邊merge的)當reduce task真正進入reduce函數的計算階段的時候,有一個參數也是可以調整reduce的計算行為,由于reduce計算時肯定也是需要消耗記憶體的,而在讀取reduce需要的資料時,同樣是需要記憶體作為buffer,這個參數是控制,需要多少的記憶體百分比來作為reduce讀已經sort好的資料的buffer百分比,預設情況下為0也就是說,預設情況下,reduce是全部從磁盤開始讀處理資料,如果這個參數大于0,那麼就會有一定量的資料,被緩存在記憶體并輸送給reduce,當reduce計算邏輯消耗記憶體很小時,可以分一部分記憶體用來緩存資料,反正reduce的記憶體閑着也是閑着
mapred.reduce.slowstart.completed.maps(map完成多少百分比時,開始shuffle)
當map運作慢,reduce運作很快時,如果不設定mapred.reduce.slowstart.completed.maps會使job的shuffle時間變的很長,map運作完很早就開始了reduce,導緻reduce的slot一直處于被占用狀态,這個值是和“運作完的map數除以總map數”做判斷的,當後者大于等于設定的值時,開始reduce的shuffle。是以當map比reduce的執行時間多很多時,可以調整這個值(0.75,0.80,0.85及以上)
下面從map流程裡描述一下map中各個參數的作用: 沒有reduce
1、當map task開始運算,并産生中間資料時,其産生的中間結果并非直接就簡單的寫入磁盤。這中間的過程比較複雜,并且利用到了
記憶體buffer來進行已經産生的部分結果的緩存,并在記憶體buffer中進行一些預排序來優化整個map的性能。每一個map都會對應存
在一個記憶體buffer(MapOutputBuffer),map會将已經産生的部分結果先寫入到該buffer中,這個buffer預設是100MB大小,但
是這個大小是可以根據job送出時的參數設定來調整的,該參數即為:io.sort.mb。當map的産生資料非常大時,并且把io.sort.mb
調大,那麼map在整個計算過程中spill的次數就勢必會降低,map task對磁盤的操作就會變少,如果map tasks的瓶頸在磁盤上,
這樣調整就會大大提高map的計算性能
2、map在運作過程中,不停的向該buffer中寫入已有的計算結果,但是該buffer并不一定能将全部的map輸出緩存下來,當map輸出
超出一定門檻值(比如100M),那麼map就必須将該buffer中的資料寫入到磁盤中去,這個過程在mapreduce中叫做spill。map并
不是要等到将該buffer全部寫滿時才進行spill,因為如果全部寫滿了再去寫spill,勢必會造成map的計算部分等待buffer釋放空間的
情況。是以,map其實是當buffer被寫滿到一定程度(比如80%)時,就開始進行spill。這個門檻值也是由一個job的配置參數來控
制,即io.sort.spill.percent,預設為0.80或80%。這個參數同樣也是影響spill頻繁程度,進而影響map task運作周期對磁盤的讀寫
頻率的。但非特殊情況下,通常不需要人為的調整。調整io.sort.mb對使用者來說更加友善。
3、當map task的計算部分全部完成後,如果map有輸出,就會生成一個或者多個spill檔案,這些檔案就是map的輸出結果。map在正
常退出之前,需要将這些spill合并(merge)成一個,是以map在結束之前還有一個merge的過程。merge的過程中,有一個參數
可以調整這個過程的行為,該參數為:io.sort.factor。該參數預設為10。它表示當merge spill檔案時,最多能有多少并行的stream
向merge檔案中寫入。比如如果map産生的資料非常的大,産生的spill檔案大于10,而io.sort.factor使用的是預設的10,那麼當
map計算完成做merge時,就沒有辦法一次将所有的spill檔案merge成一個,而是會分多次,每次最多10個stream。這也就是說,
當map的中間結果非常大,調大io.sort.factor,有利于減少merge次數,進而減少map對磁盤的讀寫頻率,有可能達到優化作業的
目的
4、當job指定了combiner的時候,我們都知道map介紹後會在map端根據combiner定義的函數将map結果進行合并。運作combiner
函數的時機有可能會是merge完成之前,或者之後,這個時機可以由一個參數控制,即min.num.spill.for.combine(default 3),
當job中設定了combiner,并且spill數最少有3個的時候,那麼combiner函數就會在merge産生結果檔案之前運作。通過這樣的方
式,就可以在spill非常多需要merge,并且很多資料需要做conbine的時候,減少寫入到磁盤檔案的資料數量,同樣是為了減少對磁
盤的讀寫頻率,有可能達到優化作業的目的
5、減少中間結果讀寫進出磁盤的方法不止這些,還有就是壓縮。也就是說map的中間,無論是spill的時候,還是最後merge産生的結
果檔案,都是可以壓縮的。壓縮的好處在于,通過壓縮減少寫入讀出磁盤的資料量。對中間結果非常大,磁盤速度成為map執行瓶
頸的job,尤其有用。控制map中間結果是否使用壓縮的參數為:mapred.compress.map.output(true/false)。将這個參數設定為
true時,那麼map在寫中間結果時,就會将資料壓縮後再寫入磁盤,讀結果時也會采用先解壓後讀取資料。這樣做的後果就是:寫
入磁盤的中間結果資料量會變少,但是cpu會消耗一些用來壓縮和解壓。是以這種方式通常适合job中間結果非常大,瓶頸不在
cpu,而是在磁盤的讀寫的情況。說的直白一些就是用cpu換IO。根據觀察,通常大部分的作業cpu都不是瓶頸,除非運算邏輯異常
複雜。是以對中間結果采用壓縮通常來說是有收益的
6、當采用map中間結果壓縮的情況下,使用者還可以選擇壓縮時采用哪種壓縮格式進行壓縮,現在hadoop支援的壓縮格式有:
GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式。通常來說,想要達到比較平衡的cpu和磁盤壓縮比,LzoCodec
比較适合。但也要取決于job的具體情況。使用者若想要自行選擇中間結果的壓縮算法,可以設定配置參數:
mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec或者其他使用者自行選擇的壓縮方式。
- 分類 Hadoop
