一個完整的mapreduce作業稱作job,它包括三部分:
輸入資料
mapreduce程式
配置資訊
hadoop工作時會将job分成若幹個task:map任務和reduce任務
有兩類節點控制作業執行的過程:jobtracker和tasktracker
jobtracker:記錄作業整體進度,對tasktracker進行排程
tasktracker:執行task任務并向jobtracker彙報
hadoop會将輸入資料劃分成等長的資料塊,成為資料分片。hadoop會為每個分片建構一個map任務。并行的處理分片時間肯定會少于處理整個大資料塊的時間,但由于各個節點性能及作業運作情況的不同,每個分片的處理時間可能不一樣,是以,把資料分片切分的更細可以得到更好的負載均衡。
但另一方面,分片太小的話,管理分片和建構map任務的時間将會增多。是以,需要在hadoop分片大小和處理分片時間之間做一個權衡。對大多數作業來說,一個分片大小為64mb比較合适,其實,hadoop的預設塊大小也是64mb。
我們上面看到了hadoop的資料塊大小與最佳分片大小相同,這樣的話,資料分片就不容易跨資料塊存儲,是以,一個map任務的輸入分片便可以直接讀取本地資料塊,這就避免了再從其它節點讀取分片資料,進而節省了網絡開銷。
map的任務輸出是寫入到本地磁盤而非hdfs的。那麼為什麼呢?因為map任務輸出的是中間結果,一旦map任務完成即會被删除,如果把它存入hdfs中并實作備份容錯,未免有點大題小做。如果一個map任務失敗,hadoop會再另一個節點重新開機map一個map任務。
而reduce任務并不具備資料本地化優勢——單個reduce任務的輸入通常來自所有mapper輸出。一般排序過的map輸出需要通過網絡傳輸發送到運作reduce任務的節點,并在reduce端進行合并。reduce的輸出通常需要存儲到hdfs中以實作可靠存儲。每個reduce輸出hdfs塊第一個複本會存儲在本地節點,而其它複本則存儲到其它節點,是以reduce輸出也需要占用網絡帶寬。
如下圖:一個reduce任務的mapreduce任務資料流
reduce任務的數量并非由輸入資料大小決定,而是特别指定。如有多個reduce任務,則每個map任務都會對其輸出進行分區(partition),因為每個reduce任務會建一個分區。相同鍵的記錄都會被partition到同一個分區中。具體的分區方式由分區函數來控制,一般通過hash函數來分區。
我們把map任務和reduce任務之間的資料流稱為shuffle,因為每個reduce任務的輸入都來自多個map任務,是以,這個階段比較複雜,而shuffle過程中的參數調整對job運作的總時間是有非常大的影響的,一般mapreduce的調優主要就是調整shuffle階段的參數。
如下圖:多個reduce任務的資料流
叢集上的可用帶寬限制了mapreduce的作業數量,因為map的中間結果傳遞給reduce是要經過網絡傳輸的,是以最重要的一點就是盡量減少map和reduce任務間傳輸的資料量。不過,hadoop允許使用者針對map任務的輸出指定一個合并函數(combiner),用合并函數的輸出作為reduce函數的輸入,但需要注意,合并函數的運用不應該改變reduce函數的計算結果。
例如有兩個map的輸出分别是map1={0,20,10};map2={15,25},求最大值,我們可以對先每個map的資料的資料進行合并,合并完成之後再傳輸給reducer:
map1={0,20,10}->combiner->{20};
map2={15,25}->combiner->{25};
reducer->{25}
即 max(0,20,10,15,25)=max(max(0,20,10),max(15,25))=25
如下圖:将combiner後的輸出作為reducer的輸入
但需要特别注意的是,并不是任何場景都是可以用combiner的,比如把上面的例子改成求平均值:
combiner後的reducer的結果: avg(avg(0,20,10),avg(15,25))=avg(10,20)=15;
沒有進行combiner的reducer結果: avg(0,20,10,15,25)=14;