天天看點

hadoop job調優

1 Map side tuning參數

1.1 MapTask運作内部原理

hadoop job調優

當map task開始運算,并産生中間資料時,其産生的中間結果并非直接就簡單的寫入磁盤。這中間的過程比較複雜,并且利用到了記憶體buffer來進行已經産生的部分結果的緩存,并在記憶體buffer中進行一些預排序來優化整個map的性能。如上圖所示,每一個map都會對應存在一個記憶體buffer(MapOutputBuffer,即上圖的buffer in memory),map會将已經産生的部分結果先寫入到該buffer中,這個buffer預設是100MB大小,但是這個大小是可以根據job送出時的參數設定來調整的,該參數即為:io.sort.mb。當map的産生資料非常大時,并且把io.sort.mb調大,那麼map在整個計算過程中spill的次數就勢必會降低,map task對磁盤的操作就會變少,如果map tasks的瓶頸在磁盤上,這樣調整就會大大提高map的計算性能。map做sort和spill的記憶體結構如下如所示:

hadoop job調優

map在運作過程中,不停的向該buffer中寫入已有的計算結果,但是該buffer并不一定能将全部的map輸出緩存下來,當map輸出超出一定門檻值(比如100M),那麼map就必須将該buffer中的資料寫入到磁盤中去,這個過程在mapreduce中叫做spill。map并不是要等到将該buffer全部寫滿時才進行spill,因為如果全部寫滿了再去寫spill,勢必會造成map的計算部分等待buffer釋放空間的情況。是以,map其實是當buffer被寫滿到一定程度(比如80%)時,就開始進行spill。這個門檻值也是由一個job的配置參數來控制,即io.sort.spill.percent,預設為0.80或80%。這個參數同樣也是影響spill頻繁程度,進而影響map task運作周期對磁盤的讀寫頻率的。但非特殊情況下,通常不需要人為的調整。調整io.sort.mb對使用者來說更加友善。

當map task的計算部分全部完成後,如果map有輸出,就會生成一個或者多個spill檔案,這些檔案就是map的輸出結果。map在正常退出之前,需要将這些spill合并(merge)成一個,是以map在結束之前還有一個merge的過程。merge的過程中,有一個參數可以調整這個過程的行為,該參數為:io.sort.factor。該參數預設為10。它表示當merge spill檔案時,最多能有多少并行的stream向merge檔案中寫入。比如如果map産生的資料非常的大,産生的spill檔案大于10,而io.sort.factor使用的是預設的10,那麼當map計算完成做merge時,就沒有辦法一次将所有的spill檔案merge成一個,而是會分多次,每次最多10個stream。這也就是說,當map的中間結果非常大,調大io.sort.factor,有利于減少merge次數,進而減少map對磁盤的讀寫頻率,有可能達到優化作業的目的。

當job指定了combiner的時候,我們都知道map介紹後會在map端根據combiner定義的函數将map結果進行合并。運作combiner函數的時機有可能會是merge完成之前,或者之後,這個時機可以由一個參數控制,即min.num.spill.for.combine(default 3),當job中設定了combiner,并且spill數最少有3個的時候,那麼combiner函數就會在merge産生結果檔案之前運作。通過這樣的方式,就可以在spill非常多需要merge,并且很多資料需要做conbine的時候,減少寫入到磁盤檔案的資料數量,同樣是為了減少對磁盤的讀寫頻率,有可能達到優化作業的目的。

