package com.sl.hadoop;
import java.io.IOException;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * @ClassName: TopK
 * @Description: Input records are "date temperature" pairs separated by a single
 *               space. The job finds the K days with the highest temperatures and
 *               emits them as date/temperature pairs.
 * @author: qiLZ
 * @date: 2018-09-11 15:20:07
 */
public class TopK {
    /**
     * Top-K strategy: a TreeMap keeps entries sorted by key, so the temperature is
     * used as the key and the smallest entry is evicted whenever the map grows past
     * K. The mapper emits only its local top-K per input split, so the reducer
     * merges each mapper's top-K instead of scanning every record.
     *
     * NOTE(review): because temperature is the TreeMap key, two days with the same
     * temperature collapse into a single entry (the last date seen wins). Confirm
     * whether ties must be preserved; if so, the map value needs to hold a list of
     * dates instead of a single one.
     */
    public static class topMap extends Mapper<Object, Text, Text, IntWritable> {
        // Local top-K buffer: temperature -> date, sorted ascending by temperature.
        TreeMap<Integer, String> top5 = new TreeMap<Integer, String>();
        public static final int K = 5;

        /**
         * Parses one "date temperature" line (space-separated) and folds it into
         * the local top-K buffer, evicting the lowest temperature once the buffer
         * exceeds K entries.
         */
        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(" ");
            // Temperature becomes the key and date the value, so the TreeMap
            // sorts records by temperature for us.
            top5.put(Integer.valueOf(split[1]), split[0]);
            if (top5.size() > K) {
                // firstKey() is the smallest temperature in the buffer.
                top5.remove(top5.firstKey());
            }
        }

        /** Emits this mapper's top-K (date, temperature) pairs after all input is consumed. */
        @Override
        protected void cleanup(Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            for (Entry<Integer, String> entry : top5.entrySet()) {
                context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
            }
        }
    }

    /**
     * Merges the per-mapper top-K lists into the global top-K, using the same
     * TreeMap eviction scheme as the mapper. Output is emitted in descending
     * temperature order during cleanup.
     */
    public static class topReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // Global top-K buffer: temperature -> date, sorted ascending by temperature.
        TreeMap<Integer, String> tops = new TreeMap<Integer, String>();
        public static final int K = 5;

        @Override
        protected void reduce(Text arg0, Iterable<IntWritable> arg1,
                Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException {
            for (IntWritable value : arg1) {
                tops.put(value.get(), arg0.toString());
                if (tops.size() > K) {
                    tops.remove(tops.firstKey());
                }
            }
        }

        /** Writes the final top-K as (date, temperature), highest temperature first. */
        @Override
        protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // descendingMap() yields the entries in reverse (highest-first) order.
            NavigableMap<Integer, String> descendingMap = tops.descendingMap();
            for (Entry<Integer, String> entry : descendingMap.entrySet()) {
                context.write(new Text(entry.getValue()), new IntWritable(entry.getKey()));
            }
        }
    }

    /**
     * Job driver: wires up the mapper/reducer, clears any stale output directory,
     * and blocks until the job finishes. Exits 0 on success, 1 on failure.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path outPath = new Path("/outTop");
        FileSystem fs = FileSystem.get(conf);
        // Hadoop refuses to start if the output directory already exists,
        // so delete a leftover one from a previous run.
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        Job job = Job.getInstance(conf);
        job.setJobName("toptest");
        job.setJarByClass(TopK.class);
        job.setMapperClass(topMap.class);
        job.setReducerClass(topReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        // BUG FIX: this line previously called setOutputKeyClass(IntWritable.class)
        // a second time, overwriting the Text key class and leaving the output
        // value class undeclared.
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/testtop.txt"));
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}