天天看點

MapReduce核心map reduce shuffle (spill sort partition merge)詳解hadoop的心髒–shuffle過程解析:

shuffle過程是mapreduce的核心,也被稱為奇迹發生的地方。要想了解mapreduce,

shuffle是必須要了解的。shuffle的正常意思是洗牌或弄亂,可能大家更熟悉的是java

api裡collections.shuffle(list)方法,它會随機地打亂參數list裡的元素順序。如果你不知道mapreduce裡

shuffle是什麼,那麼請看這張圖:

MapReduce核心map reduce shuffle (spill sort partition merge)詳解hadoop的心髒–shuffle過程解析:

        這張是官方對shuffle過程的描述。但我可以肯定的

是,單從這張圖你基本不可能明白shuffle的過程,因為它與事實相差挺多,細節也是錯亂的。後面我會具體描述shuffle的事實情況,是以這裡你隻

要清楚shuffle的大緻範圍就成-怎樣把map task的輸出結果有效地傳送到reduce端。也可以這樣了解,

shuffle描述着資料從map task輸出到reduce task輸入的這段過程。

        在hadoop這樣的叢集環境中,大部分map task與reduce

task的執行是在不同的節點上。當然很多情況下reduce執行時需要跨節點去拉取其它節點上的map

task結果。如果叢集正在運作的job有很多,那麼task的正常執行對叢集内部的網絡資源消耗會很嚴重。這種網絡消耗是正常的,我們不能限制,能做的

就是最大化地減少不必要的消耗。還有在節點内,相比于記憶體,磁盤io對job完成時間的影響也是可觀的。從最基本的要求來說,我們對shuffle過程的

期望可以有:

完整地從map task端拉取資料到reduce 端。

在跨節點拉取資料時,盡可能地減少對帶寬的不必要消耗。

減少磁盤io對task執行的影響。

        ok,看到這裡時,大家可以先停下來想想,如果是自己來設計這段shuffle過程,那麼你的設計目标是什麼。我想能優化的地方主要在于減少拉取資料的量及盡量使用記憶體而不是磁盤。

我的分析是基于hadoop0.21.0的源碼,如果與你所認識的shuffle過程有差别,不吝指出。我會以wordcount為例,并假設它有8個

map task和3個reduce task。從上圖看出,shuffle過程橫跨map與reduce兩端,是以下面我也會分兩部分來展開。

        先看看map端的情況,如下圖:

MapReduce核心map reduce shuffle (spill sort partition merge)詳解hadoop的心髒–shuffle過程解析:

        上圖可能是某個map

task的運作情況。拿它與官方圖的左半邊比較,會發現很多不一緻。官方圖沒有清楚地說明partition,

sort與combiner到底作用在哪個階段。我畫了這張圖,希望讓大家清晰地了解從map資料輸入到map端所有資料準備好的全過程。

        整個流程我分了四步。簡單些可以這樣說,每個map

task都有一個記憶體緩沖區,存儲着map的輸出結果,當緩沖區快滿的時候需要将緩沖區的資料以一個臨時檔案的方式存放到磁盤,當整個map

task結束後再對磁盤中這個map task産生的所有臨時檔案做合并,生成最終的正式輸出檔案,然後等待reduce task來拉資料。

        當然這裡的每一步都可能包含着多個步驟與細節,下面我對細節來一一說明:

1.        在map

task執行時,它的輸入資料來源于hdfs的block,當然在mapreduce概念中,map

task隻讀取split。split與block的對應關系可能是多對一,預設是一對一。在wordcount例子裡,假設map的輸入資料都是像

“aaa”這樣的字元串。

2.       

在經過mapper的運作後,我們得知mapper的輸出是這樣一個key/value對: key是“aaa”,

value是數值1。因為目前map端隻做加1的操作,在reduce task裡才去合并結果集。前面我們知道這個job有3個reduce

task,到底目前的“aaa”應該交由哪個reduce去做呢,是需要現在決定的。

