天天看點

每日總結

序列化案例實操

統計每一個手機號耗費的總上行流量、總下行流量、總流量

(1)編寫流量統計的 Bean 對象

v>

package com.atguigu.mapreduce.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

//1 繼承 Writable 接口

public class FlowBean implements Writable {

private long upFlow; //上行流量

private long downFlow; //下行流量

private long sumFlow; //總流量

//2 提供無參構造

public FlowBean() {

}

//3 提供三個參數的 getter 和 setter 方法

public long getUpFlow() {

return upFlow;

public void setUpFlow(long upFlow) {

this.upFlow = upFlow;

public long getDownFlow() {

return downFlow;

public void setDownFlow(long downFlow) {

this.downFlow = downFlow;

public long getSumFlow() {

return sumFlow;

public void setSumFlow(long sumFlow) {

this.sumFlow = sumFlow;

public void setSumFlow() {

this.sumFlow = this.upFlow + this.downFlow;

//4 實作序列化和反序列化方法,注意順序一定要保持一緻

@Override

public void write(DataOutput dataOutput) throws IOException {

dataOutput.writeLong(upFlow);

dataOutput.writeLong(downFlow);

dataOutput.writeLong(sumFlow);

public void readFields(DataInput dataInput) throws IOException {

this.upFlow = dataInput.readLong();

this.downFlow = dataInput.readLong();

this.sumFlow = dataInput.readLong();

//5 重寫 ToString

public String toString() {

return upFlow + "\t" + downFlow + "\t" + sumFlow;

(2)編寫 Mapper 類

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean>

{

private Text outK = new Text();

private FlowBean outV = new FlowBean();

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

//1 擷取一行資料,轉成字元串

String line = value.toString();

//2 切割資料

String[] split = line.split("\t");

//3 抓取我們需要的資料:手機号,上行流量,下行流量

String phone = split[1];

String up = split[split.length - 3];

String down = split[split.length - 2];

//4 封裝 outK outV

outK.set(phone);

outV.setUpFlow(Long.parseLong(up));

outV.setDownFlow(Long.parseLong(down));

outV.setSumFlow();

//5 寫出 outK out

context.write(outK, outV);

(3)編寫 Reducer 類

import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean>

protected void reduce(Text key, Iterable<FlowBean> values, Context

context) throws IOException, InterruptedException {

long totalUp = 0;

long totalDown = 0;

//1 周遊 values,将其中的上行流量,下行流量分别累加

for (FlowBean flowBean : values) {

totalUp += flowBean.getUpFlow();

totalDown += flowBean.getDownFlow();

//2 封裝 outKV

outV.setUpFlow(totalUp);

outV.setDownFlow(totalDown);

//3 寫出 outK outV

context.write(key,outV);

(4)編寫 Driver 驅動類

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {

public static void main(String[] args) throws IOException,

ClassNotFoundException, InterruptedException {

//1 擷取 job 對象

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

//2 關聯本 Driver 類

job.setJarByClass(FlowDriver.class);

//3 關聯 Mapper 和 Reducer

job.setMapperClass(FlowMapper.class);

job.setReducerClass(FlowReducer.class);

//4 設定 Map 端輸出 KV 類型

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

//5 設定程式最終輸出的 KV 類型

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

//6 設定程式的輸入輸出路徑

FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));

FileOutputFormat.setOutputPath(job, new Path("D:\\flowoutput"));

//7 送出 Job

boolean b = job.waitForCompletion(true);

System.exit(b ? 0 : 1);