
Daily Summary

WritableComparable Sorting Case in Practice (Total Sort)

(尚硅谷 MapReduce lab) Based on the output produced by the serialization case in Section 2.3, sort the records again, this time by total traffic in descending order.
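Since the shuffle phase sorts map output by key only, the core trick of this total-sort case is to make the FlowBean the map output key and carry the phone number along as the value; the reducer then swaps them back. Each input line is assumed to match the serialization case's output format, i.e. tab-separated phone number, upstream traffic, downstream traffic, and total traffic. The line below is illustrative only, not the actual lab data:

    13736230513	2481	24681	27162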

(1) Write the FlowBean class, implementing the WritableComparable interface

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // No-arg constructor, required so Hadoop can instantiate the bean by reflection
    public FlowBean() {
    }

    // Getters and setters for the three fields
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // Serialization and deserialization: the field order must be identical in both
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
        out.writeLong(this.sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // Override toString so the final output prints the FlowBean as tab-separated values
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Compare by total traffic, in descending order
    @Override
    public int compareTo(FlowBean o) {
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            return 0;
        }
    }
}
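To sanity-check compareTo outside of Hadoop, one can sort a few beans with Collections.sort, which uses the same Comparable contract the MapReduce shuffle relies on. A minimal local sketch; the class name and the sample numbers are made up for illustration:

package com.atguigu.mapreduce.writablecompable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlowBeanSortCheck {
    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<>();
        long[][] samples = {{100, 200}, {300, 500}, {50, 60}};
        for (long[] s : samples) {
            FlowBean bean = new FlowBean();
            bean.setUpFlow(s[0]);
            bean.setDownFlow(s[1]);
            bean.setSumFlow();
            beans.add(bean);
        }
        Collections.sort(beans); // uses FlowBean.compareTo
        for (FlowBean bean : beans) {
            System.out.println(bean); // totals print in descending order: 800, 300, 110
        }
    }
}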

(2) Write the Mapper class

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 1. Read one line of input
        String line = value.toString();

        // 2. Split the line on "\t"
        String[] split = line.split("\t");

        // 3. Populate outK (the FlowBean key) and outV (the phone number)
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);

        // 4. Emit the (FlowBean, phone number) pair
        context.write(outK, outV);
    }
}
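Note that the mapper only reads the first three columns (phone, upstream, downstream) and recomputes the total via setSumFlow() rather than parsing it from split[3]. A minimal local sketch of that parsing step, with a made-up input line and a hypothetical class name:

package com.atguigu.mapreduce.writablecompable;

public class FlowMapperParseCheck {
    public static void main(String[] args) {
        // Illustrative line in the expected input format (not real lab data)
        String line = "13736230513\t2481\t24681\t27162";
        String[] split = line.split("\t");

        FlowBean outK = new FlowBean();
        outK.setUpFlow(Long.parseLong(split[1]));   // 2481
        outK.setDownFlow(Long.parseLong(split[2])); // 24681
        outK.setSumFlow();                          // 2481 + 24681 = 27162

        // Prints: 2481	24681	27162 <- 13736230513
        System.out.println(outK + " <- " + split[0]);
    }
}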

(3) Write the Reducer class

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // Iterate over values: keys with equal total traffic are grouped into
        // one call, so write every phone number to avoid losing records
        for (Text value : values) {
            // Swap key and value so the output is (phone number, FlowBean)
            context.write(value, key);
        }
    }
}
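The loop matters because the default grouping comparator is the key's own compareTo: two beans with equal sumFlow compare as 0, so all their phone numbers arrive in a single values iterable, and writing the key only once would drop records. A quick illustration with made-up numbers and a hypothetical class name:

package com.atguigu.mapreduce.writablecompable;

public class FlowBeanGroupCheck {
    public static void main(String[] args) {
        FlowBean a = new FlowBean();
        a.setUpFlow(100);
        a.setDownFlow(200);
        a.setSumFlow(); // 300

        FlowBean b = new FlowBean();
        b.setUpFlow(150);
        b.setDownFlow(150);
        b.setSumFlow(); // 300

        // Prints 0: "equal" to the shuffle, so both records land in one reduce() call
        System.out.println(a.compareTo(b));
    }
}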

(4) Write the Driver class

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {

        // 1. Get the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Associate this Driver class so Hadoop can locate the jar
        job.setJarByClass(FlowDriver.class);

        // 3. Attach the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Set the map-side output KV types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Set the final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));

        // 7. Submit the job and exit with its status
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
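The hard-coded Windows paths mean this runs in Hadoop's local mode from the IDE; note that the output directory must not exist beforehand, or FileOutputFormat will fail the job. A common variation (my assumption, not part of the original lab) is to take the paths from the command line so the same Driver also runs on a cluster:

        // Replace step 6 with (args[0] and args[1] are hypothetical CLI arguments):
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

Packaged as a jar, the job could then be launched with something like hadoop jar flow.jar com.atguigu.mapreduce.writablecompable.FlowDriver /input /output (the jar name is illustrative).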