天天看点

MapReduce(一):入门级程序wordcount及其分析

1.MapReduce处理过程

   map函数接收一个<key,value>形式的输入,然后同样产生一个<key,value>形式的中间输出,reduce函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式的。

MapReduce(一):入门级程序wordcount及其分析

 (1). map任务处理

          1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。

          1.2 写自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

          1.3 对输出的key、value进行分区。

          1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。

          1.5 (可选)分组后的数据进行归约。

(2).reduce任务处理

          2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。

          2.2 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。

          2.3 把reduce的输出保存到文件中。

2.特别数据类型简介

    Hadoop提供了如下内容的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较。

    BooleanWritable:标准布尔型数值

    ByteWritable:单字节数值

    DoubleWritable:双字节数

    FloatWritable:浮点数

    IntWritable:整型数

    LongWritable:长整型数

    Text:使用UTF8格式存储的文本

    NullWritable:当<key,value>中的key或value为空时使用

3.源代码呈现

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
	//Map方法
	//输入类型为<longWritable,Text>
	//输出类型为<Text,IntWritable>
    public static class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable>
    {
    	private final static IntWritable one=new IntWritable(1);
    	private Text text=new Text();
    	@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException 
    	{
    		String line=value.toString();
    		String []words=line.split("	");//制表符分离
    		for(String w:words)
    		{
    			text.set(w);
    			context.write(text, one);
    		}
		}
    }
    
    //reduce方法
	public static class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>
	{
		private IntWritable result=new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable>values,Context context)
				throws IOException, InterruptedException {
			int sum=0;
			for(IntWritable v:values)
			{
				sum+=v.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
    
	//主函数,基本操作6大步
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
	{
		
		//构建job对象
		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf, "WordCount");
		
		//设置main方法所在的类
		job.setJarByClass(WordCount.class);
		
		//设置mapper相关属性
		job.setMapperClass(WCMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		//设置reducer相关属性
		job.setReducerClass(WCReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		//设置文件输入输出
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));
		
		//提交任务
		System.exit(job.waitForCompletion(true)? 0:1);
	}
}
           

4.执行流程分析

  1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如图4-1所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。

MapReduce(一):入门级程序wordcount及其分析

 2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,如图4-2所示。

MapReduce(一):入门级程序wordcount及其分析

 3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。如图4-3所示。

MapReduce(一):入门级程序wordcount及其分析

 4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,如图4-4所示。

MapReduce(一):入门级程序wordcount及其分析

继续阅读