MapReduce:超大機群上的簡單資料處理 摘要 MapReduce是一個程式設計模型,和處理,産生大資料集的相關實作.使用者指定一個map函數處理一個key/value對,進而産生中間的key/value對集.然後再指定一個reduce函數合并所有的具有相同中間key的中間value.下面将列舉許多可以用這個模型來表示的現實世界的工作. 以這種方式寫的程式能自動的在大規模的普通機器上實作并行化.這個運作時系統關心這些細節:分割輸入資料,在機群上的排程,機器的錯誤處理,管理機器之間必要的通信.這樣就可以讓那些沒有并行分布式處理系統經驗的程式員利用大量分布式系統的資源. 我們的MapReduce實作運作在規模可以靈活調整的由普通機器組成的機群上,一個典型的MapReduce計算處理幾千台機器上的以TB計算的資料.程式員發現這個系統非常好用:已經實作了數以百計的MapReduce程式,每天在Google的機群上都有1000多個MapReduce程式在執行. 1.介紹 在過去的5年裡,作者和Google的許多人已經實作了數以百計的為專門目的而寫的計算來處理大量的原始資料,比如,爬行的文檔,Web請求日志,等等.為了計算各種類型的派生資料,比如,反向索引,Web文檔的圖結構的各種表示,每個主機上爬行的頁面數量的概要,每天被請求數量最多的集合,等等.很多這樣的計算在概念上很容易了解.然而,輸入的資料量很大,并且隻有計算被分布在成百上千的機器上才能在可以接受的時間内完成.怎樣并行計算,分發資料,處理錯誤,所有這些問題綜合在一起,使得原本很簡介的計算,因為要大量的複雜代碼來處理這些問題,而變得讓人難以處理. 作為對這個複雜性的回應,我們設計一個新的抽象模型,它讓我們表示我們将要執行的簡單計算,而隐藏并行化,容錯,資料分布,負載均衡的那些雜亂的細節,在一個庫裡.我們的抽象模型的靈感來自Lisp和許多其他函數語言的map和reduce的原始表示.我們認識到我們的許多計算都包含這樣的操作:在我們輸入資料的邏輯記錄上應用map操作,來計算出一個中間key/value對集,在所有具有相同key的value上應用reduce操作,來适當的合并派生的資料.功能模型的使用,再結合使用者指定的map和reduce操作,讓我們可以非常容易的實作大規模并行化計算,和使用再次執行作為初級機制來實作容錯. 這個工作的主要貢獻是通過簡單有力的接口來實作自動的并行化和大規模分布式計算,結合這個接口的實作來在大量普通的PC機上實作高性能計算. 第二部分描述基本的程式設計模型,并且給一些例子.第三部分描述符合我們的基于叢集的計算環境的MapReduce的接口的實作.第四部分描述我們覺得程式設計模型中一些有用的技巧.第五部分對于各種不同的任務,測量我們實作的性能.第六部分探究在Google内部使用MapReduce作為基礎來重寫我們的索引系統産品.第七部分讨論相關的,和未來的工作. 2.程式設計模型 計算利用一個輸入key/value對集,來産生一個輸出key/value對集.MapReduce庫的使用者用兩個函數表達這個計算:map和reduce. 使用者自定義的map函數,接受一個輸入對,然後産生一個中間key/value對集.MapReduce庫把所有具有相同中間key I的中間value聚合在一起,然後把它們傳遞給reduce函數. 使用者自定義的reduce函數,接受一個中間key I和相關的一個value集.它合并這些value,形成一個比較小的value集.一般的,每次reduce調用隻産生0或1個輸出value.通過一個疊代器把中間value提供給使用者自定義的reduce函數.這樣可以使我們根據記憶體來控制value清單的大小. 2.1 執行個體 考慮這個問題:計算在一個大的文檔集合中每個詞出現的次數.使用者将寫和下面類似的僞代碼: map(String key,String value): //key:文檔的名字 //value:文檔的内容 for each word w in value: EmitIntermediate(w,"1"); reduce(String key,Iterator values): //key:一個詞 //values:一個計數清單 int result=0; for each v in values: result+=ParseInt(v); Emit(AsString(resut)); map函數産生每個詞和這個詞的出現次數(在這個簡單的例子裡就是1).reduce函數把産生的每一個特定的詞的計數加在一起. 另外,使用者用輸入輸出檔案的名字和可選的調節參數來填充一個mapreduce規範對象.使用者然後調用MapReduce函數,并把規範對象傳遞給它.使用者的代碼和MapReduce庫連結在一起(用C++實作).附錄A包含這個執行個體的全部文本. 2.2類型 即使前面的僞代碼寫成了字元串輸入和輸出的term格式,但是概念上使用者寫的map和reduce函數有關聯的類型: map(k1,v1) ->list(k2,v2) reduce(k2,list(v2)) ->list(v2) 例如,輸入的key,value和輸出的key,value的域不同.此外,中間key,value和輸出key,values的域相同. 我們的C++實作傳遞字元串來和使用者自定義的函數互動,并把它留給使用者的代碼,來在字元串和适當的類型間進行轉換. 2.3更多執行個體 這裡有一些讓人感興趣的簡單程式,可以容易的用MapReduce計算來表示. 分布式的Grep(UNIX工具程式, 可做檔案内的字元串查找) :如果輸入行比對給定的樣式,map函數就輸出這一行.reduce函數就是把中間資料複制到輸出. 計算URL通路頻率:map函數處理web頁面請求的記錄,輸出(URL,1).reduce函數把相同URL的value都加起來,産生一個(URL,記錄總數)的對. 倒轉網絡連結圖:map函數為每個連結輸出(目标,源)對,一個URL叫做目标,包含這個URL的頁面叫做源.reduce函數根據給定的相關目标URLs連接配接所有的源URLs形成一個清單,産生(目标,源清單)對. 每個主機的術語向量:一個術語向量用一個(詞,頻率)清單來概述出現在一個文檔或一個文檔集中的最重要的一些詞.map函數為每一個輸入文檔産生一個(主機名,術語向量)對(主機名來自文檔的URL).reduce函數接收給定主機的所有文檔的術語向量.它把這些術語向量加在一起,丢棄低頻的術語,然後産生一個最終的(主機名,術語向量)對. 反向索引:map函數分析每個文檔,然後産生一個(詞,文檔号)對的序列.reduce函數接受一個給定詞的所有對,排序相應的文檔IDs,并且産生一個(詞,文檔ID清單)對.所有的輸出對集形成一個簡單的反向索引.它可以簡單的增加跟蹤詞位置的計算. 分布式排序:map函數從每個記錄提取key,并且産生一個(key,record)對.reduce函數不改變任何的對.這個計算依賴分割工具(在4.1描述)和排序屬性(在4.2描述). 3實作 MapReduce接口可能有許多不同的實作.根據環境進行正确的選擇.例如,一個實作對一個共享記憶體較小的機器是合适的,另外的适合一個大NUMA的多處理器的機器,而有的适合一個更大的網絡機器的集合. 這部分描述一個在Google廣泛使用的計算環境的實作:用交換機連接配接的普通PC機的大機群.我們的環境是: 1.Linux作業系統,雙處理器,2-4GB記憶體的機器. 2.普通的網絡硬體,每個機器的帶寬或者是百兆或者千兆,但是平均小于全部帶寬的一半. 3.因為一個機群包含成百上千的機器,所有機器會經常出現問題. 4.存儲用直接連到每個機器上的廉價IDE硬碟.一個從内部檔案系統發展起來的分布式檔案系統被用來管理存儲在這些磁盤上的資料.檔案系統用複制的方式在不可靠的硬體上來保證可靠性和有效性. 5.使用者送出工作給排程系統.每個工作包含一個任務集,每個工作被排程者映射到機群中一個可用的機器集上. 3.1執行預覽 通過自動分割輸入資料成一個有M個split的集,map調用被分布到多台機器上.輸入的split能夠在不同的機器上被并行處理.通過用分割函數分割中間key,來形成R個片(例如,hash(key) mod R),reduce調用被分布到多台機器上.分割數量(R)和分割函數由使用者來指定. 圖1顯示了我們實作的MapReduce操作的全部流程.當使用者的程式調用MapReduce的函數的時候,将發生下面的一系列動作(下面的數字和圖1中的數字标簽相對應): 1.在使用者程式裡的MapReduce庫首先分割輸入檔案成M個片,每個片的大小一般從 16到64MB(使用者可以通過可選的參數來控制).然後在機群中開始大量的拷貝程式. 2.這些程式拷貝中的一個是master,其他的都是由master配置設定任務的worker.有M 個map任務和R個reduce任務将被配置設定.管理者配置設定一個map任務或reduce任務給一個空閑的worker. 3.一個被配置設定了map任務的worker讀取相關輸入split的内容.它從輸入資料中分析出key/value對,然後把key/value對傳遞給使用者自定義的map函數.由map函數産生的中間key/value對被緩存在記憶體中. 4.緩存在記憶體中的key/value對被周期性的寫入到本地磁盤上,通過分割函數把它們寫入R個區域.在本地磁盤上的緩存對的位置被傳送給master,master負責把這些位置傳送給reduce worker. 5.當一個reduce worker得到master的位置通知的時候,它使用遠端過程調用來從map worker的磁盤上讀取緩存的資料.當reduce worker讀取了所有的中間資料後,它通過排序使具有相同key的内容聚合在一起.因為許多不同的key映射到相同的reduce任務,是以排序是必須的.如果中間資料比記憶體還大,那麼還需要一個外部排序. 6.reduce worker疊代排過序的中間資料,對于遇到的每一個唯一的中間key,它把key和相關的中間value集傳遞給使用者自定義的reduce函數.reduce函數的輸出被添加到這個reduce分割的最終的輸出檔案中. 7.當所有的map和reduce任務都完成了,管理者喚醒使用者程式.在這個時候,在使用者程式裡的MapReduce調用傳回到使用者代碼. 在成功完成之後,mapreduce執行的輸出存放在R個輸出檔案中(每一個reduce任務産生一個由使用者指定名字的檔案).一般,使用者不需要合并這R個輸出檔案成一個檔案--他們經常把這些檔案當作一個輸入傳遞給其他的MapReduce調用,或者在可以處理多個分割檔案的分布式應用中使用他們. 3.2master資料結構 master保持一些資料結構.它為每一個map和reduce任務存儲它們的狀态(空閑,工作中,完成),和worker機器(非空閑任務的機器)的辨別. master就像一個管道,通過它,中間檔案區域的位置從map任務傳遞到reduce任務.是以,對于每個完成的map任務,master存儲由map任務産生的R個中間檔案區域的大小和位置.當map任務完成的時候,位置和大小的更新資訊被接受.這些資訊被逐漸增加的傳遞給那些正在工作的reduce任務. 3.3容錯 因為MapReduce庫被設計用來使用成百上千的機器來幫助處理非常大規模的資料,是以這個庫必須要能很好的處理機器故障. worker故障 master周期性的ping每個worker.如果master在一個确定的時間段内沒有收到worker傳回的資訊,那麼它将把這個worker标記成失效.因為每一個由這個失效的worker完成的map任務被重新設定成它初始的空閑狀态,是以它可以被安排給其他的worker.同樣的,每一個在失敗的worker上正在運作的map或reduce任務,也被重新設定成空閑狀态,并且将被重新排程. 在一個失敗機器上已經完成的map任務将被再次執行,因為它的輸出存儲在它的磁盤上,是以不可通路.已經完成的reduce任務将不會再次執行,因為它的輸出存儲在全局檔案系統中. 當一個map任務首先被worker A執行之後,又被B執行了(因為A失效了),重新執行這個情況被通知給所有執行reduce任務的worker.任何還沒有從A讀資料的reduce任務将從worker B讀取資料. MapReduce可以處理大規模worker失敗的情況.例如,在一個MapReduce操作期間,在正在運作的機群上進行網絡維護引起80台機器在幾分鐘内不可通路了,MapReduce master隻是簡單的再次執行已經被不可通路的worker完成的工作,繼續執行,最終完成這個MapReduce操作. master失敗 可以很容易的讓管理者周期的寫入上面描述的資料結構的checkpoints.如果這個master任務失效了,可以從上次最後一個checkpoint開始啟動另一個master程序.然而,因為隻有一個master,是以它的失敗是比較麻煩的,是以我們現在的實作是,如果master失敗,就中止MapReduce計算.客戶可以檢查這個狀态,并且可以根據需要重新執行MapReduce操作. 在錯誤面前的處理機制 當使用者提供的map和reduce操作對它的輸出值是确定的函數時,我們的分布式實作産生,和全部程式沒有錯誤的順序執行一樣,相同的輸出. 我們依賴對map和reduce任務的輸出進行原子送出來完成這個性質.每個工作中的任務把它的輸出寫到私有臨時檔案中.一個reduce任務産生一個這樣的檔案,而一個map任務産生R個這樣的檔案(一個reduce任務對應一個檔案).當一個map任務完成的時候,worker發送一個消息給master,在這個消息中包含這R個臨時檔案的名字.如果master從一個已經完成的map任務再次收到一個完成的消息,它将忽略這個消息.否則,它在master的資料結構裡記錄這R個檔案的名字. 當一個reduce任務完成的時候,這個reduce worker原子的把臨時檔案重命名成最終的輸出檔案.如果相同的reduce任務在多個機器上執行,多個重命名調用将被執行,并産生相同的輸出檔案.我們依賴由底層檔案系統提供的原子重命名操作來保證,最終的檔案系統狀态僅僅包含一個reduce任務産生的資料. 我們的map和reduce操作大部分都是确定的,并且我們的處理機制等價于一個順序的執行的這個事實,使得程式員可以很容易的了解程式的行為.當map或/和reduce操作是不确定的時候,我們提供雖然比較弱但是合理的處理機制.當在一個非确定操作的前面,一個reduce任務R1的輸出等價于一個非确定順序程式執行産生的輸出.然而,一個不同的reduce任務R2的輸出也許符合一個不同的非确定順序程式執行産生的輸出. 考慮map任務M和reduce任務R1,R2的情況.我們設定e(Ri)為已經送出的Ri的執行(有且僅有一個這樣的執行).這個比較弱的語義出現,因為e(R1)也許已經讀取了由M的執行産生的輸出,而e(R2)也許已經讀取了由M的不同執行産生的輸出. 3.4存儲位置 在我們的計算機環境裡,網絡帶寬是一個相當缺乏的資源.我們利用把輸入資料(由GFS管理)存儲在機器的本地磁盤上來儲存網絡帶寬.GFS把每個檔案分成64MB的一些塊,然後每個塊的幾個拷貝存儲在不同的機器上(一般是3個拷貝).MapReduce的master考慮輸入檔案的位置資訊,并且努力在一個包含相關輸入資料的機器上安排一個map任務.如果這樣做失敗了,它嘗試在那個任務的輸入資料的附近安排一個map任務(例如,配置設定到一個和包含輸入資料塊在一個switch裡的worker機器上執行).當運作巨大的MapReduce操作在一個機群中的一部分機器上的時候,大部分輸入資料在本地被讀取,進而不消耗網絡帶寬. 3.5任務粒度 象上面描述的那樣,我們細分map階段成M個片,reduce階段成R個片.M和R應當比worker機器的數量大許多.每個worker執行許多不同的工作來提高動态負載均衡,也可以加速從一個worker失效中的恢複,這個機器上的許多已經完成的map任務可以被配置設定到所有其他的worker機器上. 在我們的實作裡,M和R的範圍是有大小限制的,因為master必須做O(M+R)次排程,并且儲存O(M*R)個狀态在記憶體中.(這個因素使用的記憶體是很少的,在O(M*R)個狀态片裡,大約每個map任務/reduce任務對使用一個位元組的資料). 此外,R經常被使用者限制,因為每一個reduce任務最終都是一個獨立的輸出檔案.實際上,我們傾向于選擇M,以便每一個單獨的任務大概都是16到64MB的輸入資料(以便上面描述的位置優化是最有效的),我們把R設定成我們希望使用的worker機器數量的小倍數.我們經常執行MapReduce計算,在M=200000,R=5000,使用2000台工作者機器的情況下. 3.6備用任務 一個落後者是延長MapReduce操作時間的原因之一:一個機器花費一個異乎尋常地的長時間來完成最後的一些map或reduce任務中的一個.有很多原因可能産生落後者.例如,一個有壞磁盤的機器經常發生可以糾正的錯誤,這樣就使讀性能從30MB/s降低到3MB/s.機群排程系統也許已經安排其他的任務在這個機器上,由于計算要使用CPU,記憶體,本地磁盤,網絡帶寬的原因,引起它執行MapReduce代碼很慢.我們最近遇到的一個問題是,一個在機器初始化時的Bug引起處理器緩存的失效:在一個被影響的機器上的計算性能有上百倍的影響. 我們有一個一般的機制來減輕這個落後者的問題.當一個MapReduce操作将要完成的時候,master排程備用程序來執行那些剩下的還在執行的任務.無論是原來的還是備用的執行完成了,工作都被标記成完成.我們已經調整了這個機制,通常隻會占用多幾個百分點的機器資源.我們發現這可以顯著的減少完成大規模MapReduce操作的時間.作為一個例子,将要在5.3描述的排序程式,在關閉掉備用任務的情況下,要比有備用任務的情況下多花44%的時間. 4技巧 盡管簡單的map和reduce函數的功能對于大多數需求是足夠的了,但是我們開發了一些有用的擴充.這些将在這個部分描述. 4.1分割函數 MapReduce使用者指定reduce任務和reduce任務需要的輸出檔案的數量.在中間key上使用分割函數,使資料分割後通過這些任務.一個預設的分割函數使用hash方法(例如,hash(key) mod R).這個導緻非常平衡的分割.然後,有的時候,使用其他的key分割函數來分割資料有非常有用的.例如,有時候,輸出的key是URLs,并且我們希望每個主機的所有條目保持在同一個輸出檔案中.為了支援像這樣的情況,MapReduce庫的使用者可以提供專門的分割函數.例如,使用"hash(Hostname(urlkey)) mod R"作為分割函數,使所有來自同一個主機的URLs儲存在同一個輸出檔案中. 4.2順序保證 我們保證在一個給定的分割裡面,中間key/value對以key遞增的順序處理.這個順序保證可以使每個分割産出一個有序的輸出檔案,當輸出檔案的格式需要支援有效率的随機通路key的時候,或者對輸出資料集再作排序的時候,就很容易. 4.3combiner函數 在某些情況下,允許中間結果key重複會占據相當的比重,并且使用者定義的reduce函數 滿足結合律和交換律.一個很好的例子就是在2.1部分的詞統計程式.因為詞頻率傾向于一個zipf分布(齊夫分布),每個map任務将産生成百上千個這樣的記錄<the,1>.所有的這些計數将通過網絡被傳輸到一個單獨的reduce任務,然後由reduce函數加在一起産生一個數字.我們允許使用者指定一個可選的combiner函數,先在本地進行合并一下,然後再通過網絡發送. 在每一個執行map任務的機器上combiner函數被執行.一般的,相同的代碼被用在combiner和reduce函數.在combiner和reduce函數之間唯一的差別是MapReduce庫怎樣控制函數的輸出.reduce函數的輸出被儲存最終輸出檔案裡.combiner函數的輸出被寫到中間檔案裡,然後被發送給reduce任務. 部分使用combiner可以顯著的提高一些MapReduce操作的速度.附錄A包含一個使用combiner函數的例子. 4.4輸入輸出類型 MapReduce庫支援以幾種不同的格式讀取輸入資料.例如,文本模式輸入把每一行看作是一個key/value對.key是檔案的偏移量,value是那一行的内容.其他普通的支援格式以key的順序存儲key/value對序列.每一個輸入類型的實作知道怎樣把輸入分割成對每個單獨的map任務來說是有意義的(例如,文本模式的範圍分割確定僅僅在每行的邊界進行範圍分割).雖然許多使用者僅僅使用很少的預定意輸入類型的一個,但是使用者可以通過提供一個簡單的reader接口來支援一個新的輸入類型. 一個reader不必要從檔案裡讀資料.例如,我們可以很容易的定義它從資料庫裡讀記錄,或從記憶體中的資料結構讀取. 4.5副作用 有的時候,MapReduce的使用者發現在map操作或/和reduce操作時産生輔助檔案作為一個附加的輸出是很友善的.我們依靠應用程式寫來使這個副作用成為原子的.一般的,應用程式寫一個臨時檔案,然後一旦這個檔案全部産生完,就自動的被重命名. 對于單個任務産生的多個輸出檔案來說,我們沒有提供其上的兩階段送出的原子操作支援.是以,一個産生需要交叉檔案連接配接的多個輸出檔案的任務,應該使确定性的任務.不過這個限制在實際的工作中并不是一個問題. 4.6跳過錯誤記錄 有的時候因為使用者的代碼裡有bug,導緻在某一個記錄上map或reduce函數突然crash掉.這樣的bug使得MapReduce操作不能完成.雖然一般是修複這個bug,但是有時候這是不現實的;也許這個bug是在源代碼不可得到的第三方庫裡.有的時候也可以忽略一些記錄,例如,當在一個大的資料集上進行統計分析.我們提供一個可選的執行模式,在這個模式下,MapReduce庫檢測那些記錄引起的crash,然後跳過那些記錄,來繼續執行程式. 每個worker程式安裝一個信号處理器來擷取記憶體段異常和總線錯誤.在調用一個使用者自定義的map或reduce操作之前,MapReduce庫把記錄的序列号存儲在一個全局變量裡.如果使用者代碼産生一個信号,那個信号處理器就會發送一個包含序号的"last gasp"UDP包給MapReduce的master.當master不止一次看到同一個記錄的時候,它就會指出,當相關的map或reduce任務再次執行的時候,這個記錄應當被跳過. 4.7本地執行 調試在map或reduce函數中問題是很困難的,因為實際的計算發生在一個分布式的系統中,經常是有一個master動态的配置設定工作給幾千台機器.為了簡化調試和測試,我們開發了一個可替換的實作,這個實作在本地執行所有的MapReduce操作.使用者可以控制執行,這樣計算可以限制到特定的map任務上.使用者以一個标志調用他們的程式,然後可以容易的使用他們認為好用的任何調試和測試工具(例如,gdb). 4.8狀态資訊 master運作一個HTTP伺服器,并且可以輸出一組狀況頁來供人們使用.狀态頁顯示計算進度,象多少個任務已經完成,多少個還在運作,輸入的位元組數,中間資料位元組數,輸出位元組數,處理百分比,等等.這個頁也包含到标準錯誤的連結,和由每個任務産生的标準輸出的連結.使用者可以根據這些資料預測計算需要花費的時間,和是否需要更多的資源.當計算比預期的要慢很多的時候,這些頁面也可以被用來判斷是不是這樣. 此外,最上面的狀态頁顯示已經有多少個工作者失敗了,和當它們失敗的時候,那個map和reduce任務正在運作.當試圖診斷在使用者代碼裡的bug時,這個資訊也是有用的. 4.9計數器 MapReduce庫提供一個計數器工具,來計算各種事件的發生次數.例如,使用者代碼想要計算所有處理的詞的個數,或者被索引的德文文檔的數量. 為了使用這個工具,使用者代碼建立一個命名的計數器對象,然後在map或/和reduce函數裡适當的增加計數器.例如: Counter * uppercase; uppercase=GetCounter("uppercase"); map(String name,String contents): for each word w in contents: if(IsCapitalized(w)): uppercase->Increment(); EmitIntermediate(w,"1"); 來自不同worker機器上的計數器值被周期性的傳送給master(在ping回應裡).master把來自成功的map和reduce任務的計數器值加起來,在MapReduce操作完成的時候,把它傳回給使用者代碼.目前計數器的值也被顯示在master狀态頁裡,以便人們可以檢視實際的計算進度.當計算計數器值的時候消除重複執行的影響,避免資料的累加.(在備用任務的使用,和由于出錯的重新執行,可以産生重複執行) 有些計數器值被MapReduce庫自動的維護,比如,被處理的輸入key/value對的數量,和被産生的輸出key/value對的數量. 使用者發現計數器工具對于檢查MapReduce操作的完整性很有用.例如,在一些MapReduce操作中,使用者代碼也許想要確定輸出對的數量完全等于輸入對的數量,或者處理過的德文文檔的數量是在全部被處理的文檔數量中屬于合理的範圍. 5性能 在本節,我們用在一個大型叢集上運作的兩個計算來衡量MapReduce的性能.一個計算用來在一個大概1TB的資料中查找特定的比對串.另一個計算排序大概1TB的資料. 這兩個程式代表了MapReduce的使用者實作的真實的程式的一個大子集.一類是,把資料從一種表示轉化到另一種表示.另一類是,從一個大的資料集中提取少量的關心的資料. 5.1機群配置 所有的程式在包含大概1800台機器的機群上執行.機器的配置是:2個2G的Intel Xeon超線程處理器,4GB記憶體,兩個160GB IDE磁盤,一個千兆網卡.這些機器部署在一個由兩層的,樹形交換網絡中,在根節點上大概有100到2000G的帶寬.所有這些機器都有相同的部署(對等部署),是以任意兩點之間的來回時間小于1毫秒. 在4GB的記憶體裡,大概有1-1.5GB被用來運作在機群中其他的任務.這個程式是在周末的下午開始執行的,這個時候CPU,磁盤,網絡基本上是空閑的. 5.2Grep 這個Grep程式掃描大概10^10個,每個100位元組的記錄,查找比較少的3字元的查找串(這個查找串出現在92337個記錄中).輸入資料被分割成大概64MB的片(M=15000),全部 的輸出存放在一個檔案中(R=1). 圖2顯示計算過程随時間變化的情況.Y軸表示輸入資料被掃描的速度.随着更多的機群被配置設定給這個MapReduce計算,速度在逐漸的提高,當有1764個worker的時候這個速度達到最高的30GB/s.當map任務完成的時候,速度開始下降,在計算開始後80秒,輸入的速度降到0.這個計算持續的時間大概是150秒.這包括了前面大概一分鐘的啟動時間.啟動時間用來把程式傳播到所有的機器上,等待GFS打開1000個輸入檔案,得到必要的位置優化資訊. 5.3排序 這個sort程式排序10^10個記錄,每個記錄100個位元組(大概1TB的資料).這個程式是模仿TeraSort的. 這個排序程式隻包含不到50行的使用者代碼.其中有3行map函數用來從文本行提取10位元組的排序key,并且産生一個由這個key和原始文本行組成的中間key/value對.我們使用一個内置的Identity函數作為reduce操作.這個函數直接把中間key/value對作為輸出的key/value對.最終的排序輸出寫到一個2路複制的GFS檔案中(也就是,程式的輸出會寫2TB的資料). 象以前一樣,輸入資料被分割成64MB的片(M=15000).我們把排序後的輸出寫到4000個檔案中(R=4000).分區函數使用key的原始位元組來把資料分區到R個小片中. 我們以這個基準的分割函數,知道key的分布情況.在一般的排序程式中,我們會增加一個預處理的MapReduce操作,這個操作用于采樣key的情況,并且用這個采樣的key的分布情況來計算對最終排序處理的分割點。 圖3(a)顯示這個排序程式的正常執行情況.左上圖顯示輸入資料的讀取速度.這個速度最高到達13GB/s,并且在不到200秒所有map任務完成之後迅速滑落到0.注意到這個輸入速度小于Grep.這是因為這個排序map任務花費大概一半的時間和帶寬,來把中間資料寫到本地硬碟中.而Grep相關的中間資料可以忽略不計. 左中圖顯示資料通過網絡從map任務傳輸給reduce任務的速度.當第一個map任務完成後,這個排序過程就開始了.圖示上的第一個高峰是啟動了第一批大概1700個reduce任務(整個MapReduce任務被配置設定到1700台機器上,每個機器一次隻執行一個reduce任務).大概開始計算後的300秒,第一批reduce任務中的一些完成了,我們開始執行剩下的reduce任務.全部的排序過程持續了大概600秒的時間. 左下圖顯示排序後的資料被reduce任務寫入最終檔案的速度.因為機器忙于排序中間資料,是以在第一個排序階段的結束和寫階段的開始有一個延遲.寫的速度大概是2-4GB/s.大概開始計算後的850秒寫過程結束.包括前面的啟動過程,全部的計算任務持續的891秒.這個和TeraSort benchmark的最高紀錄1057秒差不多. 需要注意的事情是:是以位置優化的原因,很多資料都是從本地磁盤讀取的而沒有通過我們有限帶寬的網絡,是以輸入速度比排序速度和輸出速度都要快.排序速度比輸出速度快的原因是輸出階段寫兩個排序後資料的拷貝(我們寫兩個副本的原因是為了可靠性和可用性).我們寫兩份的原因是因為底層檔案系統的可靠性和可用性的要求.如果底層檔案系統用類似容錯編碼(erasure coding)的方式,而不采用複制寫的方式,在寫盤階段可以降低網絡帶寬的要求。 5.4備用任務的影響 在圖3(b)中,顯示我們不用備用任務的排序程式的執行情況.除了它有一個很長的幾乎沒有寫動作發生的尾巴外,執行流程和圖3(a)相似.在960秒後,隻有5個reduce任務沒有完成.然而,就是這最後幾個落後者知道300秒後才完成.全部的計算任務執行了1283秒,多花了44%的時間. 5.5機器失效 在圖3(c)中,顯示我們有意的在排序程式計算過程中停止1746台worker中的200台機器上的程式的情況.底層機群排程者在這些機器上馬上重新開始新的worker程式(因為僅僅程式被停止,而機器仍然在正常運作). 因為已經完成的map工作丢失了(由于相關的map worker被殺掉了),需要重新再作,是以worker死掉會導緻一個負數的輸入速率.相關map任務的重新執行很快就重新執行了.整個計算過程在933秒内完成,包括了前邊的啟動時間(隻比正常執行時間多了5%的時間). 6經驗 我們在2003年的2月寫了MapReduce庫的第一個版本,并且在2003年的8月做了顯著的增強,包括位置優化,worker機器間任務執行的動态負載均衡,等等.從那個時候起,我們驚奇的發現MapReduce函數庫廣泛用于我們日常處理的問題.它現在在Google内部各個領域内廣泛應用,包括: 大規模機器學習問題 Google News和Froogle産品的機器問題. 提取資料産生一個流行查詢的報告(例如,Google Zeitgeist). 為新的試驗和産品提取網頁的屬性(例如,從一個web頁的大集合中提取位置資訊 用在位置查詢). 大規模的圖計算. 圖4顯示了我們主要的源代碼管理系統中,随着時間推移,MapReduce程式的顯著增加,從2003年早先時候的0個增長到2004年9月份的差不多900個不同的程式.MapReduce之是以這樣的成功,是因為他能夠在不到半小時時間内寫出一個簡單的能夠應用于上千台機器的大規模并發程式,并且極大的提高了開發和原形設計的周期效率.并且,他可以讓一個完全沒有分布式和/或并行系統經驗的程式員,能夠很容易的利用大量的資源. 在每一個任務結束的時候,MapReduce函數庫記錄使用的計算資源的統計資訊.在圖1裡,我們列出了2004年8月份在Google運作的一些MapReduce的工作的統計資訊. 6.1大規模索引 到目前為止,最成功的MapReduce的應用就是重寫了Google web 搜尋服務所使用到的index系統.索引系統處理爬蟲系統抓回來的超大量的文檔集,這些文檔集儲存在GFS檔案裡.這些文檔的原始内容的大小,超過了20TB.索引程式是通過一系列的,大概5到10次MapReduce操作來建立索引.通過利用MapReduce(替換掉上一個版本的特别設計的分布處理的索引程式版本)有這樣一些好處: 索引的代碼簡單,量少,容易了解,因為容錯,分布式,并行處理都隐藏在MapReduce庫中了.例如,當使用MapReduce函數庫的時候,計算的代碼行數從原來的3800行C++代碼一下減少到大概700行代碼. MapReduce的函數庫的性能已經非常好,是以我們可以把概念上不相關的計算步驟分開處理,而不是混在一起以期減少在資料上的處理.這使得改變索引過程很容易.例如,我們對老索引系統的一個小更改可能要好幾個月的時間,但是在新系統内,隻需要花幾天時間就可以了. 索引系統的操作更容易了,這是因為機器的失效,速度慢的機器,以及網絡失效都已經由MapReduce自己解決了,而不需要操作人員的互動.另外,我們可以簡單的通過對索引系統增加機器的方式提高處理性能. 7相關工作 很多系統都提供了嚴格的設計模式,并且通過對程式設計的嚴格限制來實作自動的并行計算.例如,一個結合函數可以通過N個元素的數組的字首在N個處理器上使用并行字首計算在log N的時間内計算完.MapReduce是基于我們的大型現實計算的經驗,對這些模型的一個簡化和精煉.并且,我們還提供了基于上千台處理器的容錯實作.而大部分并發處理系統都隻在小規模的尺度上實作,并且機器的容錯還是程式員來控制的. Bulk Synchronous Programming以及一些MPI primitives提供了更進階别的抽象,可以更容易寫出并行處理的程式.這些系統和MapReduce系統的不同之處在,MapReduce利用嚴格的程式設計模式自動實作使用者程式的并發處理,并且提供了透明的容錯處理. 我們本地的優化政策是受active disks等技術的啟發,在active disks中,計算任務是盡量推送到靠近本地磁盤的處理單元上,這樣就減少了通過I/O子系統或網絡的資料量.我們在少量磁盤直接連接配接到普通處理機運作,來代替直接連接配接到磁盤控制器的處理機上,但是一般的步驟是相似的. 我們的備用任務的機制和在Charlotte系統上的積極排程機制相似.這個簡單的積極排程的一個缺陷是,如果一個任務引起了一個重複性的失敗,那個整個計算将無法完成.我們通過在故障情況下跳過故障記錄的機制,在某種程度上解決了這個問題. MapReduce實作依賴一個内置的機群管理系統來在一個大規模共享機器組上分布和運作使用者任務.雖然這個不是本論文的重點,但是叢集管理系統在理念上和Condor等其他系統是一樣的. 在MapReduce庫中的排序工具在操作上和NOW-Sort相似.源機器(map worker)分割将要被排序的資料,然後把它發送到R個reduce worker中的一個上.每個reduce worker來本地排序它的資料(如果可能,就在記憶體中).當然,NOW-Sort沒有使用者自定義的map和reduce函數,使得我們的庫可以廣泛的應用. River提供一個程式設計模型,在這個模型下,處理程序可以靠在分布式的隊列上發送資料進行彼此通訊.和MapReduce一樣,River系統嘗試提供對不同應用有近似平均的性能,即使在不對等的硬體環境下或者在系統颠簸的情況下也能提供近似平均的性.River是通過精心排程硬碟和網絡的通訊,來平衡任務的完成時間.MapReduce不和它不同.利用嚴格程式設計模型,MapReduce構架來把問題分割成大量的任務.這些任務被自動的在可用的worker上排程,以便速度快的worker可以處理更多的任務.這個嚴格程式設計模型也讓我們可以在工作快要結束的時候安排備援的執行,來在非一緻處理的情況減少完成時間(比如,在有慢機或者阻塞的worker的時候). BAD-FS是一個很MapReduce完全不同的程式設計模型,它的目标是在一個廣闊的網絡上執行工作.然而,它們有兩個基本原理是相同的.(1)這兩個系統使用備援的執行來從由失效引起的資料丢失中恢複.(2)這兩個系統使用本地化排程政策,來減少通過擁擠的網絡連接配接發送的資料數量. TACC是一個被設計用來簡化高有效性網絡服務結構的系統.和MapReduce一樣,它通過再次執行來實作容錯. 8結束語 MapReduce程式設計模型已經在Google成功的用在不同的目的.我們把這個成功歸于以下幾個原因:第一,這個模型使用簡單,甚至對沒有并行和分布式經驗的程式員也是如此,因為它隐藏了并行化,容錯,位置優化和負載均衡的細節.第二,大量不同的問題可以用MapReduce計算來表達.例如,MapReduce被用來,為Google的産品web搜尋服務,排序,資料挖掘,機器學習,和其他許多系統,産生資料.第三,我們已經在一個好幾千台計算機的大型叢集上開發實作了這個MapReduce.這個實作使得對于這些機器資源的利用非常簡單,是以也适用于解決Google遇到的其他很多需要大量計算的問題. 從這個工作中我們也學習到了一些東西.首先,嚴格的程式設計模型使得并行化和分布式計算簡單,并且也易于構造這樣的容錯計算環境.第二,網絡帶寬是系統的瓶頸.是以在我們的系統中大量的優化目标是減少通過網絡發送的資料量,本地優化使用我們從本地磁盤讀取資料,并且把中間資料寫到本地磁盤,以保留網絡帶寬.第三,備援的執行可以用來減少速度慢的機器的影響,和控制機器失效和資料丢失. 感謝 Josh Levenberg校定和擴充了使用者級别的MapReduce API,并且結合他的适用經驗和其他人的改進建議,增加了很多新的功能.MapReduce從GFS中讀取和寫入資料.我們要感謝Mohit Aron,Howard Gobioff,Markus Gutschke,David Krame,Shun-Tak Leung,和Josh Redstone,他們在開發GFS中的工作.我們還感謝Percy Liang Olcan Sercinoglu 在開發用于MapReduce的叢集管理系統得工作.Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach為本論文提出了寶貴的意見.OSDI的無名審閱者,以及我們的稽核者Eric Brewer,在論文應當如何改進方面給出了有益的意見.最後,我們感謝Google的工程部的所有MapReduce的使用者,感謝他們提供了有用的回報,建議,以及錯誤報告等等. A單詞頻率統計 本節包含了一個完整的程式,用于統計在一組指令行指定的輸入檔案中,每一個不同的單詞出現頻率. #include "mapreduce/mapreduce.h" //使用者map函數 class WordCounter : public Mapper { public: virtual void Map(const MapInput& input) { const string& text = input.value(); const int n = text.size(); for (int i = 0; i < n; ) { //跳過前導空格 while ((i < n) && isspace(text[i])) i++; // 查找單詞的結束位置 int start = i; while ((i < n) && !isspace(text[i])) i++; if (start < i) Emit(text.substr(start,i-start),"1"); } } }; REGISTER_MAPPER(WordCounter); //使用者的reduce函數 class Adder : public Reducer { virtual void Reduce(ReduceInput* input) { //疊代具有相同key的所有條目,并且累加它們的value int64 value = 0; while (!input->done()) { value += StringToInt(input->value()); input->NextValue(); } //送出這個輸入key的綜合 Emit(IntToString(value)); } }; REGISTER_REDUCER(Adder); int main(int argc, char** argv) { ParseCommandLineFlags(argc, argv); MapReduceSpecification spec; // 把輸入檔案清單存入"spec" for (int i = 1; i < argc; i++) { MapReduceInput* input = spec.add_input(); input->set_format("text"); input->set_filepattern(argv[i]); input->set_mapper_class("WordCounter"); } //指定輸出檔案: // /gfs/test/freq-00000-of-00100 // /gfs/test/freq-00001-of-00100 // ... MapReduceOutput* out = spec.output(); out->set_filebase("/gfs/test/freq"); out->set_num_tasks(100); out->set_format("text"); out->set_reducer_class("Adder"); // 可選操作:在map任務中做部分累加工作,以便節省帶寬 out->set_combiner_class("Adder"); // 調整參數: 使用2000台機器,每個任務100MB記憶體 spec.set_machines(2000); spec.set_map_megabytes(100); spec.set_reduce_megabytes(100); // 運作它 MapReduceResult result; if (!MapReduce(spec, &result)) abort(); // 完成: 'result'結構包含計數,花費時間,和使用機器的資訊 return 0; }