天天看点

hadoop自定义实现排序流量统计

1.上篇文章讲到mapreduce实现简单的流量统计,但最后的结果是按手机号的字典顺序进行输出的,如果我们需要实现按总流量的大小进行排序输出,怎么办?

2.我们可以用上篇的结果文件作为mapreduce的输入,从新写一个mapreduce程序。

3.map输出为FlowBean作为key,输出为null,而FlowBean我们实现WritableComparable接口,自定义它的排序规则,那么map输出到reduce框架中的key时候,他就会按照我们自定义的规则排序输出。

4.FlowBean具体代码如下:

package com.zhichao.wan.flowmr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean>{


    private String phoneNB;
    private long u_load;
    private long d_load;
    private long s_load;


    public FlowBean(){

    }

    public FlowBean(String phoneNB, long u_load, long d_load) {
        super();
        this.phoneNB = phoneNB;
        this.u_load = u_load;
        this.d_load = d_load;
        this.s_load=u_load+d_load;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getU_load() {
        return u_load;
    }

    public void setU_load(long u_load) {
        this.u_load = u_load;
    }

    public long getD_load() {
        return d_load;
    }

    public void setD_load(long d_load) {
        this.d_load = d_load;
    }

    public long getS_load() {
        return s_load;
    }

    public void setS_load(long s_load) {
        this.s_load = s_load;
    }

    @Override
    public void write(DataOutput out) throws IOException {

        out.writeUTF(phoneNB);
        out.writeLong(d_load);
        out.writeLong(u_load);
        out.writeLong(s_load);

    }

    @Override
    public void readFields(DataInput in) throws IOException {


        phoneNB=in.readUTF();
        d_load=in.readLong();
        u_load=in.readLong();
        s_load=in.readLong();

    }

    @Override
    public String toString() {
        return ""+d_load+"\t"+u_load+"\t"+s_load;
    }

    @Override
    public int compareTo(FlowBean arg0) {
        // TODO Auto-generated method stub
        return s_load>arg0.getS_load()?-1:1;
    }




}
           

5.mapreduce程序代码如下:

package com.zhichao.wan.flowmr.sort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.zhichao.wan.flowmr.FlowBean;

public class FlowSortMR {


    public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{

        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            String[] fileds = StringUtils.split(line,"\t");

            String phoneNB=fileds[0];
            long u_load=Long.parseLong(fileds[1]);
            long d_load=Long.parseLong(fileds[2]);

            context.write(new FlowBean(phoneNB, u_load, d_load), NullWritable.get());

        }

    }

    public static class sortReducder extends Reducer<FlowBean, NullWritable, Text, FlowBean>{
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values,Context context)
                throws IOException, InterruptedException {

            String phoneNB=key.getPhoneNB();

            context.write(new Text(phoneNB), new FlowBean(phoneNB, key.getU_load(), key.getD_load()));


        }

    }


    public static void main(String[] args) throws Exception {


        Configuration conf=new Configuration();

        Job job=Job.getInstance(conf);


        job.setJarByClass(FlowSortMR.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(sortReducder.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);


        FileInputFormat.setInputPaths(job, new Path("F:/hadoop/flow/output"));

        FileOutputFormat.setOutputPath(job, new Path("F:/hadoop/flow/output1"));



        job.waitForCompletion(true);



    }


}
           

6.运行结果如下:

13726238888 2481    24681   27162
13726230503 2481    24681   27162
13925057413 63  11058   11121
18320173382 18  9531    9549
13502468823 102 7335    7437
13660577991 9   6960    6969
13922314466 3008    3720    6728
13560439658 5892    400 6292
84138413    4116    1432    5548
15013685858 27  3659    3686
15920133257 20  3156    3176
13602846565 12  1938    1950
15989002119 3   1938    1941
13926435656 1512    200 1712
18211575961 12  1527    1539
13560436666 954 200 1154
13480253104 180 200 380
13760778710 120 200 320
13826544101 0   200 200
13926251106 0   200 200
13719199419 0   200 200
           

继续阅读