天天看點

Hadoop叢集(第6期)_WordCount運作詳解

  MapReduce采用"分而治之"的思想,把對大規模資料集的操作,分發給一個主節點管理下的各個分節點共同完成,然後通過整合各個節點的中間結果,得到最終結果。簡單地說,MapReduce就是"任務的分解與結果的彙總"。

  在Hadoop中,用于執行MapReduce任務的機器角色有兩個:一個是JobTracker;另一個是TaskTracker,JobTracker是用于排程工作的,TaskTracker是用于執行工作的。一個Hadoop叢集中隻有一台JobTracker。

  在分布式計算中,MapReduce架構負責處理了并行程式設計中分布式存儲、工作排程、負載均衡、容錯均衡、容錯處理以及網絡通信等複雜問題,把處理過程高度抽象為兩個函數:map和reduce,map負責把任務分解成多個任務,reduce負責把分解後多任務處理的結果彙總起來。

  需要注意的是,用MapReduce來處理的資料集(或任務)必須具備這樣的特點:待處理的資料集可以分解成許多小的資料集,而且每一個小資料集都可以完全并行地進行處理。

  在Hadoop中,每個MapReduce任務都被初始化為一個Job,每個Job又可以分為兩種階段:map階段和reduce階段。這兩個階段分别用兩個函數表示,即map函數和reduce函數。map函數接收一個<key,value>形式的輸入,然後同樣産生一個<key,value>形式的中間輸出,Hadoop函數接收一個如<key,(list of values)>形式的輸入,然後對這個value集合進行處理,每個reduce産生0或1個輸出,reduce的輸出也是<key,value>形式的。

MapReduce處理大資料集的過程

  單詞計數是最簡單也是最能展現MapReduce思想的程式之一,可以稱為MapReduce版"Hello World",該程式的完整代碼可以在Hadoop安裝包的"src/examples"目錄下找到。單詞計數主要完成功能是:統計一系列文本檔案中每個單詞出現的次數,如下圖所示。

  現在以"hadoop"普通使用者登入"Master.Hadoop"伺服器。

  1)建立本地示例檔案

  首先在"/home/hadoop"目錄下建立檔案夾"file"。

  接着建立兩個文本檔案file1.txt和file2.txt,使file1.txt内容為"Hello World",而file2.txt的内容為"Hello Hadoop"。

<a href="http://images.cnblogs.com/cnblogs_com/xia520pi/201205/201205171119112655.png"></a>

  2)在HDFS上建立輸入檔案夾

<a href="http://images.cnblogs.com/cnblogs_com/xia520pi/201205/201205171119165997.png"></a>

  3)上傳本地file中檔案到叢集的input目錄下

<a href="http://images.cnblogs.com/cnblogs_com/xia520pi/201205/201205171119189387.png"></a>

  1)在叢集上運作WordCount程式

  備注:以input作為輸入目錄,output目錄作為輸出目錄。

  已經編譯好的WordCount的Jar在"/usr/hadoop"下面,就是"hadoop-examples-1.0.0.jar",是以在下面執行指令時記得把路徑寫全了,不然會提示找不到該Jar包。

  2)MapReduce執行過程顯示資訊

<a href="http://images.cnblogs.com/cnblogs_com/xia520pi/201205/201205171119218467.png"></a>

  Hadoop指令會啟動一個JVM來運作這個MapReduce程式,并自動獲得Hadoop的配置,同時把類的路徑(及其依賴關系)加入到Hadoop的庫中。以上就是Hadoop Job的運作記錄,從這裡可以看到,這個Job被賦予了一個ID号:job_201202292213_0002,而且得知輸入檔案有兩個(Total input paths to process : 2),同時還可以了解map的輸入輸出記錄(record數及位元組數),以及reduce輸入輸出記錄。比如說,在本例中,map的task數量是2個,reduce的task數量是一個。map的輸入record數是2個,輸出record數是4個等資訊。

  1)檢視HDFS上output目錄内容

