天天看點

java 編寫MR程式之 - WordCount 及解析

wordcount 作為經典的MR 入門程式,對了解和編寫MR程式有很大的幫助。

package wordCount2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
/**
 * Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
 * KEYIN   :輸入 KV 資料對 中的key 的資料類型
 *           預設情況下,是MR架構所讀到的一行文本的起始偏移量, long ---> LongWritable
 * VALUEIN :輸入輸入 KV 資料對 中的value 的資料類型
 *           預設情況下,是MR架構所讀到的一行文本的内容, String ---> Text
 * KEYOUT  :輸出 KV 資料對 中的key 的資料類型
 *           使用者自定義邏輯處理完成後輸出資料中的 key ,在此處是單詞, String ---> Text
 * VALUEOUT:輸出 KV 資料對 中的value 的資料類型
 *           使用者自定義邏輯處理完成後輸出資料中的 value 此處是單詞次數 ,Integer ---> IntWritabel
 * @author hadoop1
 *
 */
	public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
		/**
		 * map 方法是提供給 map task 程序來調用的,map task 程序是每讀取一行文本來調用一次重寫的map 方法
		 * map task 在調用 map 方法時,傳遞的參數
		 * key  : 一行文本的偏移量 
		 * value:  一行文本的内容
		 */
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			//得到一行文本後 ,将其轉成 String
			String line = value.toString();
			//将文本切割
			String[] words = line.split(" ");
			//輸出 <單詞 , 1> 到上下文
			for (String word : words) {
				context.write(new Text(word), new IntWritable(1));
			}
		}
	}
	/**
	 * Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
	 * KEYIN   : 對應 mapper 階段輸出的結果的 key 的類型
	 *           
	 * VALUEIN :對應 mapper 階段輸出的結果的 value 的類型
	 *      
	 * KEYOUT  :reduce 處理完成後輸出結果KV對中 key 的類型
	 *          
	 * VALUEOUT:reduce 處理完成後輸出結果KV對中 value 的類型
	 *         
	 *
	 */
	public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		/**
		 * reduce方法提供給 reduce task 程序調用
		 * 
		 * reduce task 會将shuffle階段分發過來的大量的KV資料進行聚合,聚合的機制是相同的key的 kv 對聚合為一組
		 * 然後reduce task 對每一組聚合的KV調用重寫的reduce 方法
		 * 比如:<hello,1><hello,1><hello,1><hello,1><hello,1>
		 *      <world,1><world,1><world,1>
		 * hello 會調用一次reduce方法進行處理,world組 也會調用一次reduce方法進行處理
		 * 調用時傳遞的參數:
		 *               key    : 一組KV中的key
		 *               values : 一組KV中所有value的疊代器
		 */     
		public void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int count = 0;
			//周遊一組KV 中key 對應的 value 疊代器所有的值 進行累加。
			for (IntWritable value : values) {
				count += value.get();
			}
			//輸出單詞的統計結果
			context.write(key, new IntWritable(count));
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, "word count");
		//設定job所需的jar包的本地路徑
		//job.setJar("/home.hadoop1/wc.jar");
		job.setJarByClass(WordCount.class);
		
		//指定業務job要使用的 mapper/reduce 業務類	
		job.setMapperClass(WordMapper.class);
		job.setReducerClass(WordReducer.class);
		
		//指定reduce 完成後最終的資料KV類型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//指定輸入檔案的hdfs的所在目錄
		FileInputFormat.addInputPath(job, new Path(args[0]));
		
		//指定輸出結果的hdfs所在目錄
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}