mapreduce提供partitioner接口,它的作用就是根據key或value及reduce的數量來決定目前的這對輸出資料最終應該交由哪個

reduce task處理。預設對key hash後再以reduce

task數量取模。預設的取模方式隻是為了平均reduce的處理能力,如果使用者自己對partitioner有需求,可以訂制并設定到job上。

在我們的例子中,“aaa”經過partitioner後傳回0,也就是這對值應當交由第一個reducer來處理。接下來,需要将資料寫入記憶體緩沖區

中,緩沖區的作用是批量收集map結果,減少磁盤io的影響。我們的key/value對以及partition的結果都會被寫入緩沖區。當然寫入之

前,key與value值都會被序列化成位元組數組。

        整個記憶體緩沖區就是一個位元組數組,它的位元組索引及key/value存儲結構我沒有研究過。如果有朋友對它有研究,那麼請大緻描述下它的細節吧。

3.        這個記憶體緩沖區是有大小限制的,預設是100mb。當map

task的輸出結果很多時,就可能會撐爆記憶體,是以需要在一定條件下将緩沖區中的資料臨時寫入磁盤,然後重新利用這塊緩沖區。這個從記憶體往磁盤寫資料的過

程被稱為spill,中文可譯為溢寫,字面意思很直覺。這個溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不應該阻止map

的結果輸出,是以整個緩沖區有個溢寫的比例spill.percent。這個比例預設是0.8,也就是當緩沖區的資料已經達到門檻值(buffer

size * spill percent = 100mb * 0.8 = 80mb),溢寫線程啟動,鎖定這80mb的記憶體,執行溢寫過程。map

task的輸出結果還可以往剩下的20mb記憶體中寫,互不影響。

        當溢寫線程啟動後,需要對這80mb空間内的key做排序(sort)。排序是mapreduce模型預設的行為,這裡的排序也是對序列化的位元組做的排序。

        在這裡我們可以想想,因為map

task的輸出是需要發送到不同的reduce端去,而記憶體緩沖區沒有對将發送到相同reduce端的資料做合并,那麼這種合并應該是展現是磁盤檔案中

的。從官方圖上也可以看到寫到磁盤中的溢寫檔案是對不同的reduce端的數值做過合并。是以溢寫過程一個很重要的細節在于,如果有很多個

key/value對需要發送到某個reduce端去,那麼需要将這些key/value值拼接到一塊,減少與partition相關的索引記錄。

        在針對每個reduce端而合并資料時,有些資料可能像這樣:“aaa”/1,

“aaa”/1。對于wordcount例子,就是簡單地統計單詞出現的次數,如果在同一個map

task的結果中有很多個像“aaa”一樣出現多次的key,我們就應該把它們的值合并到一塊,這個過程叫reduce也叫combine。但

mapreduce的術語中,reduce隻指reduce端執行從多個map

task取資料做計算的過程。除reduce外,非正式地合并資料隻能算做combine了。其實大家知道的,mapreduce中将combiner等

同于reducer。

如果client設定過combiner,那麼現在就是使用combiner的時候了。将有相同key的key/value對的value加起來,減少溢寫到磁盤的資料量。combiner會優化mapreduce的中間結果,是以它在整個模型中會多次使用。那哪些場景才能使用combiner呢?從這裡

分析,combiner的輸出是reducer的輸入,combiner絕不能改變最終的計算結果。是以從我的想法來看,combiner隻應該用于那種

reduce的輸入key/value與輸出key/value類型完全一緻,且不影響最終結果的場景。比如累加,最大值等。combiner的使用一定

得慎重,如果用好,它對job執行效率有幫助,反之會影響reduce的最終結果。

4.       

每次溢寫會在磁盤上生成一個溢寫檔案,如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁盤上相應的就會有多個溢寫檔案存在。當map

