天天看点

MapReduce案例WordCount

环境准备

1.输入文件 a.txt

aa a
bbb cc
dd d dd ee
           

2.集成环境

参考之前博客即可hadoop(阿里云内网环境)集成springboot及一些坑

map程序

package com.example.springbootintegrationtest.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author wuxinxin
 *
 * 对单词做map操作,凡是map程序都要继承Mapper
 */
public class WordCountMap extends Mapper<LongWritable,Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //数据是按照行获取的 value,先按照分隔符进行拆分

        String s = value.toString();

        String[] split = s.split(" ");

        //拆分完后,开始做map操作
        Text k = new Text();
        IntWritable v = new IntWritable();
        v.set(1);
        for(String temp:split){
            //输出map结果
            k.set(temp);
            context.write(k,v);
        }
    }
}

           

说明: map作用很简单,就是将输入的字符串,按照给定规则进行切割即可.例如我们统计每个单词数量,那么我们只要根据以空格作为分隔符,对单词一行行切割,输出即可

reduce程序

package com.example.springbootintegrationtest.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author wuxinxin
 *
 * 对map结果做reduce
 * 凡是reduce程序,都需要继承Reducer
 */
public class WordCountReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //对map好的结果进行合并操作(reduce)

        // key是代表的单词, values代表数量
        int sum=0;

        for(IntWritable intWritable:values){
            sum+=intWritable.get();
        }

        //写入reduce的结果
        context.write(key,new IntWritable(sum));
    }
}

           

说明: reduce程序就是对map切割好的数据进行一些统计和计算. 比如我们这里是对每个单词出现的次数进行计算,那么写我们的统计逻辑(对出现的相同单词进行+1)即可.

driver(job)程序

package com.example.springbootintegrationtest.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Arrays;
/**
 * 实现WordCount
 * @author wuxinxin
 */
public class WordCountDriver {

    public static void main(String[] args) throws Exception {

        /**
         * 创建一个job
         */
        Configuration conf=new Configuration();
        Job job = new Job(conf);

        //设置jar存储位置
        job.setJarByClass(WordCountDriver.class);

        //关联map和reduce程序
        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);

        //设置map阶段输出key,value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //设置最终输出结果的key,value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置输出路径和输入路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        //提交job
        job.submit();
    }
}

           

说明:driver程序其实就是一个启动程序,可以理解为map和reduce被包装一个可执行程序,然后按照map和reduce逻辑进行计算. job其实是最小的执行单元,按照给定的输入文件,使用map进行分割数据,使用reduce做计算统计,最后输出到给定的输出结果路径

执行结果

MapReduce案例WordCount

part-r-00000

a	1
aa	1
bbb	1
cc	1
d	1
dd	2
ee	1

           

说明: 当前程序使用的输入输出路径都是本地路径, 也可以直接使用hadoop的路径,将输入文件和输出结果都保存在hadoop中.

总结

1.mapreduce是作为hadoop的计算组件,计算的数据可以来源于任何地方,一般而言还是来源于hdfs, 和hdfs配合使用

2. 我们这里一定要清楚, mapreduce组件和hdfs组件是没关系的, mapreduce计算数据可以来源任何地方.我们这里举例就来源于本地文件数据.

继续阅读