<a href="http://images.cnblogs.com/cnblogs_com/xia520pi/201205/201205171119242107.png"></a>

  從上圖中知道生成了三個檔案,我們的結果在"part-r-00000"中。

  2)檢視結果輸出檔案内容

<a href="http://images.cnblogs.com/cnblogs_com/xia520pi/201205/201205171119292102.png"></a>

  Hadoop提供了如下内容的資料類型,這些資料類型都實作了WritableComparable接口,以便用這些類型定義的資料可以被序列化進行網絡傳輸和檔案存儲,以及進行大小比較。

    BooleanWritable:标準布爾型數值

    ByteWritable:單位元組數值

    DoubleWritable:雙位元組數

    FloatWritable:浮點數

    IntWritable:整型數

    LongWritable:長整型數

    Text:使用UTF8格式存儲的文本

    NullWritable:當&lt;key,value&gt;中的key或value為空時使用

  1)源代碼程式

package org.apache.hadoop.examples; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class WordCount {     public static class Map extends MapReduceBase implements             Mapper&lt;LongWritable, Text, Text, IntWritable&gt; {         private final static IntWritable one = new IntWritable(1);         private Text word = new Text();         public void map(LongWritable key, Text value,                 OutputCollector&lt;Text, IntWritable&gt; output, Reporter reporter)                 throws IOException {             String line = value.toString();             StringTokenizer tokenizer = new StringTokenizer(line);             while (tokenizer.hasMoreTokens()) {                 word.set(tokenizer.nextToken());                 output.collect(word, one);             }         }     }     public static class Reduce extends MapReduceBase implements             Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {         public void reduce(Text key, Iterator&lt;IntWritable&gt; values,             int sum = 0;             while (values.hasNext()) {                 sum += values.next().get();             output.collect(key, new IntWritable(sum));     public static void main(String[] args) throws Exception {         JobConf conf = new JobConf(WordCount.class);         conf.setJobName("wordcount");         conf.setOutputKeyClass(Text.class);         conf.setOutputValueClass(IntWritable.class);         conf.setMapperClass(Map.class);         conf.setCombinerClass(Reduce.class);         conf.setReducerClass(Reduce.class);         conf.setInputFormat(TextInputFormat.class);         conf.setOutputFormat(TextOutputFormat.class);         FileInputFormat.setInputPaths(conf, new Path(args[0]));         FileOutputFormat.setOutputPath(conf, new Path(args[1]));         JobClient.runJob(conf); }

  3)主方法Main分析

public static void main(String[] args) throws Exception {     JobConf conf = new JobConf(WordCount.class);     conf.setJobName("wordcount");     conf.setOutputKeyClass(Text.class);     conf.setOutputValueClass(IntWritable.class);     conf.setMapperClass(Map.class);     conf.setCombinerClass(Reduce.class);     conf.setReducerClass(Reduce.class);     conf.setInputFormat(TextInputFormat.class);     conf.setOutputFormat(TextOutputFormat.class);     FileInputFormat.setInputPaths(conf, new Path(args[0]));     FileOutputFormat.setOutputPath(conf, new Path(args[1]));     JobClient.runJob(conf);

  首先講解一下Job的初始化過程。main函數調用Jobconf類來對MapReduce Job進行初始化,然後調用setJobName()方法命名這個Job。對Job進行合理的命名有助于更快地找到Job,以便在JobTracker和Tasktracker的頁面中對其進行監視。

JobConf conf = new JobConf(WordCount. class ); conf.setJobName("wordcount" );

  接着設定Job輸出結果&lt;key,value&gt;的中key和value資料類型,因為結果是&lt;單詞,個數&gt;,是以key設定為"Text"類型,相當于Java中String類型。Value設定為"IntWritable",相當于Java中的int類型。

conf.setOutputKeyClass(Text.class ); conf.setOutputValueClass(IntWritable.class );

  然後設定Job處理的Map(拆分)、Combiner(中間結果合并)以及Reduce(合并)的相關處理類。這裡用Reduce類來進行Map産生的中間結果合并,避免給網絡資料傳輸産生壓力。

conf.setMapperClass(Map.class ); conf.setCombinerClass(Reduce.class ); conf.setReducerClass(Reduce.class );

  接着就是調用setInputPath()和setOutputPath()設定輸入輸出路徑。

conf.setInputFormat(TextInputFormat.class ); conf.setOutputFormat(TextOutputFormat.class );

  (1)InputFormat和InputSplit

  InputSplit是Hadoop定義的用來傳送給每個單獨的map的資料,InputSplit存儲的并非資料本身,而是一個分片長度和一個記錄資料位置的數組。生成InputSplit的方法可以通過InputFormat()來設定。

  當資料傳送給map時,map會将輸入分片傳送到InputFormat,InputFormat則調用方法getRecordReader()生成RecordReader,RecordReader再通過creatKey()、creatValue()方法建立可供map處理的&lt;key,value&gt;對。簡而言之,InputFormat()方法是用來生成可供map處理的&lt;key,value&gt;對的。

  Hadoop預定義了多種方法将不同類型的輸入資料轉化為map能夠處理的&lt;key,value&gt;對,它們都繼承自InputFormat,分别是:

    InputFormat         |         |---BaileyBorweinPlouffe.BbpInputFormat         |---ComposableInputFormat         |---CompositeInputFormat         |---DBInputFormat         |---DistSum.Machine.AbstractInputFormat         |---FileInputFormat             |---CombineFileInputFormat             |---KeyValueTextInputFormat             |---NLineInputFormat             |---SequenceFileInputFormat             |---TeraInputFormat             |---TextInputFormat

  其中TextInputFormat是Hadoop預設的輸入方法,在TextInputFormat中,每個檔案(或其一部分)都會單獨地作為map的輸入,而這個是繼承自FileInputFormat的。之後,每行資料都會生成一條記錄,每條記錄則表示成&lt;key,value&gt;形式:

key值是每個資料的記錄在資料分片中位元組偏移量,資料類型是LongWritable;  

value值是每行的内容,資料類型是Text。

  (2)OutputFormat

  每一種輸入格式都有一種輸出格式與其對應。預設的輸出格式是TextOutputFormat,這種輸出方式與輸入類似,會将每條記錄以一行的形式存入文本檔案。不過,它的鍵和值可以是任意形式的,因為程式内容會調用toString()方法将鍵和值轉換為String類型再輸出。

  3)Map類中map方法分析

public static class Map extends MapReduceBase implements         Mapper&lt;LongWritable, Text, Text, IntWritable&gt; {     private final static IntWritable one = new IntWritable(1);     private Text word = new Text();     public void map(LongWritable key, Text value,             OutputCollector&lt;Text, IntWritable&gt; output, Reporter reporter)             throws IOException {         String line = value.toString();         StringTokenizer tokenizer = new StringTokenizer(line);         while (tokenizer.hasMoreTokens()) {             word.set(tokenizer.nextToken());             output.collect(word, one);

  Map類繼承自MapReduceBase,并且它實作了Mapper接口,此接口是一個規範類型,它有4種形式的參數,分别用來指定map的輸入key值類型、輸入value值類型、輸出key值類型和輸出value值類型。在本例中,因為使用的是TextInputFormat,它的輸出key值是LongWritable類型,輸出value值是Text類型,是以map的輸入類型為&lt;LongWritable,Text&gt;。在本例中需要輸出&lt;word,1&gt;這樣的形式,是以輸出的key值類型是Text,輸出的value值類型是IntWritable。

  實作此接口類還需要實作map方法,map方法會具體負責對輸入進行操作,在本例中,map方法對輸入的行以空格為機關進行切分,然後使用OutputCollect收集輸出的&lt;word,1&gt;。

  4)Reduce類中reduce方法分析

public static class Reduce extends MapReduceBase implements         Reducer&lt;Text, IntWritable, Text, IntWritable&gt; {     public void reduce(Text key, Iterator&lt;IntWritable&gt; values,         int sum = 0;         while (values.hasNext()) {             sum += values.next().get();         output.collect(key, new IntWritable(sum));

  Reduce類也是繼承自MapReduceBase的,需要實作Reducer接口。Reduce類以map的輸出作為輸入,是以Reduce的輸入類型是&lt;Text,Intwritable&gt;。而Reduce的輸出是單詞和它的數目,是以,它的輸出類型是&lt;Text,IntWritable&gt;。Reduce類也要實作reduce方法,在此方法中,reduce函數将輸入的key值作為輸出的key值,然後将獲得多個value值加起來,作為輸出的值。

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;   public static class TokenizerMapper       extends Mapper&lt;Object, Text, Text, IntWritable&gt;{       private final static IntWritable one = new IntWritable(1);       private Text word = new Text();       public void map(Object key, Text value, Context context)         throws IOException, InterruptedException {         StringTokenizer itr = new StringTokenizer(value.toString());         while (itr.hasMoreTokens()) {         word.set(itr.nextToken());         context.write(word, one);       }     }   }   public static class IntSumReducer       extends Reducer&lt;Text,IntWritable,Text,IntWritable&gt; {       private IntWritable result = new IntWritable();       public void reduce(Text key, Iterable&lt;IntWritable&gt; values,Context context)            throws IOException, InterruptedException {         int sum = 0;         for (IntWritable val : values) {            sum += val.get();         }       result.set(sum);       context.write(key, result);   public static void main(String[] args) throws Exception {     Configuration conf = new Configuration();     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();     if (otherArgs.length != 2) {       System.err.println("Usage: wordcount &lt;in&gt; &lt;out&gt;");       System.exit(2);     Job job = new Job(conf, "word count");     job.setJarByClass(WordCount.class);     job.setMapperClass(TokenizerMapper.class);     job.setCombinerClass(IntSumReducer.class);     job.setReducerClass(IntSumReducer.class);     job.setOutputKeyClass(Text.class);     job.setOutputValueClass(IntWritable.class);     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));     System.exit(job.waitForCompletion(true) ? 0 : 1);

   1)Map過程

public static class TokenizerMapper   extends Mapper&lt;Object, Text, Text, IntWritable&gt;{   private final static IntWritable one = new IntWritable(1);   private Text word = new Text();   public void map(Object key, Text value, Context context)     throws IOException, InterruptedException {     StringTokenizer itr = new StringTokenizer(value.toString());     while (itr.hasMoreTokens()) {       word.set(itr.nextToken());       context.write(word, one);

  Map過程需要繼承org.apache.hadoop.mapreduce包中Mapper類,并重寫其map方法。通過在map方法中添加兩句把key值和value值輸出到控制台的代碼,可以發現map方法中value值存儲的是文本檔案中的一行(以回車符為行結束标記),而key值為該行的首字母相對于文本檔案的首位址的偏移量。然後StringTokenizer類将每一行拆分成為一個個的單詞,并将&lt;word,1&gt;作為map方法的結果輸出,其餘的工作都交有MapReduce架構處理。

  2)Reduce過程

public static class IntSumReducer   extends Reducer&lt;Text,IntWritable,Text,IntWritable&gt; {   private IntWritable result = new IntWritable();   public void reduce(Text key, Iterable&lt;IntWritable&gt; values,Context context)      throws IOException, InterruptedException {     int sum = 0;     for (IntWritable val : values) {       sum += val.get();     result.set(sum);     context.write(key, result);

  Reduce過程需要繼承org.apache.hadoop.mapreduce包中Reducer類,并重寫其reduce方法。Map過程輸出&lt;key,values&gt;中key為單個單詞,而values是對應單詞的計數值所組成的清單,Map的輸出就是Reduce的輸入,是以reduce方法隻要周遊values并求和,即可得到某個單詞的總次數。

    3)執行MapReduce任務

public static void main(String[] args) throws Exception {   Configuration conf = new Configuration();   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();   if (otherArgs.length != 2) {     System.err.println("Usage: wordcount &lt;in&gt; &lt;out&gt;");     System.exit(2);   Job job = new Job(conf, "word count");   job.setJarByClass(WordCount.class);   job.setMapperClass(TokenizerMapper.class);   job.setCombinerClass(IntSumReducer.class);   job.setReducerClass(IntSumReducer.class);   job.setOutputKeyClass(Text.class);   job.setOutputValueClass(IntWritable.class);   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));   System.exit(job.waitForCompletion(true) ? 0 : 1);

  在MapReduce中,由Job對象負責管理和運作一個計算任務,并通過Job的一些方法對任務的參數進行相關的設定。此處設定了使用TokenizerMapper完成Map過程中的處理和使用IntSumReducer完成Combine和Reduce過程中的處理。還設定了Map過程和Reduce過程的輸出類型:key的類型為Text,value的類型為IntWritable。任務的輸出和輸入路徑則由指令行參數指定,并由FileInputFormat和FileOutputFormat分别設定。完成相應任務的參數設定後,即可調用job.waitForCompletion()方法執行任務。

  本節将對WordCount進行更詳細的講解。詳細執行步驟如下:

  1)将檔案拆分成splits,由于測試用的檔案較小,是以每個檔案為一個split,并将檔案按行分割形成&lt;key,value&gt;對,如圖4-1所示。這一步由MapReduce架構自動完成,其中偏移量(即key值)包括了回車所占的字元數(Windows和Linux環境會不同)。

圖4-1 分割過程

  2)将分割好的&lt;key,value&gt;對交給使用者定義的map方法進行處理,生成新的&lt;key,value&gt;對,如圖4-2所示。

圖4-2 執行map方法

  3)得到map方法輸出的&lt;key,value&gt;對後,Mapper會将它們按照key值進行排序,并執行Combine過程,将key至相同value值累加,得到Mapper的最終輸出結果。如圖4-3所示。

圖4-3 Map端排序及Combine過程

  4)Reducer先對從Mapper接收的資料進行排序,再交由使用者自定義的reduce方法進行處理,得到新的&lt;key,value&gt;對,并作為WordCount的輸出結果,如圖4-4所示。

圖4-4 Reduce端排序及輸出結果

  Hadoop最新版本的MapReduce Release 0.20.0的API包括了一個全新的Mapreduce JAVA API,有時候也稱為上下文對象。

  新的API類型上不相容以前的API,是以,以前的應用程式需要重寫才能使新的API發揮其作用 。

  新的API和舊的API之間有下面幾個明顯的差別。

新的API傾向于使用抽象類,而不是接口,因為這更容易擴充。例如,你可以添加一個方法(用預設的實作)到一個抽象類而不需修改類之前的實作方法。在新的API中,Mapper和Reducer是抽象類。

新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API則是放在org.apache.hadoop.mapred中的。

新的API廣泛使用context object(上下文對象),并允許使用者代碼與MapReduce系統進行通信。例如,MapContext基本上充當着JobConf的OutputCollector和Reporter的角色。

新的API同時支援"推"和"拉"式的疊代。在這兩個新老API中,鍵/值記錄對被推mapper中,但除此之外,新的API允許把記錄從map()方法中拉出,這也适用于reducer。"拉"式的一個有用的例子是分批處理記錄,而不是一個接一個。

新的API統一了配置。舊的API有一個特殊的JobConf對象用于作業配置,這是一個對于Hadoop通常的Configuration對象的擴充。在新的API中,這種差別沒有了,是以作業配置通過Configuration來完成。作業控制的執行由Job類來負責,而不是JobClient,它在新的API中已經蕩然無存。

如果,您認為閱讀這篇部落格讓您有些收獲,不妨點選一下右下角的【<b>推薦</b>】。

如果,您希望更容易地發現我的新部落格,不妨點選一下左下角的【<b>關注我</b>】。

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。

本文轉自蝦皮部落格園部落格,原文連結:http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html