
Daily Summary

WritableComparable Sorting Case in Practice (Total Order Sort)

(Atguigu MapReduce lab) Based on the results produced by the serialization case in 2.3, sort the records again by total traffic in descending order.
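The input for this job is the output of that serialization case, which (assuming the format from 2.3) has one tab-separated record per line: phone number, upstream traffic, downstream traffic, total traffic. A hypothetical input line, with made-up numbers for illustration:

13736230513	2481	24681	27162

The job re-emits these records ordered by the total-traffic column from largest to smallest.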

(1) Write the FlowBean class

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // No-arg constructor, required so the framework can instantiate the bean
    public FlowBean() {
    }

    // Getters and setters for the three fields
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // Serialization and deserialization; the field order must be identical in both
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upFlow);
        out.writeLong(this.downFlow);
        out.writeLong(this.sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // Override toString, since the FlowBean is printed in the final output
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Compare by total traffic, descending
    @Override
    public int compareTo(FlowBean o) {
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            return 0;
        }
    }
}
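Before wiring the bean into a job, it can help to sanity-check the ordering on its own. The following is a minimal standalone sketch, not part of the original lab: the class name FlowBeanSortCheck, the bean() helper, and the traffic values are all hypothetical. It relies on the fact that Collections.sort uses compareTo, the same ordering the shuffle phase applies to map output keys.

package com.atguigu.mapreduce.writablecompable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlowBeanSortCheck {

    // Hypothetical helper: build a FlowBean from up/down traffic values
    private static FlowBean bean(long up, long down) {
        FlowBean b = new FlowBean();
        b.setUpFlow(up);
        b.setDownFlow(down);
        b.setSumFlow(); // sumFlow = upFlow + downFlow
        return b;
    }

    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<>();
        beans.add(bean(100, 200));  // sumFlow = 300
        beans.add(bean(500, 1000)); // sumFlow = 1500
        beans.add(bean(50, 50));    // sumFlow = 100

        // Sorts with FlowBean.compareTo, i.e. descending by sumFlow
        Collections.sort(beans);

        // Prints the sumFlow = 1500 row first, then 300, then 100
        for (FlowBean b : beans) {
            System.out.println(b);
        }
    }
}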

(2) Write the Mapper class

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // 1. Read one line of input
        String line = value.toString();

        // 2. Split the line on "\t"
        String[] split = line.split("\t");

        // 3. Populate outK (traffic fields) and outV (phone number)
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);

        // 4. Write out outK and outV
        context.write(outK, outV);
    }
}
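For the hypothetical input line shown earlier, this mapper would emit a FlowBean(upFlow=2481, downFlow=24681, sumFlow=27162) as the key and "13736230513" as the value. Making FlowBean the map output key is the core of the design: the framework sorts map output by key during the shuffle, so the records reach the reducer already ordered by compareTo, i.e. by total traffic descending.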

(3) Write the Reducer class

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        // Iterate over the values and write each one, so records with the
        // same total traffic are not lost
        for (Text value : values) {
            // Swap the key and value, writing phone number -> FlowBean
            context.write(value, key);
        }
    }
}
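The loop matters because keys that compare as equal are grouped into a single reduce call: two phone numbers with the same total traffic arrive as two values under one FlowBean key. Writing inside the loop keeps both records; writing the key once would silently drop one of them.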

(4) Write the Driver class

package com.atguigu.mapreduce.writablecompable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {

        // 1. Get the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the Driver class
        job.setJarByClass(FlowDriver.class);

        // 3. Set the Mapper and Reducer classes
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Set the map output KV types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. Set the final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));

        // 7. Submit the job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
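The paths are hardcoded for a local run (e.g. from the IDE in Hadoop local mode); note that FileOutputFormat fails the job if the output directory already exists, so D:\comparout must be deleted between runs. After a successful run, the part-r-00000 file holds one line per phone number in the format phone \t upFlow \t downFlow \t sumFlow, ordered by total traffic from largest to smallest.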