天天看點

第一個MapReduce程式——WordCount一、MapReduce簡介二、運作WordCount程式三、WordCount程式分析

通常我們在學習一門語言的時候,寫的第一個程式就是Hello World。而在學習Hadoop時,我們要寫的第一個程式就是詞頻統計

WordCount

程式。

一、MapReduce簡介

1.1 MapReduce程式設計模型

MapReduce采用”分而治之”的思想,把對大規模資料集的操作,分發給一個主節點管理下的各個分節點共同完成,然後通過整合各個節點的中間結果,得到最終結果。簡單地說,MapReduce就是”任務的分解與結果的彙總”。

在Hadoop中,用于執行MapReduce任務的機器角色有兩個:

  • JobTracker用于排程工作的,一個Hadoop叢集中隻有一個JobTracker,位于master。
  • TaskTracker用于執行工作,位于各slave上。

在分布式計算中,MapReduce架構負責處理了并行程式設計中分布式存儲、工作排程、負載均衡、容錯均衡、容錯處理以及網絡通信等複雜問題,把處理過程高度抽象為兩個函數:map和reduce,map負責把任務分解成多個任務,reduce負責把分解後多任務處理的結果彙總起來。

需要注意的是,用MapReduce來處理的資料集(或任務)必須具備這樣的特點:待處理的資料集可以分解成許多小的資料集,而且每一個小資料集都可以完全并行地進行處理。

1.2 MapReduce工作過程

對于一個MR任務,它的輸入、輸出以及中間結果都是

<key, value>

鍵值對:

  • Map:

    <k1, v1>

    ——>

    list(<k2, v2>)

  • Reduce:

    <k2, list(v2)>

    ——>

    list(<k3, v3>)

MR程式的執行過程主要分為三步:Map階段、Shuffle階段、Reduce階段,如下圖:

第一個MapReduce程式——WordCount一、MapReduce簡介二、運作WordCount程式三、WordCount程式分析
  1. Map階段
    • 分片(Split):map階段的輸入通常是HDFS上檔案,在運作Mapper前,FileInputFormat會将輸入檔案分割成多個split ——1個split至少包含1個HDFS的Block(預設為64M);然後每一個分片運作一個map進行處理。
    • 執行(Map):對輸入分片中的每個鍵值對調用

      map()

      函數進行運算,然後輸出一個結果鍵值對。
      • Partitioner:對

        map()

        的輸出進行partition,即根據key或value及reduce的數量來決定目前的這對鍵值對最終應該交由哪個reduce處理。預設是對key哈希後再以reduce task數量取模,預設的取模方式隻是為了避免資料傾斜。然後該key/value對以及partitionIdx的結果都會被寫入環形緩沖區。
    • 溢寫(Spill):map輸出寫在記憶體中的環形緩沖區,預設當緩沖區滿80%,啟動溢寫線程,将緩沖的資料寫出到磁盤。
      • Sort:在溢寫到磁盤之前,使用快排對緩沖區資料按照partitionIdx, key排序。(每個partitionIdx表示一個分區,一個分區對應一個reduce)
      • Combiner:如果設定了Combiner,那麼在Sort之後,還會對具有相同key的鍵值對進行合并,減少溢寫到磁盤的資料量。
    • 合并(Merge):溢寫可能會生成多個檔案,這時需要将多個檔案合并成一個檔案。合并的過程中會不斷地進行 sort & combine 操作,最後合并成了一個已分區且已排序的檔案。
  2. Shuffle階段:廣義上Shuffle階段橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和merge/sort過程。通常認為Shuffle階段就是将map的輸出作為reduce的輸入的過程
    • Copy過程:Reduce端啟動一些copy線程,通過HTTP方式将map端輸出檔案中屬于自己的部分拉取到本地。Reduce會從多個map端拉取資料,并且每個map的資料都是有序的。
    • Merge過程:Copy過來的資料會先放入記憶體緩沖區中,這裡的緩沖區比較大;當緩沖區資料量達到一定門檻值時,将資料溢寫到磁盤(與map端類似,溢寫過程會執行 sort & combine)。如果生成了多個溢寫檔案,它們會被merge成一個有序的最終檔案。這個過程也會不停地執行 sort & combine 操作。
  3. Reduce階段:Shuffle階段最終生成了一個有序的檔案作為Reduce的輸入,對于該檔案中的每一個鍵值對調用

    reduce()

    方法,并将結果寫到HDFS。

二、運作WordCount程式

在運作程式之前,需要先搭建好Hadoop叢集環境,參考《Hadoop+HBase+ZooKeeper分布式叢集環境搭建》。

