天天看点

每日总结

序列化案例实操

统计每一个手机号耗费的总上行流量、总下行流量、总流量

(1)编写流量统计的 Bean 对象

package com.atguigu.mapreduce.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

//1 继承 Writable 接口

public class FlowBean implements Writable {

private long upFlow; //上行流量

private long downFlow; //下行流量

private long sumFlow; //总流量

//2 提供无参构造

public FlowBean() {

}

//3 提供三个参数的 getter 和 setter 方法

public long getUpFlow() {

return upFlow;

public void setUpFlow(long upFlow) {

this.upFlow = upFlow;

public long getDownFlow() {

return downFlow;

public void setDownFlow(long downFlow) {

this.downFlow = downFlow;

public long getSumFlow() {

return sumFlow;

public void setSumFlow(long sumFlow) {

this.sumFlow = sumFlow;

public void setSumFlow() {

this.sumFlow = this.upFlow + this.downFlow;

//4 实现序列化和反序列化方法,注意顺序一定要保持一致

@Override

public void write(DataOutput dataOutput) throws IOException {

dataOutput.writeLong(upFlow);

dataOutput.writeLong(downFlow);

dataOutput.writeLong(sumFlow);

public void readFields(DataInput dataInput) throws IOException {

this.upFlow = dataInput.readLong();

this.downFlow = dataInput.readLong();

this.sumFlow = dataInput.readLong();

//5 重写 ToString

public String toString() {

return upFlow + "\t" + downFlow + "\t" + sumFlow;

(2)编写 Mapper 类

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean>

{

private Text outK = new Text();

private FlowBean outV = new FlowBean();

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

//1 获取一行数据,转成字符串

String line = value.toString();

//2 切割数据

String[] split = line.split("\t");

//3 抓取我们需要的数据:手机号,上行流量,下行流量

String phone = split[1];

String up = split[split.length - 3];

String down = split[split.length - 2];

//4 封装 outK outV

outK.set(phone);

outV.setUpFlow(Long.parseLong(up));

outV.setDownFlow(Long.parseLong(down));

outV.setSumFlow();

//5 写出 outK out

context.write(outK, outV);

(3)编写 Reducer 类

import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean>

protected void reduce(Text key, Iterable<FlowBean> values, Context

context) throws IOException, InterruptedException {

long totalUp = 0;

long totalDown = 0;

//1 遍历 values,将其中的上行流量,下行流量分别累加

for (FlowBean flowBean : values) {

totalUp += flowBean.getUpFlow();

totalDown += flowBean.getDownFlow();

//2 封装 outKV

outV.setUpFlow(totalUp);

outV.setDownFlow(totalDown);

//3 写出 outK outV

context.write(key,outV);

(4)编写 Driver 驱动类

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {

public static void main(String[] args) throws IOException,

ClassNotFoundException, InterruptedException {

//1 获取 job 对象

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

//2 关联本 Driver 类

job.setJarByClass(FlowDriver.class);

//3 关联 Mapper 和 Reducer

job.setMapperClass(FlowMapper.class);

job.setReducerClass(FlowReducer.class);

//4 设置 Map 端输出 KV 类型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

//5 设置程序最终输出的 KV 类型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

//6 设置程序的输入输出路径

FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));

FileOutputFormat.setOutputPath(job, new Path("D:\\flowoutput"));

//7 提交 Job

boolean b = job.waitForCompletion(true);

System.exit(b ? 0 : 1);