package com.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ModelDemo implements Tool{
//第一個參數(key)必須為LongWritable或者IntWritable,因為他代表的是 行偏移量:每一行的第一個字母距離該檔案的首位置的距離
//第二個參數 代表 map階段輸入value類型
//第三個參數 代表 map階段輸出key類型
//第四個參數 代表map階段輸出value類型
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
/**
* value值代表輸入的值
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1、從輸入資料中擷取每一個檔案中的每一行的值
String line = value.toString();
// 2、對每一行的資料進行切分(看情況)
String[] words = line.split(" ");
// 3、循環處理
for (String word : words) {
value.set(1 + "");
// map階段的輸出
context.write(new Text(word), value);
}
}
}
/**
*
* 第一個Text代表 Reduce階段Key輸入的值類型 需要和Map階段輸出Key的類型相同
* 第二個Text代表 Reduce階段Value輸入的值類型 需要和Map階段輸出Value的類型相同
* 第三個Text代表 Reduce階段Key輸出的值類型 按邏輯自己定義
* 第二個Text代表 Reduce階段Value輸出的值類型 按邏輯自己定義
* @author Administrator
*
*/
public static class MyReduce extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text value, Iterable<Text> list, Context context)
throws IOException, InterruptedException {
int count = 0;
for (Text i : list) {
count += Integer.parseInt(i.toString());
}
context.write(value, new Text(count+""));
}
}
/**
* 設定conf類型
*/
public void setConf(Configuration conf) {
// TODO Auto-generated method stub
conf.set("fs.defaultFS", "hdfs://zwj");
conf.set("dfs.nameservices", "zwj");
conf.set("dfs.ha.namenodes.zwj", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.zwj.nn1", "hadoop01:9000");
conf.set("dfs.namenode.rpc-address.zwj.nn2", "hadoop02:9000");
conf.set("dfs.client.failover.proxy.provider.zwj",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
}
public Configuration getConf() {
// TODO Auto-generated method stub
return new Configuration();
}
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = getConf();
Job job = Job.getInstance(conf, "job");
job.setJarByClass(ModelDemo.class);
//設定自定義mapper的值
job.setMapperClass(MyMapper.class);
//對Map階段輸出 的key value 的類型進行指派
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//設定自定義Reduce的值
job.setReducerClass(MyReduce.class);
//對Reduce階段輸出 的key value 的類型進行指派
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//設定input output 的數值,通過args[進行指派]
setInputAndOutput(job, conf, args);
return (job.waitForCompletion(true)? 0 : 1);
}
private void setInputAndOutput(Job job, Configuration conf, String[] args) throws Exception {
if (args.length != 2) {
System.out.println("資料格式不正确");
return;
}
FileInputFormat.addInputPath(job, new Path(args[0]));
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path(args[1]);
if (fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);
}
/**
* 主調用函數通過這個執行方法,并且傳入參數
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int isok = ToolRunner.run( new ModelDemo(), args);
// 退出整個job
System.exit(isok);
}
}