減少中間結果讀寫進出磁盤的方法不止這些,還有就是壓縮。也就是說map的中間,無論是spill的時候,還是最後merge産生的結果檔案,都是可以壓縮的。壓縮的好處在于,通過壓縮減少寫入讀出磁盤的資料量。對中間結果非常大,磁盤速度成為map執行瓶頸的job,尤其有用。控制map中間結果是否使用壓縮的參數為:mapred.compress.map.output(true/false)。将這個參數設定為true時,那麼map在寫中間結果時,就會将資料壓縮後再寫入磁盤,讀結果時也會采用先解壓後讀取資料。這樣做的後果就是:寫入磁盤的中間結果資料量會變少,但是cpu會消耗一些用來壓縮和解壓。是以這種方式通常适合job中間結果非常大,瓶頸不在cpu,而是在磁盤的讀寫的情況。說的直白一些就是用cpu換IO。根據觀察,通常大部分的作業cpu都不是瓶頸,除非運算邏輯異常複雜。是以對中間結果采用壓縮通常來說是有收益的。以下是一個wordcount中間結果采用壓縮和不采用壓縮産生的map中間結果本地磁盤讀寫的資料量對比:

map中間結果不壓縮:

hadoop job調優

map中間結果壓縮:

hadoop job調優

可以看出,同樣的job,同樣的資料,在采用壓縮的情況下,map中間結果能縮小将近10倍,如果map的瓶頸在磁盤,那麼job的性能提升将會非常可觀。

當采用map中間結果壓縮的情況下,使用者還可以選擇壓縮時采用哪種壓縮格式進行壓縮,現在hadoop支援的壓縮格式有:GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式。通常來說,想要達到比較平衡的cpu和磁盤壓縮比,LzoCodec比較适合。但也要取決于job的具體情況。使用者若想要自行選擇中間結果的壓縮算法,可以設定配置參數:mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec或者其他使用者自行選擇的壓縮方式。

1.2 Map side相關參數調優

hadoop job調優

2 Reduce side tuning參數

2.1 ReduceTask運作内部原理

hadoop job調優

reduce的運作是分成三個階段的。分别為copy->sort->reduce。由于job的每一個map都會根據reduce(n)數将資料分成map 輸出結果分成n個partition,是以map的中間結果中是有可能包含每一個reduce需要處理的部分資料的。是以,為了優化reduce的執行時間,hadoop中是等job的第一個map結束後,所有的reduce就開始嘗試從完成的map中下載下傳該reduce對應的partition部分資料。這個過程就是通常所說的shuffle,也就是copy過程。

Reduce task在做shuffle時,實際上就是從不同的已經完成的map上去下載下傳屬于自己這個reduce的部分資料,由于map通常有許多個,是以對一個reduce來說,下載下傳也可以是并行的從多個map下載下傳,這個并行度是可以調整的,調整參數為:mapred.reduce.parallel.copies(default 5)。預設情況下,每個隻會有5個并行的下載下傳線程在從map下資料,如果一個時間段内job完成的map有100個或者更多,那麼reduce也最多隻能同時下載下傳5個map的資料,是以這個參數比較适合map很多并且完成的比較快的job的情況下調大,有利于reduce更快的擷取屬于自己部分的資料。

reduce的每一個下載下傳線程在下載下傳某個map資料的時候,有可能因為那個map中間結果所在機器發生錯誤,或者中間結果的檔案丢失,或者網絡瞬斷等等情況,這樣reduce的下載下傳就有可能失敗,是以reduce的下載下傳線程并不會無休止的等待下去,當一定時間後下載下傳仍然失敗,那麼下載下傳線程就會放棄這次下載下傳,并在随後嘗試從另外的地方下載下傳(因為這段時間map可能重跑)。是以reduce下載下傳線程的這個最大的下載下傳時間段是可以調整的,調整參數為:mapred.reduce.copy.backoff(default 300秒)。如果叢集環境的網絡本身是瓶頸,那麼使用者可以通過調大這個參數來避免reduce下載下傳線程被誤判為失敗的情況。不過在網絡環境比較好的情況下,沒有必要調整。通常來說專業的叢集網絡不應該有太大問題,是以這個參數需要調整的情況不多。

Reduce将map結果下載下傳到本地時,同樣也是需要進行merge的,是以io.sort.factor的配置選項同樣會影響reduce進行merge時的行為,該參數的詳細介紹上文已經提到,當發現reduce在shuffle階段iowait非常的高的時候,就有可能通過調大這個參數來加大一次merge時的并發吞吐,優化reduce效率。

