1. Problem description
In large-scale data processing we often need to find the K items that occur most frequently or have the largest values; this class of problems is called "Top-K". Examples include finding the 10 most popular songs, or the 5 websites with the highest traffic.
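Outside MapReduce, the core of a Top-K computation is simply a size-bounded min-heap: keep at most K candidates and evict the smallest whenever the heap overflows. Below is a minimal, self-contained sketch of that idea (the class name and traffic numbers are made up purely for illustration, with K = 3):

import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

public class TopKDemo {
    public static void main(String[] args) {
        int k = 3;
        // hypothetical traffic totals per site
        Map<String, Long> traffic = new HashMap<>();
        traffic.put("a.com", 300L);
        traffic.put("b.com", 1200L);
        traffic.put("c.com", 50L);
        traffic.put("d.com", 800L);
        traffic.put("e.com", 20L);
        // min-heap ordered by traffic; evict the smallest entry once it holds more than k
        PriorityQueue<Map.Entry<String, Long>> heap =
                new PriorityQueue<>((x, y) -> Long.compare(x.getValue(), y.getValue()));
        for (Map.Entry<String, Long> e : traffic.entrySet()) {
            heap.offer(e);
            if (heap.size() > k) {
                heap.poll(); // drop the current minimum, keeping only the k largest so far
            }
        }
        while (!heap.isEmpty()) {
            System.out.println(heap.poll()); // prints the top k in ascending order of traffic
        }
    }
}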
2. Example: finding the top 5 websites by traffic
Data file test.data:
Data format: each line contains a domain name, the upstream traffic, and the downstream traffic, separated by tabs.
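A couple of hypothetical lines in this format (the domains and numbers are made up for illustration only):
www.example.com	2481	24681
www.itheima.com	1335	466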
Approach:
1. In the Mapper, parse each line and split it on "\t" to obtain the individual fields.
2. Because the same URL appears in many records, put the URL in the key (MapReduce groups identical keys together during the shuffle) and the traffic in the value.
3. In the Reducer, sum the total traffic for each URL and cache the results in a TreeMap, then emit everything in one go at the end (note that emitting only once requires the Reducer's cleanup method; see the run() sketch below).
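The reason a one-time emission belongs in cleanup is the Reducer's run() template method: it calls setup() once, then reduce() once per key group, and finally cleanup() once after the last group. Roughly (a simplified sketch of the framework's behaviour, not code you need to write yourself):

// simplified view of org.apache.hadoop.mapreduce.Reducer.run()
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKey()) {
            reduce(context.getCurrentKey(), context.getValues(), context);
        }
    } finally {
        cleanup(context); // runs exactly once, after the last key group
    }
}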
The program is as follows:
Mapper class:
package com.itheima.hadoop.mapreduce.mapper;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Counter;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
public class TopKURLMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    /**
     * @param key
     *            the byte offset of the current line
     * @param value
     *            the content of the current line
     * @param context
     *            the job context
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        /**
         * This counter is org.apache.hadoop.mapreduce.Counter
         */
        Counter counter = context
                .getCounter("ExistProblem", "ExistProblemLine"); // custom counter for malformed lines
        String line = value.toString(); // read one line of data
        String[] fields = line.split("\t"); // split the line into fields on "\t"
        try {
            String url = fields[0]; // the URL field
            long upFlow = Long.parseLong(fields[1]); // the upstream traffic (upFlow) field
            long downFlow = Long.parseLong(fields[2]); // the downstream traffic (downFlow) field
            FlowBean bean = new FlowBean(upFlow, downFlow); // wrap the upstream and downstream traffic in a bean
            Text tUrl = new Text(url); // convert the Java type to the Hadoop type
            context.write(tUrl, bean); // several values are passed, so they are wrapped in a bean (tip: the bean must handle its own serialization)
        } catch (Exception e) {
            e.printStackTrace();
            counter.increment(1); // count the malformed line
        }
    }
}
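For example, given the hypothetical input line "www.example.com	1024	4096", this mapper would emit the key www.example.com together with a FlowBean whose upFlow is 1024 and downFlow is 4096; a line that fails to parse only increments the ExistProblemLine counter and is skipped.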
Reducer class:
package com.itheima.hadoop.mapreduce.reducer;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
public class TopKURLReducer extends Reducer<Text, FlowBean, FlowBean, Text> {

    private TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();

    /**
     * @param key
     *            the URL (identical URLs are grouped together)
     * @param values
     *            the traffic beans emitted for this URL
     */
    @Override
    public void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long countUpFlow = 0;
        long countDownFlow = 0;
        /*
         * 1. Read the traffic from each bean. 2. Sum the traffic over all beans. 3. Cache the result in the TreeMap.
         */
        for (FlowBean bean : values) {
            countUpFlow += bean.getUpFlow(); // accumulate upstream traffic
            countDownFlow += bean.getDownFlow(); // accumulate downstream traffic
        }
        // wrap the accumulated traffic
        FlowBean bean = new FlowBean(countUpFlow, countDownFlow);
        treeMap.put(bean, new Text(key)); // cache it in the TreeMap, sorted by total flow
    }

    @Override
    public void cleanup(Context context) throws IOException,
            InterruptedException {
        // iterate over the cache; entries come out in descending order of total flow,
        // so the first K output lines are the top K URLs
        for (Entry<FlowBean, Text> entry : treeMap.entrySet()) {
            context.write(entry.getKey(), entry.getValue());
        }
        super.cleanup(context); // keep the parent class's cleanup behaviour
    }
}
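The reducer above caches every distinct URL and writes them all out in descending order of total flow, so the top 5 sites are simply the first 5 lines of the output file. If the number of distinct URLs were very large, a variant could trim the map inside reduce() so that it never holds more than K entries; a minimal sketch, assuming K = 5 and the descending compareTo defined in FlowBean below:

treeMap.put(new FlowBean(countUpFlow, countDownFlow), new Text(key));
if (treeMap.size() > 5) {
    treeMap.remove(treeMap.lastKey()); // lastKey() is the smallest total, because compareTo() sorts descending
}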
FlowBean class:
package com.itheima.hadoop.mapreduce.bean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable, Comparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long maxFlow;

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + maxFlow;
    }

    /**
     * Serialization notes: 1. A no-arg constructor is required because the framework creates the bean via reflection.
     * 2. readFields() must read the fields in exactly the same order that write() writes them.
     */
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.maxFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getMaxFlow() {
        return maxFlow;
    }

    public void setMaxFlow(long maxFlow) {
        this.maxFlow = maxFlow;
    }

    @Override
    public void readFields(DataInput dataIn) throws IOException {
        upFlow = dataIn.readLong();
        downFlow = dataIn.readLong();
        maxFlow = dataIn.readLong();
    }

    @Override
    public void write(DataOutput dataOut) throws IOException {
        dataOut.writeLong(upFlow);
        dataOut.writeLong(downFlow);
        dataOut.writeLong(maxFlow);
    }

    @Override
    public int compareTo(FlowBean o) {
        // sort in descending order of total flow; note that TreeMap treats beans that
        // compare equal (identical totals) as the same key, so such URLs would overwrite each other
        return this.maxFlow > o.maxFlow ? -1
                : this.maxFlow < o.maxFlow ? 1 : 0;
    }
}
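Because readFields() must read the fields in exactly the order write() wrote them, a quick local round trip (a hypothetical snippet, not part of the MapReduce job) is an easy way to catch ordering mistakes:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean(1024, 4096);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer)); // serialize: upFlow, downFlow, maxFlow
        FlowBean out = new FlowBean(); // the no-arg constructor required for deserialization
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(out); // expected: 1024	4096	5120
    }
}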
Driver class:
package com.itheima.hadoop.drivers;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import com.itheima.hadoop.mapreduce.bean.FlowBean;
import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper;
import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer;
public class TopKURLDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        /**
         * 1. Create the job.
         * 2. Set the class used to locate the job jar.
         * 3. Set the Mapper class and the Reducer class.
         * 4. Set the output key/value types of the Mapper and the Reducer.
         * 5. Set the input path and the output path.
         * 6. Submit the job.
         */
        Configuration conf = getConf(); // use the configuration populated by ToolRunner
        Job job = Job.getInstance(conf);
        job.setJarByClass(TopKURLDriver.class);
        job.setMapperClass(TopKURLMapper.class);
        job.setReducerClass(TopKURLReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1); // a single reducer is needed so all URLs are ranked globally
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // the boolean argument controls whether progress is printed
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Runner class:
package com.itheima.hadoop.runner;
import org.apache.hadoop.util.ToolRunner;
import com.itheima.hadoop.drivers.TopKURLDriver;
public class TopKURLRunner {
    public static void main(String[] args) throws Exception {
        // ToolRunner needs a Tool instance, which is the driver, not this runner
        int res = ToolRunner.run(new TopKURLDriver(), args);
        System.exit(res);
    }
}
Run command: hadoop jar topkurl.jar com.itheima.hadoop.runner.TopKURLRunner /test/inputData /test/outputData
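Because the job is launched through ToolRunner, generic Hadoop options can also be passed on the command line before the paths; for example (a hypothetical invocation), -D mapreduce.job.reduces=1 would force a single reduce task:
hadoop jar topkurl.jar com.itheima.hadoop.runner.TopKURLRunner -D mapreduce.job.reduces=1 /test/inputData /test/outputData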
Run result:
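The reducer writes a FlowBean as the key and the URL as the value, so each output line has the form upFlow, downFlow, totalFlow, URL, separated by tabs and sorted from the largest total flow down; a hypothetical output line might look like:
25705	24681	50386	www.example.com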
Copyright notice: this article is an original post by the CSDN blogger "weixin_34376986", licensed under CC 4.0 BY-SA. When reposting, please include the original source link and this notice.
Original link: https://blog.csdn.net/weixin_34376986/article/details/92259813