文章目錄
- 引言
- 批處理系統
- MapReduce
- 把資料放在一起
- 排序-合并join
- 輸出
- 容錯
- 落後者 straggler
- 改進
- 總結
引言
正如DDIA上所說,MapReduce論文發表時從某種意義上來說其實并不新鮮。因為其政策很多都已經在大規模并行資料庫上得到了實作,但是作為"三駕馬車"之一的MapReduce與分布式檔案系統的結合确使其的功能性大大提升。在了解MapReduce之前,我們首先需要了解批處理系統,這有助于我們了解MapReduce。

批處理系統
我們來看看百度百科上對于批處理系統的解釋:
批處理系統,又名批處理作業系統。批處理是指使用者将一批作業送出給作業系統後就不再幹預,由作業系統控制它們自動運作。
批處理是指使用者将一批作業送出給作業系統後就不再幹預,由作業系統控制它們自動運作。這種采用批量處理作業技術的作業系統稱為批處理作業系統;批處理作業系統不具有互動性,它是為了提高CPU的使用率而提出的一種作業系統。
用于類比,我們可以想象Linux下的批處理工具,如果要分析一個巨大無比的日志,我覺得正常的一個人不會去編寫一個規規矩矩的java或者C或者Python去做一個處理。而是會使用諸如awk,sed,grep這樣的工具去完成這個任務。因為我們可以使用管道把上一個工具的輸出當做下一個工具的輸入。隻需要一串指令就可以完成所有的工作,不必在意其中的轉換。這其實蘊藏着Unix的設計哲學:
- 每個程式做好一件事情,新的工作交給新的程式,靠通信聯系,而不是使一個程式的複雜性提升。
- 期待一個程式的輸出成為另一個的輸入。即不要再輸出中混入無關資訊(日志的重要性)。
- 優先使用工具減輕程式設計任務(埋頭苦碼有時不是認真是無知,但在大學時"無知"一點沒什麼不好的)。
- 需要扔掉笨拙的部分時不要猶豫,重構有時是最好的方法(設計的重要性)。
回到那些批處理工具,如何做到之間的互動毫不費力呢?這就需要統一接口,産物就是檔案。這是一個非常美麗的抽象,也就是我們所說的"Linux下一切皆檔案",這使得顯然不同的各種事物通過一個統一的接口被連接配接在一起。與之類似的還有URL和HTTP的統一接口,它們使得使用者通過連結可以在不同營運商的伺服器之間跳轉。我們現在看來顯而易見的理念其實并沒有那麼簡單。
MapReduce
如果我們把分布式檔案系統比作一個龐大的機器,MapReduce就是這個機器内的程序。它可以接收一個或者多個輸入,産生一個或者多個輸出.在Hadoop中這個組合就是HDFS(Hadoop Distributed File System)+MapReduce。這樣看來你也不難為什麼Hadoop是一個資料密集型分布式應用程式了,它其實本身就可以看做一個可擴充的龐大機器。
回到MapReduce,這其實并不是一個實物,它不過是一種程式設計模型,隻不過Hadoop的實作也恰好叫這個而已。這個模型就是使用者定義map函數來處理key/value鍵值對來産生一系列的中間的key/value鍵值對。還要定義一個reduce函數用來合并有着相同中間key值的中間value。很多現實中的事件都可以用這種模型表達,舉些簡單的例子:
- 分布式的grep工具,由map進行grep,reduce直接輸出即可。
- 查找一個URL的通路次數,Map函數處理web請求的日志,并且輸出<URL, 1>.Reduce函數将擁有相同URL的value相加,得到<URL, total count>對。
- …
接下來我們看看一個MapReduce的作業如何運作:
接下來的過程将展示一次MapReduce的執行過程:
- 使用者程式中的MapReduce庫首先将輸入檔案劃分為M片,每片大小一般在16M到64M之間(由使用者通過一個可選的參數指定,這可以使得并行執行,互相不受影響)之後,它在叢集的很多台機器上都啟動了相同的程式拷貝。
- 其中有一個拷貝程式是特别的----master。剩下的都是worker,它們接收master配置設定的任務。其中有M個Map任務和R個Reduce任務要配置設定。master挑選一個空閑的worker并且給它配置設定一個map任務或者reduce任務。其實就是一個集中式的管理,當然不是高可用的,因為master當機一定會出現一段時間的服務終止。但是分布式計算不需要確定高可用,因為一個任務執行幾天很正常。
- 被配置設定到Map任務的worker會去讀取相應的輸入塊的内容。它從輸入檔案中解析出鍵值對并且将每個鍵值對傳送給使用者定義的Map函數。而由Map函數産生的中間鍵值對緩存在記憶體中。
- 被緩存的鍵值對會階段性地寫回本地磁盤,并且被劃分函數分割成R份。這些緩存對在磁盤上的位置會被回傳給master,master再負責将這些位置轉發給Reduce worker。
- 當Reduce worker從master那裡接收到這些位置資訊時,它會使用遠端過程調用從Map worker的本地磁盤中擷取緩存的資料。當Reduce worker讀入全部的中間資料之後。它會根據中間鍵對它們進行
,這樣所有具有相同鍵的鍵值對就都聚集在一起了。排序是必須的,因為會有許多不同的鍵被映射到同一個reduce task中。如果中間資料的數量太大,以至于不能夠裝入記憶體的話,還需要另外的排序。排序
- Reduce worker周遊已經排完序的中間資料。每當遇到一個新的中間鍵,它會将key和相應的中間值傳遞給使用者定義的Reduce函數。Reduce函數的輸出會被添加到這個Reduce部分的輸出檔案中。Reduce函數一般通過一個疊代器來擷取中間值,進而在中間值的數目遠遠大于記憶體容量時。我們也能夠處理。
- 當所有的Map tasks和Reduce tasks都已經完成的時候,master将喚醒使用者程式,到此為止,使用者代碼中的MapReduce調用傳回。
當然Reduce的輸出可以當做其他Map的輸入,這是不是有點類似于管道,這使得我們可以集中精力完成一個闆塊即可,隻需要保證輸出格式,這些獨立的子子產品最後就可以合并成一個大的處理過程。
把資料放在一起
我們來聯想join操作,也就是我們在關系型資料庫中所說的聯結操作。在批處理下讨論join的話,我們要解決的是資料集記憶體在關聯的所有事件,就比如說我們有兩張表,如下所示:
假如這兩張表存在不同的機器内,那麼我們該如何聯結它們呢?最簡單的方法就是使用學号去周遊學生成績表,把比對的傳回即可,這必然牽扯到極大的通信成本,降低整個過程的吞吐量。我們要做的是盡可能把計算放在本地機器上,減少通信。此時的做法如下:
排序-合并join
如圖中所示,主鍵為ID,可以用一個mapper去掃描學生資訊表,一個mapper去掃描學生成績表。用學号進行分區,然後Reduce進行合并分區,因為Reduce中設計排序,這樣的話資訊與成績的項就在reduce中相鄰了。
這裡我們可以看到其實是資料放在一起的過程類似與資料分區這樣的話我們就可以像資料分區一樣,可以在map端使用基于關鍵字的分區,哈希分區等方法來分區。當然這裡也涉及到負載均衡,當出現所謂的熱點資料的時候,即map中某個分區資料特别多,這需要首先檢測出熱點資料,然後把資料分發到不同的分區。
輸出
首先我們應該明白一點,就是MapReduce的輸出并不是單一的某種類型,它的輸出是某種資料結構。也就是說要麼這個資料結構是我們所需要的某個值,要麼是下一個MapReduce的輸入,這實際上意味着所有的資料處理邏輯被包含在了map,reduce程式中。舉個例子,Google最初使用MapReduce的目的就是為了建構索引。map對文檔進行分區,Reduce建構分區索引,并寫入分布式檔案系統,這就是一個非常典型的應用,可以說明輸出的結構問題。
還有一些情況需要我們将輸出寫入資料庫,如果在map、reduce程式中直接寫入可行嗎?這可能會産生以下問題:
- 每個記錄執行一次網絡通信代價太大,盡管可以支援批處理系統。(同一台機器也要通信)
- MapReduce經常并行處理很多任務,M個map,R個reduce,如果同時寫入可能會出現過載。
- 如果出現失敗在MapReduce會重新執行這個任務,但是如果已經寫入資料庫可能會造成外部系統可見的副作用。
解決的方案就是建構一種全新的資料庫,可以将其作為檔案寫入分布式檔案系統的輸出目錄,這就避免了以上的問題,如果完成複制,下次查詢就可以采用新的檔案,失敗的話使用舊檔案就可以了。諸如HBase等資料庫都實作了這個功能。
容錯
MapReduce的容錯機制很有意思,對于資料庫和一般HTTP伺服器錯誤的處理的方法就是終止請求,但是一個MapReduce任務可能是由多個任務組成的,且使用者要求的實時性較低,是以完全可以重新執行那些錯誤的子任務。發生故障的機器上已經完成的Map task需要重新執行的原因是,它們的輸入是儲存在本地磁盤的,是以發生故障之後就不能擷取了。而已經完成的Reduce task并不需要被重新執行,因為它們的輸出是存放在全局的檔案系統中的。當一個Map task開始由worker A執行,後來又由worker B執行(因為A故障了)。所有執行Reduce task的worker都會收到這個重新執行的通知。那些還未從worker A中讀取資料的Reduce task将會從worker B中讀取資料。當然以上讨論的是worker的故障。下面我們來說說master的故障處理,顯然MapReduce是一個集中式的架構,無法保證真正意義上的高可用,也就是master當機的時候一定有一段時間無法提供服務,但是其實master也有備份節點,用于在當機的時候執行從節點更新,論文中并沒有說這裡的實作,當然也沒有必要,這裡我們的選擇就多了,哨兵,ZooKeeer等等。
這裡我們會發現一個問題,就是容錯的粒度是單個MapReduce任務。我們來思考一個問題,一個MapReduce任務可以分為W+R個子任務,如果因為一個失敗就要重新執行整個任務顯得十分低效,隻有在故障率較高時才是一種有效的方式,雖然機器總會故障,但畢竟機率較小,但這樣真的有必要嗎?答案是有,這與MapReduce的設計環境有關,因為谷歌内部任務之間都有優先級,進階别任務會搶占低級别任務的資源,這使得一個子任務失敗的機率其實并不低。也就是說在任務不經常被終止的環境中這樣的決策并沒有什麼意義。這裡我們也可以看出來一件事,谷歌壞的很。。
當然還需要考慮函數的确定性(幂等性),即多次執行是否産生相同的結果。如果不是的話重新執行的話就可能導緻中間值不同,進而導緻級聯的修改,這裡我們最好使得函數确定,如果有些算法需要随機值,則設定一個固定的随機種子。其實當中間資料比原資料小的多或者計算量特别大的話,顯然把中間資料轉化為檔案将有效的多,這是一個權衡的過程。
得益于這樣的架構,使得我們編寫代碼不必擔心容錯的問題。
落後者 straggler
這是一種特殊的情況,在論文中提出了這個概念,這是拖慢整個MapReduce操作的通常的原因之一。所謂的"straggler"是指一台機器用了過長的時間去完成整個計算任務中最後幾個Map或者Reduce task,進而延長了整個MapReduce的執行時長。對此一個通用的解決方案就是:當MapReduce操作接近結束的時候,master會将那些還在執行的task的備份進行排程執行。無論是原本的還是備份執行完成,該task都被标記為已完成,據論文中所言,效率的提升是很明顯的。
改進
雖然這玩意被炒的很熱,但是不可否認的是它隻是一種可能的程式設計模型罷了,我們需要設計一些工具去适應面臨的環境,而不是固守着一個工具試圖适應環境。取決于資料量,資料的結構,其他的工具可能有更好的效率。我們來看看MapReduce有一些什麼問題可以改進:
- 中間狀态是否必要?将中間狀态存儲在分布式檔案系統中意味着需要備援資料,即在其他節點上進行複制,對于一個大型作業的中間步驟這是否有必要呢?
- 當mapper輸入格式與reduce輸出格式相同的時候分為兩步是否有必要呢?也就是說mapper的任務可以合并到上一個reduce。
- 昂貴的排序操作在每一步都需要呢?(這一步也使得不能在輸出剛産生的時候直接進行下一步,因為要排序)
顯然這些地方都是可以去優化的。
總結
- 書籍《Designing Data-Intensive Application》
- 論文《MapReduce: Simplified Data Processing on Large Cluster》