2.1 源代碼

WordCount可以說是最簡單的MapReduce程式了,隻包含三個檔案:一個 Map 的 Java 檔案,一個 Reduce 的 Java 檔案,一個負責調用的主程式 Java 檔案。

我們在目前使用者的主檔案夾下建立

wordcount_01/

目錄,在該目錄下再建立

src/

classes/

。 src 目錄存放 Java 的源代碼,classes 目錄存放編譯結果。

TokenizerMapper.java

package com.lisong.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    IntWritable one = new IntWritable();
    Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while(itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
           

IntSumReducer.java

package com.lisong.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
        int sum = ;
        for(IntWritable val:values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key,result);
    }
}
           

WordCount.java

package com.lisong.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != ) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit();
        }

        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[]));
        System.exit(job.waitForCompletion(true)?:);
    } 
}
           

以上三個.java源檔案均置于 src 目錄下。

2.2 編譯

Hadoop 2.x 版本中jar不再集中在一個 hadoop-core-*.jar 中,而是分成多個 jar。編譯WordCount程式需要如下三個 jar:

$HADOOP_HOME/share/hadoop/common/hadoop-common-.jar
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-.jar
$HADOOP_HOME/share/hadoop/common/lib/commons-cli-.jar
           

使用

javac

指令進行編譯:

$ cd wordcount_01

