1、基本概念
2、Mapper代碼
package com.ares.hadoop.mr.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Logger;
import com.ares.hadoop.mr.wordcount.MRTest;
//Long, String, String, Long --> LongWritable, Text, Text, LongWritable
/**
 * Parses tab-separated traffic log lines and emits one (phone number, flow bean)
 * pair per well-formed line.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /** Logger for this mapper (was previously bound to an unrelated class). */
    private static final Logger LOGGER = Logger.getLogger(FlowSumMapper.class);

    /** Field separator of the input lines. */
    private static final char SEPARATOR = '\t';

    /** Number of fields a well-formed input line must contain. */
    private static final int EXPECTED_FIELD_COUNT = 11;

    // Output objects are reused across map() calls, the usual Hadoop pattern;
    // it relies on the framework serializing the pair during context.write().
    private final Text outKey = new Text();
    private final FlowBean outValue = new FlowBean();

    /**
     * Parses one input line and emits (phone number, flow bean).
     *
     * <p>Expected layout: {@value #EXPECTED_FIELD_COUNT} tab-separated fields, with the
     * phone number in field 1, the upstream flow in the third-to-last field and
     * the downstream flow in the second-to-last field. A line with a different
     * field count, or with non-numeric flow values, is logged and skipped.
     * Nothing is emitted for such a line.
     *
     * @param key     line key supplied by the input format (with the default
     *                TextInputFormat, the line's byte offset in the file)
     * @param value   the raw input line
     * @param context task context used to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        // All per-record state is local, so nothing can leak from a previous line.
        String line = value.toString();
        String[] fields = StringUtils.split(line, SEPARATOR);
        int length = fields.length;

        if (length != EXPECTED_FIELD_COUNT) {
            // Actually skip the line, as the message says. Before, processing continued
            // and short lines crashed the task at fields[1].
            LOGGER.error(key.get() + ", " + line + " LENGTH INVALID, IGNORE...");
            return;
        }

        String phoneNum = fields[1];
        long upFlow;
        long downFlow;
        try {
            upFlow = Long.parseLong(fields[length - 3]);
            downFlow = Long.parseLong(fields[length - 2]);
        } catch (NumberFormatException ignored) {
            // Skip the record. Emitting it would report bogus (previously: stale) flows.
            LOGGER.error(key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...");
            return;
        }

        outValue.setPhoneNum(phoneNum);
        outValue.setUpFlow(upFlow);
        outValue.setDownFlow(downFlow);
        outKey.set(phoneNum);
        context.write(outKey, outValue);
    }
}
3、Reducer代碼
package com.ares.hadoop.mr.flowsum;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Aggregates all flow records of one phone number into a single total record.
 */
public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /** Output value reused across reduce() calls. */
    private final FlowBean totals = new FlowBean();

    /**
     * Sums the upstream and downstream flows of every record for {@code key}.
     * Emits one bean carrying both totals and their sum.
     *
     * @param key     phone number shared by all {@code values}
     * @param values  flow records emitted by the mapper for this phone number
     * @param context task context used to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,
            Reducer<Text, FlowBean, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        long totalUp = 0L;
        long totalDown = 0L;
        // Each value's flows are read immediately, so it does not matter if the
        // framework reuses the same bean instance between iterations.
        for (FlowBean record : values) {
            totalDown += record.getDownFlow();
            totalUp += record.getUpFlow();
        }

        long grandTotal = totalUp + totalDown;
        totals.setPhoneNum(key.toString());
        totals.setUpFlow(totalUp);
        totals.setDownFlow(totalDown);
        totals.setSumFlow(grandTotal);
        context.write(key, totals);
    }
}
4、序列化Bean代碼
package com.ares.hadoop.mr.flowsum;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
 * Hadoop {@link Writable} holding a phone number and its upstream, downstream
 * and total traffic.
 *
 * <p>Wire format, in order: phone number (modified UTF-8), upFlow, downFlow and
 * sumFlow (each a {@code long}). {@link #write} and {@link #readFields} must keep
 * this order in sync.
 */
public class FlowBean implements Writable {

    private String phoneNum;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /**
     * No-arg constructor. Hadoop creates Writable instances reflectively before
     * calling {@link #readFields}.
     */
    public FlowBean() {
    }

    /** @return the phone number, or {@code null} if never set */
    public String getPhoneNum() {
        return phoneNum;
    }

    /** @param phoneNum phone number; must be non-null before {@link #write} is called */
    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    /** @return upstream flow */
    public long getUpFlow() {
        return upFlow;
    }

    /** @param upFlow upstream flow */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    /** @return downstream flow */
    public long getDownFlow() {
        return downFlow;
    }

    /** @param downFlow downstream flow */
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    /** @return total flow; only meaningful once explicitly set (it is not derived) */
    public long getSumFlow() {
        return sumFlow;
    }

    /** @param sumFlow total flow */
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * Deserializes the fields in the same order {@link #write} emits them.
     *
     * @param in source stream
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNum = in.readUTF();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    /**
     * Serializes the fields. {@code phoneNum} must be non-null, because
     * {@link DataOutput#writeUTF} rejects null.
     *
     * @param out destination stream
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.phoneNum);
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
        out.writeLong(this.sumFlow);
    }

    /**
     * Returns the value as it appears in the job's text output: the three flows
     * separated by tabs. The phone number is omitted because it is the output key.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(upFlow).append('\t');
        sb.append(downFlow).append('\t');
        sb.append(sumFlow);
        return sb.toString();
    }
}
package com.ares.hadoop.mr.flowsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
/**
 * Command-line driver for the flow-sum job: {@code FlowSumRunner <input> <output>}.
 */
public class FlowSumRunner extends Configured implements Tool {
    private static final Logger LOGGER = Logger.getLogger(FlowSumRunner.class);

    /**
     * Configures the flow-sum job, submits it and waits for it to finish.
     *
     * @param args arguments left after ToolRunner has consumed the generic Hadoop
     *             options: the input path(s) (comma-separated allowed) and the
     *             output path
     * @return 0 if the job succeeded, 1 if it failed, -1 on wrong arguments
     * @throws Exception if the job cannot be set up or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        LOGGER.debug("FlowSum: job STARTED...");
        if (args.length != 2) {
            LOGGER.error("FlowSum: ARGUMENTS ERROR, usage: FlowSumRunner <input path> <output path>");
            // Return instead of System.exit() so the Tool contract is honored;
            // main() still turns this into the process exit status.
            return -1;
        }

        // Use the configuration prepared by ToolRunner so that generic options
        // (-D, -conf, -fs, ...) take effect. Fall back to defaults when run()
        // is invoked directly without setConf().
        Configuration conf = getConf() != null ? getConf() : new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("flowsum");

        // Job classes
        job.setJarByClass(FlowSumRunner.class);
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // Final (reduce) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Intermediate (map) output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Input and output locations
        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Verbose progress reporting while waiting
        if (job.waitForCompletion(true)) {
            LOGGER.debug("FlowSum: job SUCCEEDED...");
            return 0;
        }
        LOGGER.debug("FlowSum: job FAILED...");
        return 1;
    }

    /**
     * Entry point: runs the job through ToolRunner and exits with its status.
     *
     * @param args generic Hadoop options followed by the input and output paths
     * @throws Exception if the job cannot be run
     */
    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(result);
    }
}