1. 前言
通過基于MapReduce雲計算平台的海量資料處理實驗,我們了解了Hadoop的基本架構,已經如何編寫MapReduce程式,本實驗中我主要使用到的兩個程式分别是WordCount(詞頻統計)和InvertedIndex(反向索引)。在将這兩個程式之前,我會介紹我對Hadoop的了解。
2. Hadoop簡介及特性
2.1. Hadoop分布式檔案系統(HDFS)
Hadoop分布式檔案系統(HDFS)被設計成适合運作在通用硬體(commodity hardware)上的分布式檔案系統。它和現有的分布式檔案系統有很多共同點。但同時,它和其他的分布式檔案系統的差別也是很明顯的。HDFS是一個高度容錯性的系統,适合部署在廉價的機器上。HDFS能提供高吞吐量的資料通路,非常适合大規模資料集上的應用。HDFS放寬了一部分POSIX限制,來實作流式讀取檔案系統資料的目的。
2.2. 簡單的一緻性模型
HDFS應用需要一個“一次寫入多次讀取”的檔案通路模型。一個檔案經過建立、寫入和關閉之後就不需要改變。這一假設簡化了資料一緻性問題,并且使高吞吐量的資料通路成為可能。Map/Reduce應用或者網絡爬蟲應用都非常适合這個模型。目前還有計劃在将來擴充這個模型,使之支援檔案的附加寫操作。
2.3. “移動計算比移動資料更劃算”
一個應用請求的計算,離它操作的資料越近就越高效,在資料達到海量級别的時候更是如此。因為這樣就能降低網絡阻塞的影響,提高系統資料的吞吐量。将計算移動到資料附近,比之将資料移動到應用所在顯然更好。HDFS為應用提供了将它們自己移動到資料附近的接口。
2.4. 資料複制
HDFS被設計成能夠在一個大叢集中跨機器可靠地存儲超大檔案。它将每個檔案存儲成一系列的資料塊,除了最後一個,所有的資料塊都是同樣大小的。為了容錯,檔案的所有資料塊都會有副本。每個檔案的資料塊大小和副本系數都是可配置的。應用程式可以指定某個檔案的副本數目。副本系數可以在檔案建立的時候指定,也可以在之後改變。HDFS中的檔案都是一次性寫入的,并且嚴格要求在任何時候隻能有一個寫入者。
Namenode全權管理資料塊的複制,它周期性地從叢集中的每個Datanode接收心跳信号和塊狀态報告(Blockreport)。接收到心跳信号意味着該Datanode節點工作正常。塊狀态報告包含了一個該Datanode上所有資料塊的清單。
2.5. 副本選擇
為了降低整體的帶寬消耗和讀取延時,HDFS會盡量讓讀取程式讀取離它最近的副本。如果在讀取程式的同一個機架上有一個副本,那麼就讀取該副本。如果一個HDFS叢集跨越多個資料中心,那麼用戶端也将首先讀本地資料中心的副本。
2.6. 檔案系統中繼資料的持久化
Namenode上儲存着HDFS的名字空間。對于任何對檔案系統中繼資料産生修改的操作,Namenode都會使用一種稱為EditLog的事務日志記錄下來。例如,在HDFS中建立一個檔案,Namenode就會在Editlog中插入一條記錄來表示;同樣地,修改檔案的副本系數也将往Editlog插入一條記錄。Namenode在本地作業系統的檔案系統中存儲這個Editlog。整個檔案系統的名字空間,包括資料塊到檔案的映射、檔案的屬性等,都存儲在一個稱為FsImage的檔案中,這個檔案也是放在Namenode所在的本地檔案系統上。
Namenode在記憶體中儲存着整個檔案系統的名字空間和檔案資料塊映射(Blockmap)的映像。這個關鍵的中繼資料結構設計得很緊湊,因而一個有4G記憶體的Namenode足夠支撐大量的檔案和目錄。當Namenode啟動時,它從硬碟中讀取Editlog和FsImage,将所有Editlog中的事務作用在記憶體中的FsImage上,并将這個新版本的FsImage從記憶體中儲存到本地磁盤上,然後删除舊的Editlog,因為這個舊的Editlog的事務都已經作用在FsImage上了。這個過程稱為一個檢查點(checkpoint)。在目前實作中,檢查點隻發生在Namenode啟動時,在不久的将來将實作支援周期性的檢查點。
Datanode将HDFS資料以檔案的形式存儲在本地的檔案系統中,它并不知道有關HDFS檔案的資訊。它把每個HDFS資料塊存儲在本地檔案系統的一個單獨的檔案中。Datanode并不在同一個目錄建立所有的檔案,實際上,它用試探的方法來确定每個目錄的最佳檔案數目,并且在适當的時候建立子目錄。在同一個目錄中建立所有的本地檔案并不是最優的選擇,這是因為本地檔案系統可能無法高效地在單個目錄中支援大量的檔案。當一個Datanode啟動時,它會掃描本地檔案系統,産生一個這些本地檔案對應的所有HDFS資料塊的清單,然後作為報告發送到Namenode,這個報告就是塊狀态報告。
2.7. 叢集均衡
HDFS的架構支援資料均衡政策。如果某個Datanode節點上的空閑空間低于特定的臨界點,按照均衡政策系統就會自動地将資料從這個Datanode移動到其他空閑的Datanode。當對某個檔案的請求突然增加,那麼也可能啟動一個計劃建立該檔案新的副本,并且同時重新平衡叢集中的其他資料。這些均衡政策目前還沒有實作。
2.8. 資料完整性
從某個Datanode擷取的資料塊有可能是損壞的,損壞可能是由Datanode的儲存設備錯誤、網絡錯誤或者軟體bug造成的。HDFS用戶端軟體實作了對HDFS檔案内容的校驗和(checksum)檢查。當用戶端建立一個新的HDFS檔案,會計算這個檔案每個資料塊的校驗和,并将校驗和作為一個單獨的隐藏檔案儲存在同一個HDFS名字空間下。當用戶端擷取檔案内容後,它會檢驗從Datanode擷取的資料跟相應的校驗和檔案中的校驗和是否比對,如果不比對,用戶端可以選擇從其他Datanode擷取該資料塊的副本。
3. Hadoop程序簡介
我們在啟動Hadoop以後,會啟動相應的Hadoop程序,可以通過在終端中輸入:jps來檢視目前程序,下面來詳解介紹這些程序的具體含義及作用。
3.1. Namenode 和 Datanode
HDFS采用master/slave架構。一個HDFS叢集是由一個Namenode和一定數目的Datanodes組成。Namenode是一個中心伺服器,負責管理檔案系統的名字空間(namespace)以及用戶端對檔案的通路。叢集中的Datanode一般是一個節點一個,負責管理它所在節點上的存儲。HDFS暴露了檔案系統的名字空間,使用者能夠以檔案的形式在上面存儲資料。從内部看,一個檔案其實被分成一個或多個資料塊,這些塊存儲在一組Datanode上。Namenode執行檔案系統的名字空間操作,比如打開、關閉、重命名檔案或目錄。它也負責确定資料塊到具體Datanode節點的映射。Datanode負責處理檔案系統用戶端的讀寫請求。在Namenode的統一排程下進行資料塊的建立、删除和複制。
Namenode和Datanode被設計成可以在普通的商用機器上運作。這些機器一般運作着GNU/Linux作業系統(OS)。HDFS采用Java語言開發,是以任何支援Java的機器都可以部署Namenode或Datanode。由于采用了可移植性極強的Java語言,使得HDFS可以部署到多種類型的機器上。一個典型的部署場景是一台機器上隻運作一個Namenode執行個體,而叢集中的其它機器分别運作一個Datanode執行個體。這種架構并不排斥在一台機器上運作多個Datanode,隻不過這樣的情況比較少見。
叢集中單一Namenode的結構大大簡化了系統的架構。Namenode是所有HDFS中繼資料的仲裁者和管理者,這樣,使用者資料永遠不會流過Namenode。
3.2. Secondary NameNode
NameNode将對檔案系統的改動追加儲存到本地檔案系統上的一個日志檔案(edits)。當一個NameNode啟動時,它首先從一個映像檔案(fsimage)中讀取HDFS的狀态,接着應用日志檔案中的edits操作。然後它将新的HDFS狀态寫入(fsimage)中,并使用一個空的edits檔案開始正常操作。因為NameNode隻有在啟動階段才合并fsimage和edits,是以久而久之日志檔案可能會變得非常龐大,特别是對大型的叢集。日志檔案太大的另一個副作用是下一次NameNode啟動會花很長時間。
Secondary NameNode定期合并fsimage和edits日志,将edits日志檔案大小控制在一個限度下。因為記憶體需求和NameNode在一個數量級上,是以通常secondary NameNode和NameNode運作在不同的機器上。Secondary NameNode通過bin/start-dfs.sh在conf/masters中指定的節點上啟動。
Secondary NameNode的檢查點程序啟動,是由兩個配置參數控制的:
l fs.checkpoint.period,指定連續兩次檢查點的最大時間間隔,預設值是1小時。
l fs.checkpoint.size定義了edits日志檔案的最大值,一旦超過這個值會導緻強制執行檢查點(即使沒到檢查點的最大時間間隔)。預設值是64MB。
Secondary NameNode儲存最新檢查點的目錄與NameNode的目錄結構相同。 是以NameNode可以在需要的時候讀取Secondary NameNode上的檢查點鏡像。如果NameNode上除了最新的檢查點以外,所有的其他的曆史鏡像和edits檔案都丢失了, NameNode可以引入這個最新的檢查點。以下操作可以實作這個功能:
l 在配置參數dfs.name.dir指定的位置建立一個空檔案夾;
l 把檢查點目錄的位置指派給配置參數fs.checkpoint.dir;
l 啟動NameNode,并加上-importCheckpoint。
NameNode會從fs.checkpoint.dir目錄讀取檢查點, 并把它儲存在dfs.name.dir目錄下。 如果dfs.name.dir目錄下有合法的鏡像檔案,NameNode會啟動失敗。 NameNode會檢查fs.checkpoint.dir目錄下鏡像檔案的一緻性,但是不會去改動它。
3.3. JobTracker
建立一個InputFormat的執行個體,調用它的getSplits()方法,把輸入目錄的檔案拆分成FileSplist作為Mapper task 的輸入,生成Mapper task加入Queue。
3.4. TaskTracker
向JobTracker索求下一個Map/Reduce。Mapper Task先從InputFormat建立RecordReader,循環讀入FileSplits的内容生成Key與Value,傳給Mapper函數,處理完後中間結果寫成SequenceFile。Reducer Task 從運作Mapper的TaskTracker的Jetty上使用http協定擷取所需的中間内容(33%),Sort/Merge後(66%),執行Reducer函數,最後按照OutputFormat寫入結果目錄。 TaskTracker 每10秒向JobTracker報告一次運作情況,每完成一個Task10秒後,就會向JobTracker索求下一個Task。
4. Hadoop Map/Reduce教程
4.1. 概述
Hadoop Map/Reduce是一個使用簡易的軟體架構,基于它寫出來的應用程式能夠運作在由上千個商用機器組成的大型叢集上,并以一種可靠容錯的方式并行處理上T級别的資料集。一個Map/Reduce 作業(job) 通常會把輸入的資料集切分為若幹獨立的資料塊,由 map任務(task)以完全并行的方式處理它們。架構會對map的輸出先進行排序,然後把結果輸入給reduce任務。通常作業的輸入和輸出都會被存儲在檔案系統中。整個架構負責任務的排程和監控,以及重新執行已經失敗的任務。
通常Map/Reduce架構和分布式檔案系統是運作在一組相同的節點上的,也就是說,計算節點和存儲節點通常在一起。這種配置允許架構在那些已經存好資料的節點上高效地排程任務,這可以使整個叢集的網絡帶寬被非常高效地利用。
Map/Reduce架構由一個單獨的master JobTracker 和每個叢集節點一個slave TaskTracker共同組成。master負責排程構成一個作業的所有任務,這些任務分布在不同的slave上,master監控它們的執行,重新執行已經失敗的任務。而slave僅負責執行由master指派的任務。
應用程式至少應該指明輸入/輸出的位置(路徑),并通過實作合适的接口或抽象類提供map和reduce函數。再加上其他作業的參數,就構成了作業配置(job configuration)。然後,Hadoop的 job client送出作業(jar包/可執行程式等)和配置資訊給JobTracker,後者負責分發這些軟體和配置資訊給slave、排程任務并監控它們的執行,同時提供狀态和診斷資訊給job-client。
雖然Hadoop架構是用JavaTM實作的,但Map/Reduce應用程式則不一定要用 Java來寫 。Hadoop Streaming是一種運作作業的實用工具,它允許使用者建立和運作任何可執行程式 (例如:Shell工具)來做為mapper和reducer。Hadoop Pipes是一個與SWIG相容的C++ API(沒有基于JNITM技術),它也可用于實作Map/Reduce應用程式。
5. 輸入與輸出
Map/Reduce架構運轉在 鍵值對上,也就是說,架構把作業的輸入看為是一組 鍵值對,同樣也産出一組 鍵值對做為作業的輸出,這兩組鍵值對的類型可能不同。架構需要對key和value的類(classes)進行序列化操作,是以,這些類需要實作 Writable接口。另外,為了友善架構執行排序操作,key類必須實作 WritableComparable接口。,>,>,>
一個Map/Reduce 作業的輸入和輸出類型如下所示:(input) -> map -> -> combine -> -> reduce -> (output),>,>,>,>
作者:xwdreamer