$ javac -classpath /home/hadoop/hadoop/share/hadoop/common/hadoop-common-.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-.jar:/home/hadoop/hadoop/share/hadoop/common/lib/commons-cli-.jar -d classes/ src/*.java
           
  • -classpath,設定源代碼裡使用的各種類庫所在的路徑,多個路徑用

    ":"

    隔開。
  • -d,設定編譯後的 class 檔案儲存的路徑。
  • src/*.java,待編譯的源檔案。

2.3 打包

将編譯好的 class 檔案打包成 Jar 包,jar 指令是 JDK 的打包指令行工具。

$ jar -cvf wordcount.jar classes
           

打包結果是 wordcount.jar 檔案,放在目前目錄下。

2.4 執行

執行hadoop程式的時候,輸入檔案必須先放入hdfs檔案系統中,不能是本地檔案。

1 . 先檢視hdfs檔案系統的根目錄:

$ hadoop/bin/hadoop fs -ls /
Found  items
drwxr-xr-x   - hadoop supergroup           -- : /hbase
           

可以看出,hdfs的根目錄是一個叫

/hbase

的目錄。

2 . 然後利用

put

将輸入檔案(多個輸入檔案位于

input

檔案夾下)複制到hdfs檔案系統中:

$ hadoop/bin/hadoop fs -put input /hbase
           

3 . 運作wordcount程式

$ hadoop/bin/hadoop jar wordcount_01/wordcount.jar WordCount /hbase/input /hbase/output
           

提示找不到 WordCount 類:

Exception in thread "main" java.lang.NoClassDefFoundError: WordCount

因為程式中聲明了 package ,是以在指令中也要 com.lisong.hadoop.WordCount 寫完整:

$ hadoop/bin/hadoop jar wordcount_01/wordcount.jar com.lisong.hadoop.WordCount /hbase/input /hbase/output
           

其中 “jar” 參數是指定 jar 包的位置,com.lisong.hadoop.WordCount 是主類。運作程式處理 input 目錄下的多個檔案,将結果寫入 /hbase/output 目錄。

4 . 檢視運作結果

$ hadoop/bin/hadoop fs -ls /hbase/output
Found  items
-rw-r--r--    hadoop supergroup           -- : /hbase/output/_SUCCESS
-rw-r--r--    hadoop supergroup          -- : /hbase/output/part-r-
           

可以看到

/hbase/output/

目錄下有兩個檔案,結果就存在

part-r-00000

中:

$ hadoop/bin/hadoop fs -cat /hbase/output/part-r-
Google  
Java    
baidu   
hadoop  
           

三、WordCount程式分析

3.1 Hadoop資料類型

Hadoop MapReduce操作的是鍵值對,但這些鍵值對并不是Integer、String等标準的Java類型。為了讓鍵值對可以在叢集上移動,Hadoop提供了一些實作了

WritableComparable

接口的基本資料類型,以便用這些類型定義的資料可以被序列化進行網絡傳輸、檔案存儲與大小比較。

  • 值:僅會被簡單的傳遞,必須實作

    Writable

    WritableComparable

    接口。
  • 鍵:在Reduce階段排序時需要進行比較,故隻能實作

    WritableComparable

    接口。

下面是8個預定義的Hadoop基本資料類型,它們均實作了

WritableComparable

接口:

描述
BooleanWritable 标準布爾型數值
ByteWritable 單位元組數值
DoubleWritable 雙位元組數
FloatWritable 浮點數
IntWritable 整型數
LongWritable 長整型數
Text 使用UTF8格式存儲的文本
NullWritable

<key,value>

中的key或value為空時使用

3.2 源代碼分析

3.2.1 Map過程

package com.lisong.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    IntWritable one = new IntWritable();
    Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while(itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
           

Map過程需要繼承

org.apache.hadoop.mapreduce

包中 Mapper 類,并重寫其map方法。

其中的模闆參數:第一個Object表示輸入key的類型;第二個Text表示輸入value的類型;第三個Text表示表示輸出鍵的類型;第四個IntWritable表示輸出值的類型。

作為map方法輸入的鍵值對,其value值存儲的是文本檔案中的一行(以回車符為行結束标記),而key值為該行的首字母相對于文本檔案的首位址的偏移量。然後StringTokenizer類将每一行拆分成為一個個的單詞,并将

<word,1>

作為map方法的結果輸出,其餘的工作都交有 MapReduce架構 處理。

注:

StringTokenizer

是Java工具包中的一個類,用于将字元串進行拆分——預設情況下使用空格作為分隔符進行分割。

3.2.2 Reduce過程

package com.lisong.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {
        int sum = ;
        for(IntWritable val:values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key,result);
    }
}
           

Reduce過程需要繼承

org.apache.hadoop.mapreduce

包中 Reducer 類,并 重寫 reduce方法。

其中模闆參數同Map一樣,依次表示是輸入鍵類型,輸入值類型,輸出鍵類型,輸出值類型。

public void reduce(Text key, Iterable<IntWritable> values, Context context)
           

reduce 方法的輸入參數 key 為單個單詞,而 values 是由各Mapper上對應單詞的計數值所組成的清單(一個實作了 Iterable 接口的變量,可以了解成 values 裡包含若幹個 IntWritable 整數,可以通過疊代的方式周遊所有的值),是以隻要周遊 values 并求和,即可得到某個單詞出現的總次數。

3.2.3 執行作業

package com.lisong.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != ) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit();
        }

        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[]));
        System.exit(job.waitForCompletion(true)?:);
    } 
}
           

在MapReduce中,由Job對象負責管理和運作一個計算任務,并通過Job的一些方法對任務的參數進行相關的設定,此處:

  • 設定了使用

    TokenizerMapper.class

    完成Map過程中的處理,使用

    IntSumReducer.class

    完成Combine和Reduce過程中的處理。
  • 還設定了Map過程和Reduce過程的輸出類型:key的類型為Text,value的類型為IntWritable。
  • 任務的輸出和輸入路徑則由指令行參數指定,并由FileInputFormat和FileOutputFormat分别設定。
    1. FileInputFormat類的很重要的作用就是将檔案進行切分 split,并将 split 進一步拆分成key/value對
    2. FileOutputFormat類的作用是将處理結果寫入輸出檔案。
  • 完成相應任務的參數設定後,即可調用

    job.waitForCompletion()

    方法執行任務。

3.2.4 WordCount流程

1)将檔案拆分成splits,由于測試用的檔案較小,是以每個檔案為一個split,并将檔案按行分割形成

<key,value>

對,key為偏移量(包括了回車符),value為文本行。這一步由MapReduce架構自動完成,如下圖:

第一個MapReduce程式——WordCount一、MapReduce簡介二、運作WordCount程式三、WordCount程式分析

2)将分割好的

<key,value>

對交給使用者定義的map方法進行處理,生成新的

<key,value>

對,如下圖所示:

第一個MapReduce程式——WordCount一、MapReduce簡介二、運作WordCount程式三、WordCount程式分析

3)得到map方法輸出的

<key,value>

對後,Mapper會将它們按照key值進行排序,并執行Combine過程,将key值相同的value值累加,得到Mapper的最終輸出結果。如下圖:

第一個MapReduce程式——WordCount一、MapReduce簡介二、運作WordCount程式三、WordCount程式分析

4)Reducer先對從Mapper接收的資料進行排序,再交由使用者自定義的reduce方法進行處理,得到新的

<key,value>

對,并作為WordCount的輸出結果,如下圖:

第一個MapReduce程式——WordCount一、MapReduce簡介二、運作WordCount程式三、WordCount程式分析

個人站點:http://songlee24.github.com

參考:

[1] 實戰Hadoop:開啟通向雲計算的捷徑

[2]《了解MapReduce核心Shuffle》

繼續閱讀