Reduce在shuffle階段對下載下傳來的map資料,并不是立刻就寫入磁盤的,而是會先緩存在記憶體中,然後當使用記憶體達到一定量的時候才刷入磁盤。這個記憶體大小的控制就不像map一樣可以通過io.sort.mb來設定了,而是通過另外一個參數來設定:mapred.job.shuffle.input.buffer.percent(default 0.7),這個參數其實是一個百分比,意思是說,shuffile在reduce記憶體中的資料最多使用記憶體量為:0.7 × maxHeap of reduce task。也就是說,如果該reduce task的最大heap使用量(通常通過mapred.child.java.opts來設定,比如設定為-Xmx1024m)的一定比例用來緩存資料。預設情況下,reduce會使用其heapsize的70%來在記憶體中緩存資料。如果reduce的heap由于業務原因調整的比較大,相應的緩存大小也會變大,這也是為什麼reduce用來做緩存的參數是一個百分比,而不是一個固定的值了。

假設mapred.job.shuffle.input.buffer.percent為0.7,reduce task的max heapsize為1G,那麼用來做下載下傳資料緩存的記憶體就為大概700MB左右,這700M的記憶體,跟map端一樣,也不是要等到全部寫滿才會往磁盤刷的,而是當這700M中被使用到了一定的限度(通常是一個百分比),就會開始往磁盤刷。這個限度門檻值也是可以通過job參數來設定的,設定參數為:mapred.job.shuffle.merge.percent(default 0.66)。如果下載下傳速度很快,很容易就把記憶體緩存撐大,那麼調整一下這個參數有可能會對reduce的性能有所幫助。

當reduce将所有的map上對應自己partition的資料下載下傳完成後,就會開始真正的reduce計算階段(中間有個sort階段通常時間非常短,幾秒鐘就完成了,因為整個下載下傳階段就已經是邊下載下傳邊sort,然後邊merge的)。當reduce task真正進入reduce函數的計算階段的時候,有一個參數也是可以調整reduce的計算行為。也就是:mapred.job.reduce.input.buffer.percent(default 0.0)。由于reduce計算時肯定也是需要消耗記憶體的,而在讀取reduce需要的資料時,同樣是需要記憶體作為buffer,這個參數是控制,需要多少的記憶體百分比來作為reduce讀已經sort好的資料的buffer百分比。預設情況下為0,也就是說,預設情況下,reduce是全部從磁盤開始讀處理資料。如果這個參數大于0,那麼就會有一定量的資料被緩存在記憶體并輸送給reduce,當reduce計算邏輯消耗記憶體很小時,可以分一部分記憶體用來緩存資料,反正reduce的記憶體閑着也是閑着。

2.2 Reduce side相關參數調優

hadoop job調優

本文轉載自:http://www.tbdata.org/archives/1470

此文章是淘寶的資料團隊部落格,下面再說些其他方面的優化

