1. Mapper program
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k1 v1 k2 v2
// Generics: <k1 = byte offset, v1 = input line, k2 = word, v2 = count of 1>
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reused output writables: one allocation per mapper instead of one per token.
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    /**
     * Emits {@code (word, 1)} for every whitespace-separated token in the input line.
     *
     * @param k1      byte offset of the line within the input split (unused)
     * @param v1      one line of input text
     * @param context Mapper context — upstream: HDFS input split; downstream: Reducer
     * @throws IOException          on write failure
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String data = v1.toString();
        // Split on runs of whitespace; plain split(" ") would yield empty
        // tokens for consecutive spaces and count them as words.
        for (String w : data.split("\\s+")) {
            if (w.isEmpty()) {
                continue; // a leading separator produces one empty first token
            }
            word.set(w);
            context.write(word, ONE);
        }
    }
}
2. Reducer program
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// k3 v3 k4 v4
// Generics: <k3 = word, v3 = counts from mappers, k4 = word, v4 = total count>
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Sums every count received for a single word and emits {@code (word, total)}.
     *
     * @param k3      the word being aggregated
     * @param v3      all counts emitted by the mappers for this word
     * @param context Reducer context — upstream: Mapper output; downstream: HDFS
     * @throws IOException          on write failure
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable partial : v3) {
            sum += partial.get();
        }
        // k4 = the word, v4 = its aggregated frequency.
        context.write(k3, new IntWritable(sum));
    }
}
3. Driver program that submits the job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountMain {

    /**
     * Configures and submits the word-count MapReduce job, then exits with
     * the job's success status.
     *
     * @param args args[0] = HDFS input path, args[1] = HDFS output path
     *             (must not already exist)
     * @throws Exception if job configuration or submission fails
     */
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: WordCountMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountMain.class); // jar containing this main class

        // Mapper output types: <k2, v2> = <Text word, IntWritable 1>
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer / final output types: <k4, v4> = <Text word, IntWritable total>
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate the job result to the shell: the original ignored the
        // boolean from waitForCompletion and exited 0 even on job failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}