
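MapReduce: Traffic Summary Example

Traffic summary example

1. Requirements

 We have collected a set of user access traffic records, and we need to compute each user's traffic totals from this data.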

Part of the test data is shown below; you can copy it out for testing:

1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4            4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99          2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4            4   0   240 0   200
1363157993044   18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99  iface.qiyi.com  視訊網站    15  12  1527    2106    200
1363157995074   84138413    5C-0E-8B-8C-E8-20:7DaysInn  120.197.40.4    122.72.52.12        20  16  4116    1432    200
1363157993055   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157995033   15920133257 5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   資訊安全    20  20  3156    2936    200
1363157983019   13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82          4   0   240 0   200
1363157984041   13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4    s19.cnzz.com    站點統計    24  9   6960    690 200
1363157973098   15013685858 5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com   搜尋引擎    28  27  3659    3538    200
1363157986029   15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99  www.umeng.com   站點統計    3   3   1938    180 200
1363157992093   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          15  9   918 4938    200
1363157986041   13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4            3   3   180 180 200
1363157984040   13602846565 5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash2-http.qq.com 綜合門戶    15  12  1938    2910    200
1363157995093   13922314466 00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn      12  12  3008    3720    200
1363157982040   13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99  y0.ifengimg.com 綜合門戶    57  102 7335    110349  200
1363157986072   18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99  input.shouji.sogou.com  搜尋引擎    21  18  9531    2412    200
1363157990043   13925057413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜尋引擎    69  63  11058   48243   200
1363157988072   13760778710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200
1363157985066   13726238888 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157993055   13560436666 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157993055   13560436646 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157984041   13660573991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4    s19.cnzz.com    站點統計    24  9   6960    690 200
1363157973098   15013685858 5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com   搜尋引擎    28  27  3659    3538    200
1363157986029   15989102119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99  www.umeng.com   站點統計    3   3   1938    180 200
1363157992093   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          15  9   918 4938    200
1363157986041   13480453104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4            3   3   180 180 200
1363157984040   13602246565 5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash2-http.qq.com 綜合門戶    15  12  1938    2910    200
1363157995093   13922114466 00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn      12  12  3008    3720    200
1363157982040   13506468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99  y0.ifengimg.com 綜合門戶    57  102 7335    110349  200
1363157986072   18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99  input.shouji.sogou.com  搜尋引擎    21  18  9531    2412    200
1363157990043   13925047413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜尋引擎    69  63  11058   48243   200
1363157988072   13760758710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200
1363157985066   13726228888 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157993055   13560416666 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157993055   13560436766 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200      

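2. Requirements analysis

 In the Map phase, each line that is read yields three values to record: upstream traffic, downstream traffic, and total traffic. A single primitive type cannot conveniently hold all three, so we introduce a custom object to store them. That object must be serializable.

3. Implementation

3.1 Create the custom object

 Note that the object must support serialization. Here we implement the Writable interface and override its methods.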

package com.sxt.mr.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
/**
 * Stores traffic-related data.
 * @author 波波烤鴨
 *
 */
public class Flow implements Writable {
    // Upstream traffic
    private long upFlow;

    // Downstream traffic
    private long downFlow;
    // Total traffic
    private long sumFlow;

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public Flow(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    /**
     * A no-argument constructor is required; it is used when instances are created via reflection.
     */
    public Flow() {
        super();
    }

    /**
     * Serialization method
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Deserialization: fields must be read in the same order in which they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}      
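
The note about matching the read order to the write order can be checked without a cluster. The following is a minimal sketch, assuming a stand-in class (the hypothetical FlowRoundTrip) that copies the fields and the write/readFields bodies of Flow but drops the Hadoop Writable interface so that it runs on a bare JDK. It serializes one object to a byte array and reads it back into a fresh instance created with the no-argument constructor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for Flow: same fields and same write/readFields bodies,
// but no Hadoop dependency. It is not part of the original project.
class FlowRoundTrip {
    long upFlow;
    long downFlow;
    long sumFlow;

    // No-argument constructor, used to create the empty object that readFields fills in.
    FlowRoundTrip() {
    }

    FlowRoundTrip(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Serialize the three fields in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialize in exactly the same order as write().
    void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize one object to bytes, then read the bytes back into a new instance.
    static FlowRoundTrip roundTrip(FlowRoundTrip original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            original.write(new DataOutputStream(buffer));
            FlowRoundTrip copy = new FlowRoundTrip();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRoundTrip copy = roundTrip(new FlowRoundTrip(2481, 24681));
        // Prints the three fields separated by tabs: 2481, 24681, 27162
        System.out.println(copy.upFlow + "\t" + copy.downFlow + "\t" + copy.sumFlow);
    }
}
```

If readFields read the fields in a different order than write wrote them, the copy would silently carry swapped values, because the stream contains only raw longs and no field names.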

3.2 Custom MapTask

 Note that the output value is our custom type.

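package com.sxt.mr.flow;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapTask extends Mapper<LongWritable, Text, Text, Flow>{

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert the line to a String
        String line = value.toString();
        // Split the line into fields
        String[] fields = line.split("\t");
        // Extract the phone number
        String phoneNum = fields[1];
        // Extract upstream and downstream traffic, counting from the end of the line
        long upFlow = Long.parseLong(fields[fields.length-3]);
        long downFlow = Long.parseLong(fields[fields.length-2]);
        Flow flow = new Flow(upFlow,downFlow);
        context.write(new Text(phoneNum), flow);
    }
}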
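
The mapper reads the traffic columns relative to the end of the field array, presumably because some records have no domain or category column, so the number of fields per line may vary. The sketch below illustrates that indexing in plain Java. The class FlowLineParser and its methods are hypothetical, and the exact tab layout of the two sample lines (an empty category column in the first, absent columns in the second) is an assumption, since the original file is not available.

```java
// Hypothetical helper that mirrors the field extraction in MyMapTask.map, without Hadoop.
class FlowLineParser {

    // The phone number is always the second tab-separated field.
    static String phone(String line) {
        return line.split("\t")[1];
    }

    // Upstream traffic is the third field from the end.
    static long upFlow(String line) {
        String[] fields = line.split("\t");
        return Long.parseLong(fields[fields.length - 3]);
    }

    // Downstream traffic is the second field from the end (the last field is the status code).
    static long downFlow(String line) {
        String[] fields = line.split("\t");
        return Long.parseLong(fields[fields.length - 2]);
    }

    public static void main(String[] args) {
        // Assumed layout: 11 fields, with an empty category column.
        String longLine = "1363157995074\t84138413\t5C-0E-8B-8C-E8-20:7DaysInn\t120.197.40.4"
                + "\t122.72.52.12\t\t20\t16\t4116\t1432\t200";
        // Assumed layout: 9 fields, with the domain and category columns absent.
        String shortLine = "1363157995052\t13826544101\t5C-0E-8B-C7-F1-E0:CMCC\t120.197.40.4"
                + "\t4\t0\t264\t0\t200";
        for (String line : new String[] {longLine, shortLine}) {
            System.out.println(phone(line) + "\t" + upFlow(line) + "\t" + downFlow(line));
        }
    }
}
```

Indexing from the front (for example a fixed index for the upstream column) would only work if every line had the same number of fields. Counting from the end gives the same result for both layouts.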

3.3 Custom ReduceTask

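package com.sxt.mr.flow;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReduceTask extends Reducer<Text, Flow, Text, Flow>{

    @Override
    protected void reduce(Text key, Iterable<Flow> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate upstream and downstream traffic for one phone number
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        for (Flow flow : values) {
            sumUpFlow += flow.getUpFlow();
            sumDownFlow += flow.getDownFlow();
        }
        Flow f = new Flow(sumUpFlow,sumDownFlow);
        // Flow must override toString, because the output file contains its string form
        context.write(new Text(key), f);
    }
}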

3.4 Driver class test

 Run in local mode.

package com.sxt.mr.flow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowTest {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);
        conf.set("mapreduce.framework.name", "local");
        // Write the output to HDFS
        // conf.set("fs.defaultFS", "hdfs://hadoop-node01:9000");
        // Write the output to the local file system
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(FlowTest.class);
        
        // Specify the mapper and reducer classes this job uses
        job.setMapperClass(MyMapTask.class);
        job.setReducerClass(MyReduceTask.class);
        
        // Specify the key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Flow.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Flow.class);
        
        // Specify the job's input directory and output directory
        FileInputFormat.setInputPaths(job, new Path("c:/tools/bigdata/mr/flow/input/"));
        FileOutputFormat.setOutputPath(job, new Path("c:/tools/bigdata/mr/flow/output/"));
                
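        // Submit the job's configuration and the jar it uses for execution
        // (to YARN on a cluster; with the settings above it runs in local mode)
        // job.submit();  waitForCompletion waits for the job to finish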
        boolean flag = job.waitForCompletion(true);
        System.exit(flag?0:1);
    }
}
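
One practical point when rerunning the driver: FileOutputFormat refuses to start a job whose output directory already exists, so the output folder has to be removed between runs. As a minimal sketch for the local file system only, the hypothetical helper below (OutputDirCleaner is not part of the original project) deletes a directory tree using java.nio. On HDFS the equivalent would go through Hadoop's FileSystem.delete(path, true) instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper for local-mode test runs; not part of the original project.
class OutputDirCleaner {

    // Recursively deletes dir if it exists. Returns true if something was deleted.
    static boolean deleteIfExists(Path dir) {
        if (!Files.exists(dir)) {
            return false;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }

    public static void main(String[] args) {
        // The directory must be passed explicitly; nothing is deleted without an argument.
        if (args.length == 1) {
            System.out.println(deleteIfExists(Paths.get(args[0])));
        }
    }
}
```

Calling such a helper on the output path before job.waitForCompletion is one option for repeated local tests. It should be used with care, since it removes the directory without confirmation.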
      

3.5 Output
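
With the default text output format, each output line is the key followed by a tab and the value's toString(), so a line has the form phone, upstream, downstream, total. As a hedged illustration of what to expect, the plain-Java sketch below simulates the map, group, and reduce steps on three of the sample records. It does not use Hadoop, the class FlowSummarySketch is hypothetical, and the tab layout of the sample lines is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of the job's logic; not the actual MapReduce run.
class FlowSummarySketch {

    // Groups records by phone number, sums upstream and downstream traffic,
    // and formats each group as key, tab, then the same text Flow.toString() produces.
    static List<String> summarize(List<String> lines) {
        // TreeMap keeps the keys sorted, similar to the sorted keys a reducer receives.
        Map<String, long[]> totals = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            long up = Long.parseLong(fields[fields.length - 3]);
            long down = Long.parseLong(fields[fields.length - 2]);
            long[] sums = totals.computeIfAbsent(fields[1], k -> new long[2]);
            sums[0] += up;
            sums[1] += down;
        }
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, long[]> entry : totals.entrySet()) {
            long up = entry.getValue()[0];
            long down = entry.getValue()[1];
            output.add(entry.getKey() + "\t" + up + "\t" + down + "\t" + (up + down));
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t120.196.100.82\ti02.c.aliimg.com\t\t24\t27\t2481\t24681\t200",
            "1363157993055\t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99\t\t\t18\t15\t1116\t954\t200",
            "1363157992093\t13560439658\tC4-17-FE-BA-DE-D9:CMCC\t120.196.100.99\t\t\t15\t9\t918\t4938\t200");
        // Prints one line per phone number:
        // 13560439658  2034  5892   7926
        // 13726230503  2481  24681  27162
        for (String row : summarize(sample)) {
            System.out.println(row);
        }
    }
}
```

The two records for 13560439658 are merged into a single line (1116 + 918 upstream, 954 + 4938 downstream), which is the per-user aggregation the reducer performs.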


4. Summary

 This example mainly demonstrates how to use a custom object in a MapReduce job. Remember that the object must be serializable!