hadoop優化相關: 1:對作業系統進行參數調優 (1):打開檔案描述符和網絡連接配接參數上限(具體操作内容:使用ulimit指令講允許同時打開的檔案描述符資料上限增大至一個合适的值,同時調整核心參數net.core.somaxconn) (2):關閉swap分區(具體操作内容是/etc/stsctl.conf中得vm.vm.swappiness參數) (3):設定合理的預讀取緩沖區大小(具體操作内容:使用linux指令blockdev設定預讀取緩沖區的大小) (4):檔案系統的選擇和配置 2:JVM參數的優化 3:通過hadoop的參數進行調優 (1):設定合理的槽位數目(具體配置 mapred.tasktracker.map.tasks.maximum | mapred.tasktracker.reduce.tasks.maximum |          mapreduce.tasktracker.map.tasks.maximum | mapreduce.tasktracker.reduce.tasks.maximum) (2):調整心跳間隔,對于300台以下的叢集 可以把心跳設定成300毫秒(預設是3秒),mapreduce.jobtracker.hearbeat.interval.min | mapred.hearbeats.in.second | mapreduce.jobtracker.heartbeats.scaling.factor (3):啟用外心跳,為了減少任務配置設定延遲(比如我們的任務心跳設定為10秒鐘,當有一個任務挂掉了之後,他就不能馬上通知jobtracker), 是以hadoop引入了外心跳,外心跳是任務運作結束或者任務運作失敗的時候觸發的,能夠在出現空閑資源時第一時間通知jobtracker,以便他能夠迅速為空閑資源配置設定新的任務 外心跳的配置參數是 mapreduce.tasktracker.outofband.hearbeat      (4):磁盤快的配置. map task會把中間結果放到本地磁盤中,是以對于I/O密集的任務來說這部分資料會對本地磁盤造成很大的壓力,我們可以配置多塊可用磁盤,hadoop将采用輪訓的方式将不同的maptask的中間結果寫到磁盤上                               maptask中間結果的配置參數是mapred.local.dir | mapreduce.cluster.local.dir      (5):配置RPC Handler的數量,jobracker需要冰法處理來自各個tasktracker的RPC請求,我們可以根據叢集規模和伺服器并發處理的情況調整RPC Handler的數目,以使jobtracker的服務能力最佳       配置參數是 mapred.job.tracker.handler.count | mapreduce.jobtracker.handler.count   (預設是10)      (6):配置HTTP線程數.   在shuffle階段,reduce task 通過http請求從各個tasktracker上讀取map task中間結果,而每個tasktracker通過jetty server處理這些http請求,是以可以适當配置調整jetty server的工作線程數       配置參數是 tasktracker.http.thread | mapreduce.tasktracker.http.threads   (預設是40)      (7):如果我們在運作作業的過程中發現某些機器被頻繁地添加到黑名單裡面,我們可以把此功能關閉      (8):使用合理排程器      (9):使用合适的壓縮算法,在hadoop裡面支援的壓縮格式是: gzip,zip,bzip2,LZO,Snappy,LZO和Snappy的呀搜比和壓縮效率都很優秀,Snappy是谷歌的開源資料壓縮哭,他已經内置在hadoop1.0之後的版本,LZO得自己去編譯      (10):開啟預讀機制. 預讀機制可以有效提高磁盤I/O的讀性能,目前标準版的apache hadoop不支援此功能,但是在cdh中是支援的       配置參數是: mapred.tasktracker.shuffle.fadvise=true (是否啟用shuffle預讀取機制)                      mapred.tasktracker.shuffle.readahead.bytes=4MB (shuffle預讀取緩沖區大小)                      mapreduce.ifile.readahead = true (是否啟用ifile預讀取機制)                      mapreduce.ifile.readahead.bytes = 4MB (IFile預讀取緩沖區大小)      (11):啟用推測執行機制      (12):map task調優: 合理調整io.sort.record.percent值,可減少中間檔案資料,提高任務執行效率.       (map task的輸出結果将被暫時存放到一個環形緩沖區中,這個緩沖區的大小由參數"io.sort.mb"指定,機關MB,預設是100MB,       該緩沖區主要由兩部分組成,索引和實際資料,預設情況下,索引占整個buffer的比例為io.sort.record.percent,預設是5%,       剩餘空間存放資料,僅當滿足以下任意一個條件時才會觸發一次flush,生成一個臨時檔案,索引或者資料空間使用率達到比例為       io.sort.spill.percent的80%)         是以具體調優參數如下:   io.sort.mb | io.sort.record.percent | io.sort.spill.percent                       (13):reduce task調優    reduce task會啟動多個拷貝線程從每個map task上讀取相應的中間結果,參數是"mapred.reduce.parallel.copies"(預設是5)       原理是這樣的-->對于每個待拷貝的檔案,如果檔案小于一定的閥值A,則将其放入到記憶體中,否則已檔案的形式存放到磁盤上,       如果記憶體中檔案滿足一定條件D,則會将這些資料寫入磁盤中,而當磁盤上檔案數目達到io.sort.factor(預設是10)時,       是以如果中間結果非常大,可以适當地調節這個參數的值         (14):跳過壞記錄 看具體參數說明,=号後面是預設值       mapred.skip.attempts.to.start.skipping=2 當任務失敗次數達到該值時,才會進入到skip mode,即啟用跳過壞記錄gongnneg       mapred.skip.map.max,skip.records=0 使用者可通過該參數設定最多運作跳過的記錄數目       mapred.skip.reduce.max.skip.groups=0 使用者可通過設定該參數設定Reduce Task最多允許跳過的記錄數目       mapred.skip.out.dir =${mapred.output.dir}/logs/ 檢測出得壞記錄存放到目錄裡面(一般為HDFS路徑),hadoop将壞記錄儲存起來以便于使用者調試和跟蹤                 (14):使用JVM重用 : mapred.job.reuse.jvm.aum.tasks | mapreduce.job.jvm.num.tasks = -1                          4:從使用者角度來優化          (1)設定combiner.   在應用中盡量使用combiner可以有效地提高效率          (2)選擇合适的writable           (3)設定合理的reduce數          (4)合理使用DistributedCache(建議如果需要一個外部檔案引入的時候,事先把他上傳到hdfs上,這樣效率高,因為這樣節省了用戶端上傳檔案的時間,并且還隐含地告訴DistributedCache,             請将檔案下載下傳到各節點的public共享目錄下)      (5)合理控制Reduce Task的啟動時機 ,因為在執行job的時候,reduce task晚于map task啟動,是以合理控制reduce task啟動時機不僅可以加快作業的運作速度       還可以提高資源使用率,如果reduce task啟動過早,則可能由于reduce task長時間占用reduce slot資源造成slot hoarding現象,而且還會降低資源使用率       反之則導緻reduce task擷取資源延遲,增加了作業的運作時間.hadoop配置reduce task啟動時機的參數是       mapred.reduce.slowstart.completed.maps | mapreduce.job.reduce.slowstart.completed.maps   (預設值是0.05,也就是map task完成數目達到5%時,開始啟動reduce task)

