概要
MapReduce 是一個處理和生成大資料集的程式模型和相關實作。使用者定義一個
map
函數:處理一個 key/value 對生成一組
中間鍵值對
,和一個
Reduce
函數:合并關聯相同 key 的所有中間值資料。許多現實世界的任務可以用這個模型來表達,就像這篇論文裡描述的那樣。
寫成函數式風格的程式可以自動地在大規模叢集上并行處理。運作時系統隻關心分割輸入資料的細節,跨多個工作機器排程程式的執行,處理機器失敗,以及管理需要的機器間的互動。這使得程式員不需要任何并行和分布式系統的經驗也同樣可以使用大規模叢集的資源。
我們 MapReduce 的實作運作在一個大規模商業機器叢集上具有很高的擴充:一個經典的 MapReduce 計算在成千上百台機器上處理 TB 級别的資料。開發者發現這個系統很好用:已經實作了上百個 MapReduce 程式,超過一千個 MapReduce 任務運作在 Google 的叢集上。
1. 介紹
在過去的五年,作者以及在 Google 的其他同僚實作了上百個特定需求的計算程式用來處理大規模的原生資料,例如爬取的文檔,web 請求日志,等等。計算多種形式的派生資料,例如反向索引,多種形式的 web 文本圖結構,從每個主機爬取文檔的數量,給定日期最常通路的請求等等。大多數這樣的計算在概念上是直接的。可是,輸入資料通常是巨大的,為了在合理時間完成計算程式不得不分布在幾百或幾千台機器上執行。如何并行化計算、分布資料和處理故障等問題,使用大量複雜代碼來處理這些問題使得原始簡單計算變得模糊起來。
作為一個對這種複雜性的應對,我們設計了一種新的抽象:允許我們使用簡單的時間去執行,把并行化、分布式、容錯以及負載均衡的複雜邏輯封裝到一個庫裡。我們的抽象是由 Lisp 中的
map
和
reduce
原型以及其他函數式函數啟發的。 我們意識到我們的大部分計算都包括一個
map
函數從輸入裡每個
邏輯
記錄計算出一些中間的
鍵值對
,然後使用一個
reduce
函數處理相同
key
關聯的所有值,為恰當地将派生資料資料合并。使用者指定
map
和
reduce
函數的函數式模型使用使得大規模計算并行化變得簡單,并且使用重試作為容錯的預設機制。
這項工作的主要貢獻在于:一個簡單而又強大的接口, 它實作了大規模計算的自動并行以及分布式,結合這個接口的實作可以在大規模 PC 機器上取得巨大的性能。
第 2 部分主要介紹了基本的程式設計模型以及給了幾個例子。第 3 部分描述了一個 MapReduce 的實作:專門為我們的以叢集為基礎的計算環境而定制的。第 4 部分我們介紹了一個關于程式模型我們發現的幾個有用的改善點。第 5 部分介紹了針對不同任務的實作的性能名額。第 6 部分探讨了在 Google 内部 MapReduce 的使用:使用它作為基礎重寫生産索引系統的經驗。第 7 部分讨論了相關的和未來的工作。
2 程式設計模型
模型:擷取一組 key/value 的
輸入
,并
輸出
一組 key/value 集合。
MapReduce 庫的使用者将這個計算表達成兩個函數:Map 和 Reduce。
Map, 由使用者編寫,獲得一個 key/value 輸入對,産生出一組
中間的
key/value 對。MapReduce 庫按相同中間 key I 對所有中間值進行分組然後把他們發送給 Reduce 函數。
Reduce 函數,也是由使用者編寫,接受一個中間 key I 和這個 key 對應的 value 集合。它把這些 value 合并成可能是更小點的 value 集合。通常每次
Reduce
調用隻會傳回一個或 0 個 value。這些中間 value 是通過疊代器提供給
Reduce
函數的。這允許我們處理超過記憶體大小的 value 集合。
2.1 例子
思考一個關于從一個巨大的文檔集合裡計算每一個詞的出現次數。使用者可能會編寫出如下的僞代碼:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
map
函數生成一個單詞及其出現的次數(在例子裡剛好是
1
)。
Reduce
函數則累加每一個單詞所出現的次數。
另外,使用者使用輸入輸出檔案的名稱和一些可選的調優參數來建構
mapreduce
對象。使用者然後調用 MapReduce 函數把
mapreduce
對象傳給它。這樣使用者代碼和 MapReduce 庫(由 C++ 實作)就聯系在一起。附錄 A 包含了這個例子的所有代碼。
2.2 類型
盡管前面的僞代碼是字元串類型輸入輸出相關的,但從概念上講 使用者提供的 map 和 reduce 函數在類型上是有關聯的:
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
I.e.,輸入 key 和 value 與輸出 key 和 value 來自不同的領域。此外,中間的 key 和 value 和 輸出 key 和 value 屬于相同域。
我們 C++ 實作在使用者自定義函數之間傳遞字元串,讓使用者代碼去處理字元串和相關類型的之前轉換。
2.3 更多的例子
這裡有些有趣的例子,他們可以很容易地用 MapReduce 函數表達。
分布式過濾:map 函數如果輸入的某行和某個模式(pattern)比對則傳回這行資料。 reduce 函數是個恒等函數隻是把提供的中間資料複制到輸出檔案中。
URL 通路的次數: map 函數處理 web 網頁請求日志輸出
<URL,1>
。reduce 函數累加所有具有相同 URL 的鍵值對的所有值,生成一個 <URL, 總數> 對。
翻轉 Web 連結圖:map 函數為每個從名叫
source
頁面中找到的
target
生成一個
<target, source>
對。reduce 函數給定
target
URL 的所有
source
URL 拼接在一起然後生成:
<target, list(source)>
。
每台主機的檢索詞向量:一個檢索詞向量 (term vector) 統計了統計了出現在一個/組文檔裡最重要的檢索詞集合:
<word, frequency>
清單。map 函數輸入一個文檔生成
<hostname, term vector>
(hostname 從文檔的 URL 中提取出來的)。reduce 接收一個給定主機(host) 所有的每個檔案檢索詞向量,它把這些檢索詞向量累加,丢掉不常見的檢索詞,然後生成最終的
<hostname, term vector>
對。
反向索引:map 函數解析每個文檔然後生成一系列
<word, documentId>
對。reduce 函數接受一個單詞的所有 kv 對,然後按照相關的 documentId 排序生成一個
<word, list(document ID)>
kv對。所有的輸出就變成了一個簡單的反向索引。它很容易加速計算單詞的位置(文檔ID)。
分布式排序:map 函數從每個單詞中抽取出 key ,然後生成一個
<key, word>
kv 對。reduce 函數是個恒等函數:接受
<key, word>
集合 傳回它。這個計算依賴 4.1 節提到的分割機制以及 4.2 節 提到的排序屬性。
3 實作
MapReduce 的很多不同實作都是可能的,沒有最好的方式隻有适合的。例如某個實作可能适合小的共享記憶體機器,另一個适合大型 NUMA 多處理器機器,然而另外一個适合基于網絡的巨型叢集。
這部分介紹的實作針對 Google 裡廣泛使用的計算環境:通過交換機關聯的巨型 PC 機叢集。在我們的環境裡:
(1) 機器大多是雙核 x86 處理器,運作着 linux,每台機器大約有 2-4 GB 記憶體。
(2) 使用商業網絡硬體,機器級别通常是 100 Mb/s 或者 1 G Mb/s 帶寬,但平均到每台機器就很少了。
(3)存儲通常就直接用 每個機器自帶的廉價的 IDE 磁盤。一個内部研發的檔案管理系統【8】就是用來管理存儲在這類磁盤上的資料。這個管理系統使用備份機制來提供可用性和可靠性在這些可靠的硬體環境上。
(4)使用者把任務送出給排程系統。多個任務組成一組任務,然後由排程系統配置設定給叢集中可用的機器。
3.1 執行預覽

