天天看點

Mapreduce Java實作WordCount 小案例

map的源碼:

package com.sfd.wordcount;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 自定義的map函數需要繼承mapreduce架構的規範,繼承Mapper類,這個類有
 * 四個泛型,他規定了map的輸入,輸出類型,map的輸入,輸出都是鍵值對的
 * 形式,是以 ,四個泛型的前兩個分别表示的是map輸入的鍵和值的格式,後兩個
 * 分别表示map的輸出的鍵和值格式。
 * 之是以使用架構自定義的LongWritable 和Text類型而不是Long和String類型是因為
 * 後者中包裝了大量的與傳輸資料無關的内容(如:方法。。。),會影響架構的執行
 * 效率。
 * 
 * @author sfd
 */
public class WCMap extends Mapper<LongWritable,Text,Text,LongWritable>{

    /**
     * 每當讀取一行資料是調用此方法,其中key表示的是這行資料的初始偏移量,
     * value則是我們在方法中寫的業務代碼所要使用的資料
     */
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {

        //将Text類型的資料轉化成String類型的資料并對其進行拆分得到單詞數組
        String line=value.toString();
        String[] words=StringUtils.split(line, " ");
        //周遊數組并把資料以鍵值對的形式傳遞給reduce
        for(String word:words){
            context.write(new Text(word), new LongWritable());     
        }

    }

}
           

reduce的源碼

package com.sfd.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * 
 * *自定義的Reduce函數需要繼承mapreduce架構的規範,繼承Reduce類,這個類有
 * 四個泛型,他規定了Reduce的輸入,輸出類型,Reduce的輸入,輸出都是鍵值對的
 * 形式,是以 ,四個泛型的前兩個分别表示的是Reduce輸入的鍵和值的格式,後兩個
 * 分别表示Reduce的輸出的鍵和值格式。
 * @author sfd
 *
 */
public class WCReduce extends Reducer<Text, LongWritable, Text, LongWritable>{

    /**
     * 架構處理完map之後将輸出的鍵值對緩存起來,進行分組,形成<key,values{}>形式的鍵值對
     * 在将這個鍵值對傳遞給reduce下面的函數,(第二個參數可以看成是一個List集合)
     * 如:<"hello",{1,1,1,1......}>
     */
    @Override
    protected void reduce(Text word, Iterable<LongWritable> values,
            Context context)
            throws IOException, InterruptedException {

        //定義一個計數器,用來計算單詞的個數
        long count=;
        //周遊集合求出單詞個數
        for(LongWritable value:values){
            count+=value.get();         
        }
        //輸出這個單詞的統計結果
        context.write(word, new LongWritable(count));
    }

}

           

作業的排程類

package com.sfd.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 用來描述一個特定的作業
 * 例如:使用哪個類作為邏輯處理的map那個類作為邏輯處理的reduce
 * 還可指定作業所要處理的資料的輸入路徑和處理結果輸出路徑
 * @author sfd
 *
 */
public class WCRunner {

    public static void main(String[] args) throws Exception {
        //得到作業執行個體對象
        Configuration conf=new Configuration();
        Job wcJob=Job.getInstance(conf);
        //設定作業所需的類在哪個jar包
        wcJob.setJarByClass(WCRunner.class);
        //指定哪個類作為邏輯處理的map那個類作為邏輯處理的reduce
        wcJob.setMapperClass(WCMap.class);
        wcJob.setReducerClass(WCReduce.class);
        //指定map和reduce輸出的鍵值對的資料類型
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(LongWritable.class);
        //指定map輸出的鍵值對的輸出類型,這個方法将會覆寫上面設定的map的
        //設定
        wcJob.setMapOutputKeyClass(Text.class);
        wcJob.setMapOutputValueClass(LongWritable.class);
        //指定業務所處裡資料的來源
        FileInputFormat.setInputPaths(wcJob, new Path("/wc/input"));
        //指定業務處理後産生結果所存方的路徑
        FileOutputFormat.setOutputPath(wcJob, new Path("/wc/output"));
        //将作頁交給叢集運作
        wcJob.waitForCompletion(true);
    }
}