天天看点

Mapreduce Java实现WordCount 小案例

map的源码:

package com.sfd.wordcount;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 自定义的map函数需要继承mapreduce框架的规范,继承Mapper类,这个类有
 * 四个泛型,他规定了map的输入,输出类型,map的输入,输出都是键值对的
 * 形式,所以 ,四个泛型的前两个分别表示的是map输入的键和值的格式,后两个
 * 分别表示map的输出的键和值格式。
 * 之所以使用框架自定义的LongWritable 和Text类型而不是Long和String类型是因为
 * 后者中包装了大量的与传输数据无关的内容(如:方法。。。),会影响框架的执行
 * 效率。
 * 
 * @author sfd
 */
public class WCMap extends Mapper<LongWritable,Text,Text,LongWritable>{

    /**
     * 每当读取一行数据是调用此方法,其中key表示的是这行数据的初始偏移量,
     * value则是我们在方法中写的业务代码所要使用的数据
     */
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {

        //将Text类型的数据转化成String类型的数据并对其进行拆分得到单词数组
        String line=value.toString();
        String[] words=StringUtils.split(line, " ");
        //遍历数组并把数据以键值对的形式传递给reduce
        for(String word:words){
            context.write(new Text(word), new LongWritable());     
        }

    }

}
           

reduce的源码

package com.sfd.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * 
 * *自定义的Reduce函数需要继承mapreduce框架的规范,继承Reduce类,这个类有
 * 四个泛型,他规定了Reduce的输入,输出类型,Reduce的输入,输出都是键值对的
 * 形式,所以 ,四个泛型的前两个分别表示的是Reduce输入的键和值的格式,后两个
 * 分别表示Reduce的输出的键和值格式。
 * @author sfd
 *
 */
public class WCReduce extends Reducer<Text, LongWritable, Text, LongWritable>{

    /**
     * 框架处理完map之后将输出的键值对缓存起来,进行分组,形成<key,values{}>形式的键值对
     * 在将这个键值对传递给reduce下面的函数,(第二个参数可以看成是一个List集合)
     * 如:<"hello",{1,1,1,1......}>
     */
    @Override
    protected void reduce(Text word, Iterable<LongWritable> values,
            Context context)
            throws IOException, InterruptedException {

        //定义一个计数器,用来计算单词的个数
        long count=;
        //遍历集合求出单词个数
        for(LongWritable value:values){
            count+=value.get();         
        }
        //输出这个单词的统计结果
        context.write(word, new LongWritable(count));
    }

}

           

作业的调度类

package com.sfd.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 用来描述一个特定的作业
 * 例如:使用哪个类作为逻辑处理的map那个类作为逻辑处理的reduce
 * 还可指定作业所要处理的数据的输入路径和处理结果输出路径
 * @author sfd
 *
 */
public class WCRunner {

    public static void main(String[] args) throws Exception {
        //得到作业实例对象
        Configuration conf=new Configuration();
        Job wcJob=Job.getInstance(conf);
        //设置作业所需的类在哪个jar包
        wcJob.setJarByClass(WCRunner.class);
        //指定哪个类作为逻辑处理的map那个类作为逻辑处理的reduce
        wcJob.setMapperClass(WCMap.class);
        wcJob.setReducerClass(WCReduce.class);
        //指定map和reduce输出的键值对的数据类型
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(LongWritable.class);
        //指定map输出的键值对的输出类型,这个方法将会覆盖上面设置的map的
        //设置
        wcJob.setMapOutputKeyClass(Text.class);
        wcJob.setMapOutputValueClass(LongWritable.class);
        //指定业务所处里数据的来源
        FileInputFormat.setInputPaths(wcJob, new Path("/wc/input"));
        //指定业务处理后产生结果所存方的路径
        FileOutputFormat.setOutputPath(wcJob, new Path("/wc/output"));
        //将作页交给集群运行
        wcJob.waitForCompletion(true);
    }
}