當Map 開始産生輸出時,它并不是簡單的把資料寫到磁盤,因為頻繁的磁盤操作會導緻性能嚴重下降。它的處理過程更複雜,資料首先是寫到記憶體中的一個緩沖區,并做了一些預排序,以提升效率。
每個Map 任務都有一個用來寫入輸出資料的循環記憶體緩沖區。這個緩沖區預設大小是100MB,可以通過io.sort.mb 屬性來設定具體大小。當緩沖區中的資料量達到一個特定閥值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 預設是0.80)時,系統将會啟動一個背景線程把緩沖區中的内容spill 到磁盤。在spill 過程中,Map 的輸出将會繼續寫入到緩沖區,但如果緩沖區已滿,Map 就會被阻塞直到spill 完成。spill 線程在把緩沖區的資料寫到磁盤前,會對它進行一個二次快速排序,首先根據資料所屬的partition 排序,然後每個partition 中再按Key 排序。輸出包括一個索引檔案和資料檔案。
如果設定了Combiner,将在排序輸出的基礎上運作。Combiner 就是一個Mini Reducer,它在執行Map 任務的節點本身運作,先對Map 的輸出做一次簡單Reduce,使得Map 的輸出更緊湊,更少的資料會被寫入磁盤和傳送到Reducer。
spill 檔案儲存在由mapred.local.dir指定的目錄中,Map 任務結束後删除。
每當記憶體中的資料達到spill 閥值的時候,都會産生一個新的spill 檔案,是以在Map任務寫完它的最後一個輸出記錄時,可能會有多個spill 檔案。在Map 任務完成前,所有的spill 檔案将會被歸并排序為一個索引檔案和資料檔案。這是一個多路歸并過程,最大歸并路數由io.sort.factor 控制(預設是10)。如果設定了Combiner,并且spill檔案的數量至少是3(由min.num.spills.for.combine 屬性控制),那麼Combiner 将在輸出檔案被寫入磁盤前運作以壓縮資料。
對寫入到磁盤的資料進行壓縮,通常是一個很好的方法,因為這樣做使得資料寫入磁盤的速度更快,節省磁盤空間,并減少需要傳送到Reducer 的資料量。預設輸出是不被壓縮的, 但可以很簡單的設定mapred.compress.map.output 為true 啟用該功能。壓縮所使用的庫由mapred.map.output.compression.codec 來設定。
當spill 檔案歸并完畢後,Map 将删除所有的臨時spill 檔案,并告知TaskTracker 任務已完成。Reducers 通過HTTP來擷取對應的資料。用來傳輸partitions 資料的工作線程數由tasktracker.http.threads 控制,這個設定是針對每一個TaskTracker 的,并不是單個Map,預設值為40,在運作大作業的大叢集上可以增大以提升資料傳輸速率。
2.1 copy階段
Map 的輸出檔案放置在運作Map 任務的TaskTracker 的本地磁盤上(注意:Map 輸出總是寫到本地磁盤,但Reduce 輸出不是,一般是寫到HDFS),它是運作Reduce 任務的TaskTracker 所需要的輸入資料。Reduce 任務的輸入資料分布在叢集内的多個Map 任務的輸出中,Map 任務可能會在不同的時間内完成,隻要完成的Map 任務數達到占總Map任務數一定比例(mapred.reduce.slowstart.completed.maps 預設0.05),Reduce 任務就開始拷貝它的輸出。
Reduce 任務擁有多個拷貝線程, 可以并行的擷取Map 輸出。可以通過設定mapred.reduce.parallel.copies 來改變線程數,預設是5。
如果Map 輸出足夠小,它們會被拷貝到Reduce TaskTracker 的記憶體中(緩沖區的大小
由mapred.job.shuffle.input.buffer.percent 控制,指定了用于此目的的堆記憶體的百分比);如果緩沖區空間不足,會被拷貝到磁盤上。當記憶體中的緩沖區用量達到一定比例閥值(由mapred.job.shuffle.merge.percent 控制),或者達到了Map 輸出的閥值大小(由mapred.inmem.merge.threshold 控制),緩沖區中的資料将會被歸并然後spill到磁盤。
拷貝來的資料疊加在磁盤上,有一個背景線程會将它們歸并為更大的排序檔案,這樣做節省了後期歸并的時間。對于經過壓縮的Map 輸出,系統會自動把它們解壓到記憶體友善對其執行歸并。
2.2 sort階段
當所有的Map 輸出都被拷貝後,Reduce 任務進入排序階段(更恰當的說應該是歸并階段,因為排序在Map 端就已經完成),這個階段會對所有的Map 輸出進行歸并排序,這個工作會重複多次才能完成。
假設這裡有50 個Map 輸出(可能有儲存在記憶體中的),并且歸并因子是10(由io.sort.factor 控制,就像Map 端的merge 一樣),那最終需要5 次歸并。每次歸并會把10個檔案歸并為一個,最終生成5 個中間檔案。
注:每趟合并的檔案數實際上比示例中展示的更微妙。目标是合并最小數量的檔案以便滿足最後一趟的合并系數。是以如果是40個檔案,我們不會在四趟中,每趟合并10個檔案進而得到4個檔案。相反,第一趟隻合并4個檔案,随後三趟合并所有十個檔案。在最後一趟中,4個已合并的檔案和餘下的6個(未合并的)檔案合計10個檔案。這并沒有改變合并的次數,它隻是一個優化措施,盡量減少寫到磁盤的資料量,因為最後一趟總是直接合并到reduce。
2.3 reduce階段
在Reduce 階段,Reduce 函數會作用在排序輸出的每一個key 上。這個階段的輸出被直接寫到輸出檔案系統,一般是HDFS。在HDFS 中,因為TaskTracker 節點也運作着一個DataNode 程序,是以第一個塊備份會直接寫到本地磁盤。
該配置調優方案主要是對以上Shuffle整個過程中涉及到的配置項按流程順序一一呈現并給以調優建議。
1. Map端
1) io.sort.mb
用于map輸出排序的記憶體緩沖區大小
類型:Int
預設:100mb
備注:如果能估算map輸出大小,就可以合理設定該值來盡可能減少溢出寫的次數,這對調優很有幫助。
2)io.sort.spill.percent
map輸出排序時的spill閥值(即使用比例達到該值時,将緩沖區中的内容spill 到磁盤)
類型:float
預設:0.80
3)io.sort.factor
歸并因子(歸并時的最多合并的流數),map、reduce階段都要用到
預設:10
備注:将此值增加到100是很常見的。
4)min.num.spills.for.combine
運作combiner所需的最少溢出寫檔案數(如果已指定combiner)
預設:3
5)mapred.compress.map.output
map輸出是否壓縮
類型:Boolean
預設:false
備注:如果map輸出的資料量非常大,那麼在寫入磁盤時壓縮資料往往是個很好的主意,因為這樣會讓寫磁盤的速度更快,節約磁盤空間,并且減少傳給reducer的資料量。
6)mapred.map.output.compression.codec
用于map輸出的壓縮編解碼器
類型:Classname
備注:推薦使用LZO壓縮。Intel内部測試表明,相比未壓縮,使用LZO壓縮的 TeraSort作業,運作時間減少60%,且明顯快于Zlib壓縮。
7) tasktracker.http.threads
每個tasktracker的工作線程數,用于将map輸出到reducer。
(注:這是叢集範圍的設定,不能由單個作業設定)
預設:40
備注:tasktracker開http服務的線程數。用于reduce拉取map輸出資料,大叢集可以将其設為40~50。
2. reduce端
1)mapred.reduce.slowstart.completed.maps
調用reduce之前,map必須完成的最少比例
預設:0.05
2)mapred.reduce.parallel.copies
reducer在copy階段同時從mapper上拉取的檔案數
類型:int
預設:5
3)mapred.job.shuffle.input.buffer.percent
在shuffle的複制階段,配置設定給map輸出的緩沖區占堆空間的百分比
預設:0.70
4)mapred.job.shuffle.merge.percent
map輸出緩沖區(由mapred.job.shuffle.input.buffer.percent定義)使用比例閥值,當達到此閥值,緩沖區中的資料将會被歸并然後spill 到磁盤。
預設:0.66
5)mapred.inmem.merge.threshold
map輸出緩沖區中檔案數
預設:1000
備注:0或小于0的數意味着沒有閥值限制,溢出寫将有mapred.job.shuffle.merge.percent單獨控制。
6)mapred.job.reduce.input.buffer.percent
在reduce過程中,在記憶體中儲存map輸出的空間占整個堆空間的比例。
預設:0.0
備注:reduce階段開始時,記憶體中的map輸出大小不能大于該值。預設情況下,在reduce任務開始之前,所有的map輸出都合并到磁盤上,以便為reducer提供盡可能多的記憶體。然而,如果reducer需要的記憶體較少,則可以增加此值來最小化通路磁盤的次數,以提高reduce性能。
3.性能調優補充
相對于大批量的小檔案,hadoop更合适處理少量的大檔案。一個原因是FileInputFormat生成的InputSplit是一個檔案或該檔案的一部分。如果檔案很小,并且檔案數量很多,那麼每次map任務隻處理很少的輸入資料,每次map操作都會造成額外的開銷。