环境准备
1.输入文件 a.txt
aa a
bbb cc
dd d dd ee
2.集成环境
参考之前博客即可hadoop(阿里云内网环境)集成springboot及一些坑
map程序
package com.example.springbootintegrationtest.hadoop.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author wuxinxin
*
* 对单词做map操作,凡是map程序都要继承Mapper
*/
public class WordCountMap extends Mapper<LongWritable,Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//数据是按照行获取的 value,先按照分隔符进行拆分
String s = value.toString();
String[] split = s.split(" ");
//拆分完后,开始做map操作
Text k = new Text();
IntWritable v = new IntWritable();
v.set(1);
for(String temp:split){
//输出map结果
k.set(temp);
context.write(k,v);
}
}
}
说明: map作用很简单,就是将输入的字符串,按照给定规则进行切割即可.例如我们统计每个单词数量,那么我们只要根据以空格作为分隔符,对单词一行行切割,输出即可
reduce程序
package com.example.springbootintegrationtest.hadoop.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author wuxinxin
*
* 对map结果做reduce
* 凡是reduce程序,都需要继承Reducer
*/
public class WordCountReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//对map好的结果进行合并操作(reduce)
// key是代表的单词, values代表数量
int sum=0;
for(IntWritable intWritable:values){
sum+=intWritable.get();
}
//写入reduce的结果
context.write(key,new IntWritable(sum));
}
}
说明: reduce程序就是对map切割好的数据进行一些统计和计算. 比如我们这里是对每个单词出现的次数进行计算,那么写我们的统计逻辑(对出现的相同单词进行+1)即可.
driver(job)程序
package com.example.springbootintegrationtest.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Arrays;
/**
* 实现WordCount
* @author wuxinxin
*/
public class WordCountDriver {
public static void main(String[] args) throws Exception {
/**
* 创建一个job
*/
Configuration conf=new Configuration();
Job job = new Job(conf);
//设置jar存储位置
job.setJarByClass(WordCountDriver.class);
//关联map和reduce程序
job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReduce.class);
//设置map阶段输出key,value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置最终输出结果的key,value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输出路径和输入路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交job
job.submit();
}
}
说明:driver程序其实就是一个启动程序,可以理解为map和reduce被包装一个可执行程序,然后按照map和reduce逻辑进行计算. job其实是最小的执行单元,按照给定的输入文件,使用map进行分割数据,使用reduce做计算统计,最后输出到给定的输出结果路径
执行结果
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPR9EejpWT41ERPpHOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL1YzM5MTN0YTMxIzMwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
part-r-00000
a 1
aa 1
bbb 1
cc 1
d 1
dd 2
ee 1
说明: 当前程序使用的输入输出路径都是本地路径, 也可以直接使用hadoop的路径,将输入文件和输出结果都保存在hadoop中.
总结
1.mapreduce是作为hadoop的计算组件,计算的数据可以来源于任何地方,一般而言还是来源于hdfs, 和hdfs配合使用
2. 我们这里一定要清楚, mapreduce组件和hdfs组件是没关系的, mapreduce计算数据可以来源任何地方.我们这里举例就来源于本地文件数据.