wordcount 作為經典的MR 入門程式,對了解和編寫MR程式有很大的幫助。
package wordCount2;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
/**
* Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
* KEYIN :輸入 KV 資料對 中的key 的資料類型
* 預設情況下,是MR架構所讀到的一行文本的起始偏移量, long ---> LongWritable
* VALUEIN :輸入輸入 KV 資料對 中的value 的資料類型
* 預設情況下,是MR架構所讀到的一行文本的内容, String ---> Text
* KEYOUT :輸出 KV 資料對 中的key 的資料類型
* 使用者自定義邏輯處理完成後輸出資料中的 key ,在此處是單詞, String ---> Text
* VALUEOUT:輸出 KV 資料對 中的value 的資料類型
* 使用者自定義邏輯處理完成後輸出資料中的 value 此處是單詞次數 ,Integer ---> IntWritabel
* @author hadoop1
*
*/
public static class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* map 方法是提供給 map task 程序來調用的,map task 程序是每讀取一行文本來調用一次重寫的map 方法
* map task 在調用 map 方法時,傳遞的參數
* key : 一行文本的偏移量
* value: 一行文本的内容
*/
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//得到一行文本後 ,将其轉成 String
String line = value.toString();
//将文本切割
String[] words = line.split(" ");
//輸出 <單詞 , 1> 到上下文
for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
/**
* Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
* KEYIN : 對應 mapper 階段輸出的結果的 key 的類型
*
* VALUEIN :對應 mapper 階段輸出的結果的 value 的類型
*
* KEYOUT :reduce 處理完成後輸出結果KV對中 key 的類型
*
* VALUEOUT:reduce 處理完成後輸出結果KV對中 value 的類型
*
*
*/
public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/**
* reduce方法提供給 reduce task 程序調用
*
* reduce task 會将shuffle階段分發過來的大量的KV資料進行聚合,聚合的機制是相同的key的 kv 對聚合為一組
* 然後reduce task 對每一組聚合的KV調用重寫的reduce 方法
* 比如:<hello,1><hello,1><hello,1><hello,1><hello,1>
* <world,1><world,1><world,1>
* hello 會調用一次reduce方法進行處理,world組 也會調用一次reduce方法進行處理
* 調用時傳遞的參數:
* key : 一組KV中的key
* values : 一組KV中所有value的疊代器
*/
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int count = 0;
//周遊一組KV 中key 對應的 value 疊代器所有的值 進行累加。
for (IntWritable value : values) {
count += value.get();
}
//輸出單詞的統計結果
context.write(key, new IntWritable(count));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "word count");
//設定job所需的jar包的本地路徑
//job.setJar("/home.hadoop1/wc.jar");
job.setJarByClass(WordCount.class);
//指定業務job要使用的 mapper/reduce 業務類
job.setMapperClass(WordMapper.class);
job.setReducerClass(WordReducer.class);
//指定reduce 完成後最終的資料KV類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定輸入檔案的hdfs的所在目錄
FileInputFormat.addInputPath(job, new Path(args[0]));
//指定輸出結果的hdfs所在目錄
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}