task真正完成時,記憶體緩沖區中的資料也全部溢寫到磁盤中形成一個溢寫檔案。最終磁盤中會至少有一個這樣的溢寫檔案存在(如果map的輸出結果很少,當

map執行完成時,隻會産生一個溢寫檔案),因為最終的檔案隻有一個,是以需要将這些溢寫檔案歸并到一起,這個過程就叫做merge。merge是怎樣

的?如前面的例子,“aaa”從某個map task讀取過來時值是5,從map的另一個溢寫檔案讀取時值是8,因為它們有相同的key,是以得merge成group。什麼是group。對于“aaa”就是像這樣的:{“aaa”, [5, 8,

2,

…]},數組中的值就是從不同溢寫檔案中讀取出來的,然後再把這些值加起來。請注意,因為merge是将多個溢寫檔案合并到一個檔案,是以可能也有相同的

key存在,在這個過程中如果client設定過combiner,也會使用combiner來合并相同的key。

至此,map端的所有工作都已結束,最終生成的這個檔案也存放在tasktracker夠得着的某個本地目錄内。每個reduce

task不斷地通過rpc從jobtracker那裡擷取map task是否完成的資訊,如果reduce

task得到通知,獲知某台tasktracker上的map task執行完成,shuffle的後半段過程開始啟動。

        簡單地說,reduce task在執行之前的工作就是不斷地拉取目前job裡每個map task的最終結果,然後對從不同地方拉取過來的資料不斷地做merge,也最終形成一個檔案作為reduce task的輸入檔案。見下圖:

MapReduce核心map reduce shuffle (spill sort partition merge)詳解hadoop的心髒–shuffle過程解析:

        如map 端的細節圖,shuffle在reduce端的過程也能用圖上标明的三點來概括。目前reduce

copy資料的前提是它要從jobtracker獲得有哪些map

task已執行結束,這段過程不表,有興趣的朋友可以關注下。reducer真正運作之前,所有的時間都是在拉取資料,做merge,且不斷重複地在做。

如前面的方式一樣,下面我也分段地描述reduce 端的shuffle細節:

1.       

copy過程,簡單地拉取資料。reduce程序啟動一些資料copy線程(fetcher),通過http方式請求map

task所在的tasktracker擷取map task的輸出檔案。因為map

task早已結束,這些檔案就歸tasktracker管理在本地磁盤中。

merge階段。這裡的merge如map端的merge動作,隻是數組中存放的是不同map端copy來的數值。copy過來的資料會先放入記憶體緩沖區

中,這裡的緩沖區大小要比map端的更為靈活,它基于jvm的heap

size設定,因為shuffle階段reducer不運作,是以應該把絕大部分的記憶體都給shuffle用。這裡需要強調的是,merge有三種形

式:1)記憶體到記憶體  2)記憶體到磁盤 

3)磁盤到磁盤。預設情況下第一種形式不啟用,讓人比較困惑,是吧。當記憶體中的資料量到達一定門檻值,就啟動記憶體到磁盤的merge。與map

端類似,這也是溢寫的過程,這個過程中如果你設定有combiner,也是會啟用的,然後在磁盤中生成了衆多的溢寫檔案。第二種merge方式一直在運作,直到沒有map端的資料時才結束,然後啟動第三種磁盤到磁盤的merge方式生成最終的那個檔案。

3.       

reducer的輸入檔案。不斷地merge後,最後會生成一個“最終檔案”。為什麼加引号?因為這個檔案可能存在于磁盤上,也可能存在于記憶體中。對我們

combiner作用的地方:

1.從緩存溢寫到磁盤,減少溢寫到磁盤的

2.多個溢寫檔案合并(merge)時

參考文章:

<a href="http://langyu.iteye.com/blog/992916?page=3#comments" target="_blank">http://langyu.iteye.com/blog/992916?page=3#comments</a>

<a href="http://blog.yfteach.com/?p=283" target="_blank">http://blog.yfteach.com/?p=283</a>