[Hive]從一個經典案例看優化mapred.map.tasks的重要性

我所在公司所使用的生産Hive環境的幾個參數配置如下:

dfs.block.size=268435456

hive.merge.mapredfiles=true

hive.merge.mapfiles=true

hive.merge.size.per.task=256000000

mapred.map.tasks=2 

因為合并小檔案預設為true,而dfs.block.size與hive.merge.size.per.task的搭配使得合并後的絕大部分檔案都在300MB左右。

CASE 1:

現在我們假設有3個300MB大小的檔案,那麼goalsize = min(900MB/2,256MB) = 256MB (具體如何計算map數請參見http://blog.sina.com.cn/s/blog_6ff05a2c010178qd.html)

是以整個JOB會有6個map,其中3個map分别處理256MB的資料,還有3個map分别處理44MB的資料。

這時候木桶效應就來了,整個JOB的map階段的執行時間不是看最短的1個map的執行時間,而是看最長的1個map的執行時間。是以,雖然有3個map分别隻處理44MB的資料,可以很快跑完,但它們還是要等待另外3個處理256MB的map。顯然,處理256MB的3個map拖了整個JOB的後腿。

CASE 2:

如果我們把mapred.map.tasks設定成6,再來看一下有什麼變化:

goalsize = min(900MB/6,256MB) = 150MB

整個JOB同樣會配置設定6個map來處理,每個map處理150MB的資料,非常均勻,誰都不會拖後腿,最合理地配置設定了資源,執行時間大約為CASE 1的59%(150/256) 

案例分析:

雖然mapred.map.tasks從2調整到了6,但是CASE 2并沒有比CASE 1多用map資源,同樣都是使用6個map。而CASE 2的執行時間約為CASE 1執行時間的59%。

從這個案例可以看出,對mapred.map.tasks進行自動化的優化設定其實是可以很明顯地提高作業執行效率的。

繼續閱讀