1.MapReduce处理过程
map函数接收一个<key,value>形式的输入,然后同样产生一个<key,value>形式的中间输出,reduce函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式的。
(1). map任务处理
1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
1.3 对输出的key、value进行分区。
1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
1.5 (可选)分组后的数据进行归约。
(2).reduce任务处理
2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
2.3 把reduce的输出保存到文件中。
2.特别数据类型简介
Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。
BooleanWritable:标准布尔型数值
ByteWritable:单字节数值
DoubleWritable:双字节数
FloatWritable:浮点数
IntWritable:整型数
LongWritable:长整型数
Text:使用UTF8格式存储的文本
NullWritable:当<key,value>中的key或value为空时使用
3.源代码呈现
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
//Map方法
//输入类型为<longWritable,Text>
//输出类型为<Text,IntWritable>
public static class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable>
{
private final static IntWritable one=new IntWritable(1);
private Text text=new Text();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException
{
String line=value.toString();
String []words=line.split(" ");//制表符分离
for(String w:words)
{
text.set(w);
context.write(text, one);
}
}
}
//reduce方法
public static class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable>values,Context context)
throws IOException, InterruptedException {
int sum=0;
for(IntWritable v:values)
{
sum+=v.get();
}
result.set(sum);
context.write(key, result);
}
}
//主函数,基本操作6大步
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
{
//构建job对象
Configuration conf=new Configuration();
Job job=Job.getInstance(conf, "WordCount");
//设置main方法所在的类
job.setJarByClass(WordCount.class);
//设置mapper相关属性
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reducer相关属性
job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//设置文件输入输出
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交任务
System.exit(job.waitForCompletion(true)? 0:1);
}
}
4.执行流程分析
1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如图4-1所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。
2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,如图4-2所示。
3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。如图4-3所示。
4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,如图4-4所示。