天天看點

《Hadoop實戰第2版》——3.2節MapReduce計算模型

3.2 mapreduce計算模型

要了解mapreduce,首先需要了解mapreduce的載體是什麼。在hadoop中,用于執行mapreduce任務的機器有兩個角色:一個是jobtracker,另一個是tasktracker。jobtracker是用于管理和排程工作的,tasktracker是用于執行工作的。一個hadoop叢集中隻有一台jobtracker。

3.2.1 mapreduce job

在hadoop中,每個mapreduce任務都被初始化為一個job。每個job又可以分為兩個階段:map階段和reduce階段。這兩個階段分别用兩個函數來表示,即map函數和reduce函數。map函數接收一個形式的輸入,然後産生同樣為形式的中間輸出,hadoop會負責将所有具有相同中間key值的value集合到一起傳遞給reduce函數,reduce函數接收一個如形式的輸入,然後對這個value集合進行處理并輸出結果,reduce的輸出也是形式的。

為了友善了解,分别将三個對标記為、、,那麼上面所述的過程就可以用圖3-1來表示了。

《Hadoop實戰第2版》——3.2節MapReduce計算模型

3.2.2 hadoop中的hello world程式

上面所述的過程是mapreduce的核心,所有的mapreduce程式都具有圖3-1所示的結構。下面我再舉一個例子詳述mapreduce的執行過程。

大家初次接觸程式設計時學習的不論是哪種語言,看到的第一個示例程式可能都是“hello world”。在hadoop中也有一個類似于hello world的程式。這就是wordcount。本節會結合這個程式具體講解與mapreduce程式有關的所有類。這個程式的内容如下:

同時,為了叙述友善,設定兩個輸入檔案,如下:

看到這個程式,相信很多讀者會對衆多的預定義類感到很迷惑。其實這些類非常簡單明了。首先,wordcount程式的代碼雖多,但是執行過程卻很簡單,在本例中,它首先将輸入檔案讀進來,然後交由map程式處理,map程式将輸入讀入後切出其中的單詞,并标記它的數目為1,形成的形式,然後交由reduce處理,reduce将相同key值(也就是word)的value值收集起來,形成的形式,之後将這些1值加起來,即為單詞的個數,最後将這個對以textoutputformat的形式輸出到hdfs中。

針對這個資料流動過程,我挑出了如下幾句代碼來表述它的執行過程:

首先講解一下job的初始化過程。main函數調用jobconf類來對mapreduce job進行初始化,然後調用setjobname()方法命名這個job。對job進行合理的命名有助于更快地找到job,以便在jobtracker和tasktracker的頁面中對其進行監視。接着就會調用setinputpath()和setoutputpath()設定輸入輸出路徑。下面會結合wordcount程式重點講解inputformat()、outputformat()、map()、reduce()這4種方法。

inputformat()和inputsplit

inputsplit是hadoop中用來把輸入資料傳送給每個單獨的map,inputsplit存儲的并非資料本身,而是一個分片長度和一個記錄資料位置的數組。生成inputsplit的方法可以通過inputformat()來設定。當資料傳送給map時,map會将輸入分片傳送到inputformat()上,inputformat()則調用getrecordreader()方法生成recordreader,recordreader再通過creatkey()、creatvalue()方法建立可供map處理的對,即。簡而言之,inputformat()方法是用來生成可供map處理的對的。

hadoop預定義了多種方法将不同類型的輸入資料轉化為map能夠處理的對,它們都繼承自inputformat,分别是:

其中,textinputformat是hadoop預設的輸入方法,在textinputformat中,每個檔案(或其一部分)都會單獨作為map的輸入,而這是繼承自fileinputformat的。之後,每行資料都會生成一條記錄,每條記錄則表示成形式:

key值是每個資料的記錄在資料分片中的位元組偏移量,資料類型是longwritable;

value值是每行的内容,資料類型是text。

也就是說,輸入資料會以如下的形式被傳入map中

因為file01和file02都會被單獨輸入到一個map中,是以它們的key值都是0。

outputformat()

對于每一種輸入格式都有一種輸出格式與其對應。同樣,預設的輸出格式是textoutputformat,這種輸出方式與輸入類似,會将每條記錄以一行的形式存入文本檔案。不過,它的鍵和值可以是任意形式的,因為程式内部會調用tostring()方法将鍵和值轉換為string類型再輸出。最後的輸出形式如下所示:

map()函數繼承自mapreducebase,并且它實作了mapper接口,此接口是一個範型類型,它有4種形式的參數,分别用來指定map()的輸入key值類型、輸入value值類型、輸出key值類型和輸出value值類型。在本例中,因為使用的是textinputformat,它的輸出key值是longwritable類型,輸出value值是text類型,是以map()的輸入類型即為。如前面的内容所述,在本例中需要輸出這樣的形式,是以輸出的key值類型是text,輸出的value值類型是intwritable。

實作此接口類還需要實作map()方法,map()方法會負責具體對輸入進行操作,在本例中,map()方法對輸入的行以空格為機關進行切分,然後使用outputcollect收集輸出的,即。

下面來看reduce()函數:

與map()類似,reduce()函數也繼承自mapreducebase,需要實作reducer接口。reduce()函數以map()的輸出作為輸入,是以reduce()的輸入類型是。而reduce()的輸出是單詞和它的數目,是以,它的輸出類型是。reduce()函數也要實作reduce()方法,在此方法中,reduce()函數将輸入的key值作為輸出的key值,然後将獲得的多個value值加起來,作為輸出的value值。

