![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0NXYFhGd192UvwVe0lmdhJ3ZvwFM38CXlZHbvN3cpR2Lc1TPB10QGtWUCpEMJ9CXsxWam9CXwADNvwVZ6l2c052bm9CXUJDT1wkNhVzLcRnbvZ2Lc1TSYplc50WYmZlMZVnRyMmesdVW1ZVbjZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39TM2EjMwADNyEDMxMDM3EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
shuffle的主要工作是從Map結束到Reduce開始之間的過程。shuffle階段又可以分為Map端的shuffle和Reduce端的shuffle。
1.Map端的Shuffler
每個map task都有一個記憶體緩沖區(如上圖中的buffer in memory預設為100MB),存儲着map的輸出資料(格式為Key-Value鍵值對),當緩沖區快滿的時候,需要将緩沖區中的資料以一個臨時檔案的方式存放到磁盤上,溢寫是單線程來完成的,他不影響往該緩存中寫map結果的線程。溢寫線程啟動時不應該阻止map的結果輸出,是以整個緩沖區有個溢寫的比例spill.percent,這個比例預設是0.8,也就是當緩沖區的資料已經達到門檻值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫線程啟動,鎖定這80MB的記憶體,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB記憶體中寫,互不影響。這裡需要注意在寫入磁盤之前,需要進行partition,sort,combine操作。随着資料不斷地輸出,溢寫的檔案越來越多,還要在磁盤上進行合并操作,最後合成一個溢出檔案。
1)partition
每個輸出資料要經過partition這個程式得到一個分區号。預設情況下,partition根據每一對Key-Value鍵值對的鍵的哈希值對reduce的個數取模得到一個分區号(hashcode%reduce個數)。partition的目的是把資料配置設定到一個reduce task中去。
2)sort
sort程式的目的是排序,排序的目标是map輸出的資料。預設情況下,按照鍵值對的key的ASCII碼值進行排序(也就是字典排序)。
3)combine
combine函數是将具有相同Key的多個Key-Value鍵值對合并成一個鍵值對,這裡需要注意,若在map端使用combine操作,則首先需要在map中也調用group程式(在後面将會被介紹),因為在combine中需要用到相同的鍵。這樣做的目的是為了減少網絡傳輸
2.Reduce端的Shuffler
1)fetch
Reduce端按照各個Map端的partition程式得出的分區号進行抓取,抓取的資料同樣存在于記憶體中。
2)sort
将Reduce抓到的資料再次進行排序,排序調用的還是Map端使用的Sort程式。
3)group
接下來進行的是分組,預設的分組模式是根據每個資料(Key-Value)的Key是否相同進行分組的。鍵相等就在同一個組中。每一組資料傳給Reduce程式中(每組資料的特點都是相同的Key鍵)。
3)MapReduce的split大小
分片大小要趨向與一個HDFS的一個資料塊的大小,預設是64MB,這裡我們需要叙述一下為何要趨向與一個資料塊的大小:
hadoop在存儲輸入資料的節點上運作map任務,可以獲得最佳性能。這就是所謂的“資料本地化優化”,因為它無需使用寶貴的叢集帶寬資源。分片趨向于資料塊大小的目的也是為了節省叢集帶寬資源,hdfs資料塊的大小就等于存儲在單個節點上的最大傳輸輸入塊的大小。若分片大于HDFS資料塊(跨越兩個資料塊),對于任何一個節點,基本上都不可能同時存儲這兩個資料塊,是以分片中的部分資料需要通過網絡傳輸到map任務節點,這樣就要消耗叢集帶寬資源。
split大小是根據以下算法确定的:
- max.split(100M)
- min.split(10M)
- block(64M)
- max(min.split,min(max.split,block))(任何一個split都不能大于block)