Map 調用在多機器間分布式執行通過自動地将輸入資料分成
M 份。資料分割可以被不同機器并行執行。Reduce 執行也是分布式的通過将中間 key 空間分割成 R 份使用分區函數(eg:hash(key) mod R)。分成多少份(R)以及分區函數由使用者自己定義。
圖 1 展示了我們的 MapReduce 實作的執行全流程。當使用者程式調用
MapReduce
函數後,将會按照下面的執行序列進行(圖中的數字标簽對應下面清單中的數字):
- 使用者程式中的 MapReduce 庫首先将輸入資料分成 M 份,每份通常是 16 MB 到 64 MB 之間(通過一個可選項讓使用者控制)。然後開始将使用者程式複制給叢集的其它機器。
- 使用者程式的多個拷貝其中之一是特殊的 - master。剩下的是由 master 配置設定任務的 worker。由 M 個 map 任務和 R 個 reduce 任務需要配置設定。master 選擇空閑的機器清單,然後給他們每個配置設定一個 map 任務或者一個 reduce 任務。
- 配置設定到 map 任務的 worker 讀取對應的那份輸入資料。它會從輸入資料解析出 key/value 對集合,然後将每個 k/v 對傳給使用者定義的
函數。中間的 key/value 對集合由 Map 函數産生并緩沖在記憶體中。Map
- 緩沖的 k/v 對定期地寫入磁盤,被分區函數分成 R 份。這些緩沖的 k/v 對在磁盤的位置會被傳給 master ,它會将這些位置傳給運作 reduce worker機器。
- 當一個 reduce worker 通過 master 接到了這些通知,它使用遠端調用從 map worker 機器上的磁盤讀取緩沖的 k/v 對。當一個 reduce worker 讀完了所有的中間資料,它會按照 key 對中間資料進行排序這樣相同 key 的資料會出現在一起。這個排序是需要因為通常許多不同的 key 會配置設定給一個 reduce 任務。如果中間資料太大而超過記憶體限制則使用外部排序。
-
worker 便利排序好的中間資料,對于遇到的每個中間 key,它會把這個 key 及其對應的中間值集合傳給使用者定義的Reduce
函數。Reduce
函數的輸出添加到這個 reduce 所屬分片的那個輸出檔案裡。Reduce
- 當所有的
和map
任務完成後,master 會喚醒使用者程式。此時,使用者程式裡的reduce
調用将會傳回到使用者代碼。MapReduce
成功執行之後,mapreduce 的執行結果會被放在 R 個輸出檔案中(每個 reduce 任務一個,檔案名稱由使用者指定)。
通常地,使用者不需要合并這個 R 份檔案成一個-他們會把這些輸出檔案傳遞給另一個
MapReduce
函數,或者從另一個分布式系統中使用它們,這個分布式系統可以處理分割成多個檔案的輸入。
3.2 master 資料結構
master 儲存了幾個資料結構。對于每個 map 和 reduce 任務,它儲存狀态(idle(空閑) ,in-process(進行中) 或者 completed(已完成)),以及 worker 的ID (針對非空閑任務)。
master 是從 map 任務到 task 任務傳遞中間檔案區間位置的管道。是以,對于每個完成的 map 任務,master 存儲着由 map 任務提供的 R 份中間檔案區間的位置和大小。當 map 任務結束的時候通過傳過來的資訊更新這些位置和大小。這些資訊增量地推送給執行中的
reduce 任務。
3.3 容錯
因為
MapReduce
庫設計出來是幫助在成千上百台機器上處理海量資料的,是以這個庫必須優雅地容忍機器當機等失敗。
Worker 失敗(Worker Falure)
master 定期 ping 每一個worker。如果一個 worker 在指定的時間内沒有響應,master 會把這個 worker 标記為失敗。任何一個通過 該 worker 完成的 map 任務會把狀态設定回初始的
initial
狀态,這樣他們就可以被排程器分布給其它 worker 了。類似地,任何進行中的 map 和 reduce 任務運作在一個失敗的 worker 上都會設定回
initial
狀态,然後可以被重新排程。
一個失敗 worker 完成的 map 任務需要重新執行是因為它的輸出是存儲在失敗機器的本地磁盤上——不能通路到。完成的 reduce 任務不需要重新執行是因為它的輸出檔案存儲在一個全局的檔案系統裡。
當一個 map 任務首先在 worker A 上執行然後再 worker B 上執行(因為 A 挂了),所有執行 reduce 任務的 worker 會被通知到這個重新執行。任何一個還沒有從 worker A 讀資料的 reduce 任務将會從 worker B 讀資料。
MapReduce 是能容忍大規模 worker 失敗的。例如,由于網絡維護導緻一組 80 台機器在同一時間不能夠被通路到并持續了幾分鐘。MapReduce master 隻是簡單地重新執行下這些不可達的 woker 所執行的那些任務,然後繼續向前執行,最終完成這個 MapReduce 函數。
Master 挂了
可以很簡單的将上述提到的 master 資料結構定期做好快照。如果 master 任務挂了,新的 master 馬上啟動起來,狀态從最新的快照複制一份。考慮到隻有一個 master , 它的失敗是不受歡迎的,是以我們現在的實作會中斷 MapReduce 計算執行當 master 挂了。用戶端可檢測到這種信号,他們可以按照自己的意願決定是否重試。
失敗存在時的語義
當使用者提供的
map
和
reduce
操作是輸入值的确定函數,則我們的分布式實作産生的結果和整個程式串行執行産生的結果一緻。
我們依賴 map 和 reduce 任務輸出的原子送出來實作這個特性。每個進行中的任務會把它的輸出寫到私有的臨時檔案中。一個 reduce 任務産生一個這樣的檔案,一個 map 任務産生 R 個這樣的檔案(一個 reduce 任務一個)。當一個 map 任務完成時,worker 會發送一個資訊給master, 資訊中包含 這 R 個臨時檔案的名字。如果 master 接收到一個已經完成的 map 任務的資訊它會忽略掉的。否則它會将這 R 個檔案的名稱記錄在一個 master 資料結構中。
當一個 reduce 任務完成時,reduce worker 将它臨時輸出檔案名稱原子地重命名成最終的輸出檔案。如果相同的 reduce 任務在多個機器上執行,多個重命名調用會作用到一個相同的最終輸出檔案。我們依賴的這個原子重命名操作由底層系統提供:保證最終的檔案系統狀态隻包含 reduce 任務一次執行産生的資料。
我們的大部分
map
和
reduce
操作時确定性的(輸入完全由輸入決定,同樣地輸入一定由同樣輸出)。在這種場景中我們的語義和串行執行的是一樣的,這很容易讓程式員推出他們程式的行為。當
map
或
reduce
函數不是确定性的時候,我們提供了比較弱點的但仍舊合理的語義。在不确定的操作(map 和 reduce)中,一個特定的 reduce 任務 R1 的輸出和整個程式的串行執行結果一緻。 可是對一個不同的 reduce 任務 R2 的輸出可能會和整個程式不同順序執行的結果一緻。
給定 map 任務 M 以及 reduce 任務 R1 和 R2。e(Ri) 代表 Ri 的執行。 e(R1) 可能讀取 M 的一個執行産生的輸出, e(R2) 可能讀取 M 的另一個不同執行的輸出,弱語義出現了。
3.4 本地化
網絡帶寬在我們的計算環境裡是相對稀缺的資源。我們通過把輸入資料存儲在組成叢集的機器磁盤上來節省帶寬。GFS 把每個檔案分成 多個大小為 64 MB 的 block(塊),在不同機器上儲存多個 block 的備份(通常為 3 份)。MapReduce master 擷取輸入檔案的位置資訊然後嘗試将 map 任務排程到包含對應輸入資料的機器。 如果失敗了,它會嘗試将 map 任務排程到靠近任務資料備份所在機器(eg: 共享相同交換機的兩個機器)的機器上。當在一個叢集的大部分機器上運作 大規模 MapReduce 處理,大部分的輸入資料是從本地讀取的不消耗任何網絡。
3.4 任務粒度
如上所述,我們把 map 階段分為 M 份,reduce 階段分為 R 份。理想地, M 和 R 應該遠大于 worker 機器的數量。這樣每個機器可能執行許多不同的任務提高動态負載均衡以及當一個 worker 挂了的時候還可以提升恢複速度:在這個失敗 worker 上已經完成的任務可以快速廣播到别的機器。
在我們的實作裡 M 和 R 有多大都是有一些客觀限制,因為 master 必須做 O(M + R) 次排程以及儲存 O(M * R) 個狀态在記憶體裡。
更進一步,R 的值通常由使用者指定,因為每個 reduce 任務的輸出是放在一個單獨的輸出檔案裡的。在實踐中,我們趨向于選擇一個 M 可以把輸入資料分成每份大小在 16 MB 和 64 MB 之間(是以本地存儲優化才能發揮最大作用),我們使 R 是我們使用 worker 節點數量的小倍數 。我們執行 MapReduce 計算時,經常使 M = 200,000 和 R =5,000,使用 2,000 台 woker 機器。
3.6 備份任務
延長一個 MapReduce 操作的耗時的常見原因之一是 “落伍者”:一個機器用不正常長的時間完成計算中剩餘少量的 map 任務或者 reduce 任務。"落伍者"出現的原因有很多。例如,一個機器的磁盤壞了可能時不時地将讀性能從 30 MB/s 降到 1 MB/s。叢集排程系統可能已将其他的任務排程到這台機器,導緻它執行 MapReduce 代碼很慢因為 CPU,記憶體,本地磁盤以及網絡帶寬的競争。我遇到的一個最近的問題: 在機器初始化代碼裡有個 bug 會導緻處理器緩存失效:這些機器上的計算速度比正常的慢了超過一百倍。
我們有一個通用的機制來緩解落伍者的問題。當一個 MapReduce 操作接近完成時,master 啟動剩餘進行中任務的備份。無論主任務還是副任務完成該任務就标記為完成。我們調整了這個機制讓它即使會增加資源的使用但也隻會多用百分之幾。我們發現這個可以很大程度上減少一個巨大的 MapReduce 操作耗時。例如,在5.3 節描述的排序程式當備份任務機制關閉的時候使用了超過 44% 的時間才完成。
4 改進(細化)Refinements
即使 map 和 reduce 函數提供的基本函數滿足于大部分需求,但我們還有些有用的擴充。這一章節我将描述它。
4.1 分區函數 Partioning Function
MapReduce 的使用者可以指定 reduce 任務/輸出檔案的數量。使用中間的鍵上的分區函數講跨任務的資料分割。預設提供的分區函數使用hash (eg: hash(key))。這可以将結果拆的很均勻。但是在一些案例中使用别的分區函數很有用。例如,結果 key 是 URL,我們想把一個 host 的 URL 放到一個輸出檔案裡。為了支援這種場景 MapReduce 庫的使用者可以提供一個特定的分區函數。例如,使用 “hash(Hostname(urlkey)) mod R” 作為分區函數就可以将屬于同一個 host 的所有 URL 放到相同檔案裡了。
4.2 順序性保證
我們保證在一個分區裡,中間的 key/value 是按 key 升序或降序處理的。這個順序性保證使得每個分區生成一個有序輸出檔案變得簡單,這很用:在随機查找某個 key 速度很快以及對于全部結果排序很友善。
4.3 合并函數
在一些場景中,每個 map 任務産生的 key 有很多是重複的,而且使用者定義的 reduce 函數是滿足交換律和結合律的。一個很好的例子是 2.1 節描述的單詞計數的例子。因為單詞頻率趨近于 Zipf 分布(二八原則),每個 map 任務會生産出成千上百個 <the, 1> 格式的記錄。這些所有的計數會通過網絡發給一個 reduce 任務,然後 reduce 任務會将他們累加成一個總數。我們允許使用者指定一個可選的合并函數:在發送給 reduce 任務之前做部分的合并。
合并 函數運作在每台執行 map 任務的機器上。通常 combine(合并) 函數和 reduce 函數的代碼相同,combine 函數 和 reduce 函數的不同之處在于 MapReduce 庫如何處理函數的輸出。reduce 函數的輸出會被寫到一個檔案,combine 函數的結果會被寫到一個中間檔案将會發給一個 reduce 任務。
部分合并大幅度地提高了 MapReduce 操作的速度。 附錄 A 包含了一個使用合并的例子。
4.4 輸入和輸出類型
MapReduce 庫支援了幾種不同的輸入資料格式。例如, “text” 形式的輸入會把每行當做一個 key/value 對:key 是檔案的偏移量,value 是行的内容。其他的常見支援格式都存儲着按照 key 排序的 key/value 對清單。每個輸入實作都知道如何将對應的輸入分成有意義的範圍(部分)以便給單獨的 map 任務處理。使用者可以提供一個 reader 接口實作來支援新的輸入類型,即使大部分使用者都是使用預設的輸入類型中的一個。
一個 reader 不需要提供從檔案裡讀的資料。例如,定義一個從資料庫讀取記錄或者從記憶體裡讀取映射資料結構的 reader 很簡單。
類似的是, 我們提供多種不同輸出檔案的格式,使用者很簡單添加新的類型。
4.5 副作用
在一些場景中,MapReduce 的使用者發現從他們的 map 和 reduce 操作中生成一些輔助檔案作為額外輸出很友善。我們依賴于應用編寫者讓這些副作用具有原子性和幂等性。通常應用程式會寫到一個臨時檔案裡然後當全部生成時把臨時檔案改名。
我們沒有提供一個任務生成的多個檔案的原子兩階段送出。是以,産生多檔案且需要跨檔案一緻性需求的任務應該具有确定性(串行執行和分布式執行結果一樣)。在實踐中這個限制不會成為一個問題。
4.6 跳過壞的記錄(Skip Bad Record)
有時,使用者代碼中的 bug 會導緻 MapReduce 操作在處理一些固定的記錄時失敗,這樣的 bug 會阻止 MapReduce 操作完成。通常解決辦法是修複這些 bug,有時這并不可行;這些 bug 可能是在第三方庫裡我們不可改。有時忽略一些記錄也是可以接受的,比如在對一個很大的資料集做統計分析時。我們提供一個可選的執行模式當 MapReduce 操作發現某些記錄會确定導緻崩潰然後會跳過它以繼續向前運作。
每個 woker 安裝了一個信号處理器:捕獲記憶體段錯誤和總線錯誤。在調用一個使用者 Map 或者 Reduce 操作之前,MapReduce 庫會将記錄的編号存儲在全局變量中。如果使用者代碼生成一個信号,信号處理器就會發送一個包含着編号的名叫 “last gasp”的 UDP 包給 master。當 master 在一個特殊的 record 發現超過一次錯誤時,它告訴重新執行對應 Map 或 Reduce 任務的執行跳過這個記錄。
4.7 本地執行
在 Map 或 Reduce 程式裡調試以找到錯誤是很棘手的,因為實際的計算發生于一個分布式系統上,經常在幾千台機器上,通過 master 動态排程。為了幫助友善調試,分析以及小規模測試,我開發了一個 MapReduce 庫的實作可以在本地機器上串行執行一個 MapReduce 操作的所有工作。控制權提供給使用者以至于可以将計算限制到指定的 map 任務。使用者調用他們的程式用一個特殊的 flag ,可以很容易地使用任何調試或者測試工具。
4.8 狀态資訊
master 内部運作着一個 HTTP 伺服器,可以導出一些狀态頁面給使用者使用。狀态頁面顯示着計算的進度,例如多少任務已經完成了,還有多少任務在運作中,輸入位元組數,中間資料位元組數,輸出的位元組數,處理速度等等。這些頁面也包含标準錯誤輸對外連結接以及每個任務生成的結果檔案連結。使用者可以使用這些資料預測計算耗時多久,是要加入更多的資源到計算中去。當計算比預料地要慢的時候這些頁面也可以用來分析原因。
4.9 計數器
MapReduce 庫提供了計數器用來計數不同時間的出現次數。例如,使用者代碼可能想統計下處理的單詞總個數或者被索引德文文檔的總數。
使用者可以建立一個名叫 counter 的對象然後在 Map 和/或 Reduce 函數裡恰當地遞增計數即可。例如:
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
來自單獨 woker 機器的計數值定期地傳播到 master 。master 聚合來自成功的 map 和 reduce 任務的計數值然後當 MapReduce 操作結束時傳回給使用者代碼。目前的計數值同樣會展示在狀态頁面上以至于使用者可以看到正在執行計算的進度。當聚合計數值時,master 去掉相同 map 和 reduce 任務重複執行的影響。
一些計數值會被 MapReduce 庫自動存儲,例如輸入 key/value 對的處理個數以及輸出 key/value 生成的個數。
使用者發現使用計數器對檢查 MapReduce 操作的行為很有用。例如,在一些 MapReduce 操作中,使用者代碼希望保證生成的輸出對(pair)的數量和已經處理的輸入對(pair)數量絕對一緻,或者處理的德文文檔數量在處理過的總文檔數量中比例在合理範圍内。
5 性能
本章我們測量一下兩個運作在大規模叢集上的 MapReduce 計算的性能。一個計算是從大約 1 TB 資料裡搜尋一個指定的 pattern (grep)。另一個是給 1 TB 資料排序。
這兩個程式是 MapReduce 使用者寫的衆多真實程式裡的代表- 一類程式是将資料從一種形式轉換為另外一種形式,另外一類是從一個大資料集中抽取一小部分感興趣的資料。
5.1 叢集配置
程式的全部運作在一個由大約 1800 個機器組成的叢集上。每台機器擁有 2 GHZ Intel Xeon 處理器(開啟超線程),4 GB 記憶體,兩塊 160 GB IDE 磁盤,千兆以太網絡鍊路。這些機器部署在一個兩層樹形交換網絡中,在根節點上大概可以聚合 100-200 Gbps 的帶寬。所有的機器都是對等部署的,是以機器兩兩之間的互動時間小于 1 ms。
除了 4G 記憶體之外,大約有 1-1.5 GB 預留來運作機器上的其他任務。測試程式運作在一個周末的下午,此時 CPU,磁盤以及網絡是最空閑的時候。
5.2 Grep
grep 程式通過掃描 1010 個 100-位元組大小的記錄,搜尋着一個相對罕見三個字元的 pattern (這個 pattern 大概存在于92,337個記錄裡)。輸入資料被分成大約 64 MB 大小的片(M=15000),整個輸出存放在一個輸出檔案裡(R = 1)。
圖2 展示了這個計算時間上的進度。Y 軸掃描輸入資料的速度,随着越來越多的機器被配置設定到這個 MapReduce 計算這個速度逐漸上漲,當 1764 台 worker 參與進來的時候速度到達頂峰:超過 30 GB/s。當 map 任務結束時,速度開始下降然後大約 80 秒的時候降到了 0。整個計算從開始到結束大約花了 150 秒,這包含了 1 分鐘的啟動開銷,這個開銷包含程式複制到 woker 機器的開銷以及與 GFS 的互動為了打開這 1000 個輸入檔案以及為本地優化擷取一些需要的資訊。
5.3 排序
排序程式給 1010 個 100-位元組大小的記錄(大約 1 TB)排序,這個程式是模仿 TeraSort 基準測試【10】的。
排序程式有小于 50 行的代碼組成,一個三行的 Map 函數從一個文本行中抽取出一個 key,然後把這個 key 和 原生文本行作為中間的 key/value 對。我們使用内置的恒等函數作為 Reduce 操作符,這個函數将中間的 key/value 對不加修改地作為輸出 key/value 對。最終的排序好的輸出 2 副本的 GFS 檔案裡。
像之前一樣輸入資料被分成 64 MB 的片(M =15000),我們把排序的輸出檔案分成 4000 份,分片函數使用 key 的初始位元組将其分成 R 片之一。
這個基準測試的分區函數内置了 key 的分布情況,在一般的排序程式中,我們會添加一個預處理 MapReduce 操作,這個操作主要采樣 key 的分布情況以便計算最終排序資料的分割點。
圖3(a)排序程式的正常執行進度,左邊最上邊的圖展示了讀輸入資料的速率,速率頂峰達到了大約 13 GB/s 在 200 秒所有 map 任務結束時瞬間歸 0。提醒一下輸入速率小于 grep 程式,這是因為排序的 map 任務要花一半的時間和帶寬把中間的輸出寫到他們的本地磁盤上。grep 的中間輸出可以忽略不計。左邊中間的圖展示了資料從 map 任務通過網絡發給 reduce 任務的速率,這個傳輸開始于第一個 map 任務結束。圖裡第一個坡峰對應着大約 1700 個 reduce 任務的第一批(整個 MapReduce 任務大約給安排了 1700 個機器,每個機器同時最多隻能運作一個 reduce 任務)。計算到大概 300 秒的時候,reduce 任務的第一批中的一些結束了,我們開始為剩餘的 reduce 任務傳輸。傳輸在計算進行到 600 秒的時候全部結束。左邊底部的圖展示了 reduce 任務将排序資料寫入到輸出檔案裡的速率。在第一次傳輸資料的結尾和寫資料之間有個空隙,因為機器在忙着排序中間資料,保持了 2~4 GB/s 的速度一會兒。 寫在計算進行到800秒的時候全部完成了。包含啟動消耗,整個計算耗時 891 秒,這比較接近于 TeraSort 基準測試的報告結果:1057 秒。
還有一些事需要說明下:輸入速度大于傳輸速度和輸出速度得益于我們的本地優化-大部分資料從本地磁盤讀取繞過我們相對受限的帶寬。傳輸速度大于輸出速度是因為輸出階段寫兩份排序資料的備份。我們寫兩份是因為這是底層檔案系統提供的機制為了可用性和可靠性。如果底層系統使用擦除編碼【14】而不是備份則寫資料的網絡帶寬需求就會減少。
5.4 備份任務的影響
在 圖 3(b),我們展示了一個備份任務關閉的排序程式的執行。執行流程和圖 3 (a)展示的很類似,除了有很長的尾巴(很難有任何寫活動發生)。960 秒後,除了 5 個 reduce 任務以外所有任務都完成了。可是這些少量的落後者直到 300 秒之後才完成。整個計算耗時 1283 秒,完成時間漲了 44%。
5.5 機器失敗
在 圖 3 (c),我們展示了排序的執行流程,在這個流程中,我們故意在執行的幾分鐘内殺死了 1746 個工作程序中的 200 個。底層叢集排程器在這些機器上立馬重新啟動新的工作程序。
worker 挂掉的位置展示了一個負的讀入速率這是因為一些之前的完成的 map 工作不見了需要重做,這些 map 任務的重新執行相對來說是快的。整個計算在 933 秒完成包含啟動的消耗(隻比正常執行耗時漲了 5%)。
6 實踐
我們在 2003 年 1 月寫了 MapReduce 庫的第一個版本,到 2003 年 8 月我們做了很多增強,包括本地優化,跨機器任務執行的動态負載均衡,等等。從那個時候起,我們驚訝地發現 MapReduce 庫可以廣泛地應用于我們平常工作中遇到的問題。在 Google 它已經應用到很多地方了:
- 大規模機器學習問題,
- Google 新聞 和 Froogle 産品的叢集問題
- 為了受歡迎查詢報表生成而去抽取資料
- 為了新的實作和産品抽取 web 頁面的屬性
- 大規模圖計算 圖 4 展示了随着時間地推移,進入我們主要源碼管理系統地 MapReduce 程式的顯著增長。從 2003 年早些時候的 0 到 2004 年9月下旬的 900 個執行個體。MapReduce 之是以取得這樣的成功是因為它實作了在半小時内寫一個簡單的程式并友善地運作在 1000 台機器上,極大地提升了開發效率和原型開發周期。更有甚者,它允許一個沒有分布式和/或并行化系統經驗的程式員很簡單的使用大規模資源。
【譯】MapReduce: 在大規模機器上的簡易資料處理概要1. 介紹3 實作4 改進(細化)Refinements5 性能6 實踐7 相關工作8 結論引用 【譯】MapReduce: 在大規模機器上的簡易資料處理概要1. 介紹3 實作4 改進(細化)Refinements5 性能6 實踐7 相關工作8 結論引用
在每個 job 結束, MapReduce 庫記錄了關于 job 使用資源的統計情況。在表 1,我們展示了 Google 在 2004年 8 月 運作的一組 MapReduce 任務的一些統計。
6.1 大規模索引
在衆多 MapReduce 庫使用中值得紀念的是為 Google 網頁搜尋服務提供資料結構的生産索引系統的重寫。索引系統的輸入是我們爬蟲系統爬取的巨大的文檔集,存儲成一組 GFS 檔案。這些文檔的原生内容大小超過了 20 TB。索引程序分成了大概 5-10 個 MapReduce 操作。 使用 MapReduce 有以下幾個好處:
- 索引系統代碼更簡單、更小以及更易了解,因為處理容錯、分布式以及并行化的代碼封裝到了 MapReduce 庫。例如,這個索引系統的一個階段 3800 行的 C++ 代碼,使用 MapReduce 重寫之後降到了 700 行。
- 我們将概念上不相關的計算分開 MapReduce 庫的性能也足夠好,而不是為了避免資料的傳遞将他們偶合在一起。這使我們很容易修改索引系統,例如,舊的索引系統一個修改需要花費幾個月但是新的系統隻需要花費幾天。
- 索引程序也很好去運維了,因為大多數的問題都是由機器當機,機器運作慢以及網絡延遲引起的,這些問題已經被 MapRedcue 庫自動解決了而不需要運維幹預。還有一點,可以很簡單地通過添加機器來提升性能。
7 相關工作
很多系統已經提供了嚴格的程式設計模型以及使用這些規則自動地并行化計算。例如,使用并行字首計算【6,9,13】可以在 log N 時間内在 N 個處理器上完成大小為 N 數組上所有字首相關運算 。MapReduce 可以被當成這些模型中并基于我們現實世界中大規模運算經驗的一種簡化和精華。更重要的是,我們提供了一種容錯的且可以擴充到幾千個處理器規模的實作。相對來說,大多數并行處理系統隻是基于小規模運算來實作的而且把處理機器失敗的細節留給了使用者。
批量同步程式設計【17】以及一些 MPI 原型【11】提供了高層次的抽象-使得程式員很容易編寫并行程式。這些系統和 MapReduce 的一個重要不同之處在于 MapReduce 利用一個嚴格的程式設計模型使使用者程式自動地并行化以及提供了透明的容錯機制。我們的本地優化從活動磁盤【12,15】獲得靈感,計算可以推到距離處理元素進的磁盤,這樣節省了資料通過網絡 I/O 傳輸的花費。我們運作在連接配接着少數幾個磁盤的商業處理器上而不是直接運作在磁盤控制處理器上,不過大緻的方法類似。
我們的備份任務機制】和 Charlotte 系統【3】中的 eager 排程機制類似。eager 排程機制的一個缺點是如果一個給定的任務重複失敗則整個計算不能完成,我們用跳過壞的記錄機制修複了這個問題的一些場景。
MapReduce 實作依賴于我們内部叢集管理系統,它的主要職責是讓使用者任務分布式地運作在大規模共享機器上。即使不是這個論文的重點,這個叢集管理系統在設計理念上類似于别的系統例如 Condor 【16】。
MapReduce 中的一個功能:排序類似于 NOW-sort【1】。源機器(map 機器)把資料分割成每份都排序好的然後發給 R 個 reduce workers 其中之一。每個 reduce woker 将它資料排好序。當然 NOW-sort 沒有定義使我們的庫變得如此廣泛使用的使用者自定義 map 和 reduce 函數。
River 【2】提供了一種程式設計模型:程序可以通過分布式隊列互動。像 MapReduce , River 系統試圖提供良好的平均性能,即使在異構硬體或系統擾動的情況下,為了達到這個目的,River 精心地排程網絡和磁盤資源來平衡任務完成時間。MapReduce 有另一個方法,通過嚴格的程式設計模型, MapReduce 架構可以将問題拆分成以及多個細粒度的任務,這些任務可以被排程到空閑的 worker 上,是以速度快的 worker 程序可以處理更多的任務。嚴格的程式設計模型也允許我們在接近 job 結束排程起未完成子 task 的備份以減少整個 job 的完成時間。
8 結論
引用
[1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, David E. Culler, Joseph M. Hellerstein, and David A. Patterson. High-performance sorting on networks of workstations. In Proceedings of the 1997 ACM SIGMOD International Conference on Management of Data, Tucson, Arizona, May 1997.
[2] Remzi H. Arpaci-Dusseau, Eric Anderson, Noah Treuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River: Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS ’99), pages 10–22, Atlanta, Georgia, May 1999.
[3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996.
[4] Luiz A. Barroso, Jeffrey Dean, and Urs Holzle. ¨ Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22–28, April 2003.
[5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.
[6] Guy E. Blelloch. Scans as primitive parallel operations. IEEE Transactions on Computers, C-38(11), November 1989.
[7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78–91, Saint-Malo, France, 1997.
[8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29–43, Lake George, New York, 2003.
[9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par’96.Parallel Processing, Lecture Notes in Computer Science 1124, pages 401–408. Springer-Verlag, 1996.
[10] Jim Gray. Sort benchmark home page.
http://research.microsoft.com/barc/SortBenchmark/.
[11] William Gropp, Ewing Lusk, and Anthony Skjellum.
Using MPI: Portable Parallel Programming with the
Message-Passing Interface. MIT Press, Cambridge, MA,1999.
[12] L. Huston, R. Sukthankar, R. Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.
[13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831–838, 1980.
[14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335–348, 1989.
[15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68–74, June 2001.
[16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.
[17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103–111, 1997.
[18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.