運作mapreduce程式

讀者可以在eclipse裡運作mapreduce程式,也可以在指令行中運作mapreduce程式,但是在實際應用中,還是推薦到指令行中運作程式。按照第2章介紹的步驟,首先安裝hadoop,然後輸入編譯打包生成的jar程式,如下所示(以hadoop-0.20.2為例,安裝路徑是~/hadoop):

首先建立firstjar,然後編譯檔案生成.class,存放到檔案夾firstjar中,并将firstjar中的檔案打包生成wordcount.jar檔案。

接着上傳輸入檔案(輸入檔案是file01,file02,存放在~/input):

在此上傳過程中,先建立檔案夾input,然後上傳檔案file01、file02到input中。

最後運作生成的jar檔案,為了叙述友善,先将生成的jar檔案放入hadoop的安裝檔案夾中(hadoop_home),然後運作如下指令。

hadoop指令(注意不是hadoop本身)會啟動一個jvm來運作這個mapreduce程式,并自動擷取hadoop的配置,同時把類的路徑(及其依賴關系)加入到hadoop的庫中。以上就是hadoop job的運作記錄,從這裡面可以看到,這個job被賦予了一個id号:job_201101111819_0002,而且得知輸入檔案有兩個(total input paths to process : 2),同時還可以了解map的輸入輸出記錄(record數及位元組數),以及reduce的輸入輸出記錄。比如說,在本例中,map的task數量是2個,reduce的task數量是一個;map的輸入record數是2個,輸出record數是8個等。

可以通過指令檢視輸出檔案輸出檔案為:

從這個程式可以看到新舊api的幾個差別:

在新的api中,mapper與reducer已經不是接口而是抽象類。而且map函數與reduce函數也已經不再實作mapper和reducer接口,而是繼承mapper和reducer抽象類。這樣做更容易擴充,因為添加方法到抽象類中更容易。

新的api中更廣泛地使用了context對象,并使用mapcontext進行mapreduce間的通信,mapcontext同時充當outputcollector和reporter的角色。

job的配置統一由configurartion來完成,而不必額外地使用jobconf對守護程序進行配置。

由job類來負責job的控制,而不是jobclient,jobclient在新的api中已經被删除。

這些差別,都可以在以上的程式中看出。

此外,新的api同時支援“推”和“拉”式的疊代方式。在以往的操作中,對是被推入到map中的,但是在新的api中,允許程式将資料拉入map中,reduce也一樣。這樣做更加友善程式分批處理資料。

3.2.3 mapreduce的資料流和控制流

前面已經提到了mapreduce的資料流和控制流的關系,本節将結合wordcount執行個體具體解釋它們的含義。圖3-2是上例中wordcount程式的執行流程。

《Hadoop實戰第2版》——3.2節MapReduce計算模型

由前面的内容知道,負責控制及排程mapreduce的job的是jobtracker,負責運作mapreduce的job的是tasktracker。當然,mapreduce在運作時是分成map task和reduce task來處理的,而不是完整的job。簡單的控制流大概是這樣的:jobtracker排程任務給tasktracker,tasktracker執行任務時,會傳回進度報告。jobtracker則會記錄進度的進行狀況,如果某個tasktracker上的任務執行失敗,那麼jobtracker會把這個任務配置設定給另一台tasktracker,直到任務執行完成。

這裡更詳細地解釋一下資料流。上例中有兩個map任務及一個reduce任務。資料首先按照textinputformat形式被處理成兩個inputsplit,然後輸入到兩個map中,map程式會讀取inputsplit指定位置的資料,然後按照設定的方式處理該資料,最後寫入到本地磁盤中。注意,這裡并不是寫到hdfs上,這應該很好了解,因為map的輸出在job完成後即可删除了,是以不需要存儲到hdfs上,雖然存儲到hdfs上會更安全,但是因為網絡傳輸會降低mapreduce任務的執行效率,是以map的輸出檔案是寫在本地磁盤上的。如果map程式在沒來得及将資料傳送給reduce時就崩潰了(程式出錯或機器崩潰),那麼jobtracker隻需要另選一台機器重新執行這個task就可以了。

reduce會讀取map的輸出資料,合并value,然後将它們輸出到hdfs上。reduce的輸出會占用很多的網絡帶寬,不過這與上傳資料一樣是不可避免的。如果大家還是不能很好地了解資料流的話,下面有一個更具體的圖(wordcount執行時的資料流),如圖3-3所示。

《Hadoop實戰第2版》——3.2節MapReduce計算模型

相信看到圖3-3,大家就會對mapreduce的執行過程有更深刻的了解了。

除此之外,還有兩種情況需要注意:

1)mapreduce在執行過程中往往不止一個reduce task,reduce task的數量是可以程式指定的。當存在多個reduce task時,每個reduce會搜集一個或多個key值。需要注意的是,當出現多個reduce task時,每個reduce task都會生成一個輸出檔案。

2)另外,沒有reduce任務的時候,系統會直接将map的輸出結果作為最終結果,同時map task的數量可以看做是reduce task的數量,即有多少個map task就有多少個輸出檔案。

繼續閱讀