
A MapReduce topK implementation

package com.sl.hadoop;

import java.io.IOException;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @ClassName: TopK
 * @Description: Each line of the input holds a date and a temperature separated by a space. Find the K days with the highest temperatures; the output again shows date and temperature.
 * @author: qiLZ
 * @date: September 11, 2018, 3:20:07 PM
 */
public class TopK {
	/**
	 * Approach to the topK problem: use a TreeMap and let its natural key ordering
	 * do the selection, putting the value to rank (the temperature) in as the key.
	 * A TreeMap is used on both the map side and the reduce side; ranking on the
	 * map side cuts down the work on the reduce side, because the reducer then
	 * only receives each mapper's local topK. (Note that duplicate temperatures
	 * overwrite one another in the TreeMap; see the tie-handling sketch after the
	 * class.)
	 */
	public static class topMap extends Mapper<Object, Text, Text, IntWritable> {

		// temperature -> date, kept sorted ascending by temperature
		TreeMap<Integer, String> top5 = new TreeMap<Integer, String>();

		public static final int K = 5;

		@Override
		protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] split = line.split(" ");
			// use the temperature as the key and the date as the value
			top5.put(Integer.valueOf(split[1]), split[0]);
			if (top5.size() > K) {
				// firstKey() is the lowest temperature, so only the K highest survive
				top5.remove(top5.firstKey());
			}
		}

		@Override
		protected void cleanup(Mapper<Object, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			Iterator<Entry<Integer, String>> iterator = top5.entrySet().iterator();
			while (iterator.hasNext()) {
				Entry<Integer, String> next = iterator.next();
				Text key = new Text(next.getValue());
				IntWritable value = new IntWritable(next.getKey());
				context.write(key, value);
			}
		}

	}

	public static class topReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

		TreeMap<Integer, String> tops = new TreeMap<Integer, String>();

		public static final int K = 5;

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
			for (IntWritable value : values) {
				tops.put(value.get(), key.toString());
				if (tops.size() > K) {
					tops.remove(tops.firstKey());
				}
			}
		}

		@Override
		protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			// iterate in descending order of temperature (hottest first)
			NavigableMap<Integer, String> descendingMap = tops.descendingMap();
			Iterator<Entry<Integer, String>> iterator = descendingMap.entrySet().iterator();
			while (iterator.hasNext()) {
				Entry<Integer, String> next = iterator.next();
				Text key = new Text(next.getValue());
				IntWritable value = new IntWritable(next.getKey());
				context.write(key, value);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Path outPath = new Path("/outTop");
		FileSystem fs = FileSystem.get(conf);
		// remove any previous output directory so the job can be rerun
		if (fs.exists(outPath)) {
			fs.delete(outPath, true);
		}
		Job job = Job.getInstance(conf);
		job.setJobName("toptest");
		job.setJarByClass(TopK.class);
		job.setMapperClass(topMap.class);
		job.setReducerClass(topReduce.class);
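		// the global topK is computed in a single reducer; this relies on
		// Hadoop's default of one reduce task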
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path("/testtop.txt"));
		FileOutputFormat.setOutputPath(job, outPath);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}
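One caveat with the TreeMap<Integer, String> used above: TreeMap.put overwrites the existing value when two days share the same temperature, so one of the tied dates is silently lost. Below is a minimal sketch of a tie-preserving variant; the class name TopKWithTies and its methods are illustrative helpers, not part of the original job. It keeps the K highest distinct temperatures together with every date that recorded them, so the same trimming idea carries over unchanged.

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class TopKWithTies {
	// temperature -> all dates that recorded it, sorted ascending by temperature
	private final TreeMap<Integer, List<String>> tops = new TreeMap<Integer, List<String>>();
	private final int k;

	public TopKWithTies(int k) {
		this.k = k;
	}

	public void add(int temperature, String date) {
		List<String> dates = tops.get(temperature);
		if (dates == null) {
			dates = new ArrayList<String>();
			tops.put(temperature, dates);
		}
		dates.add(date);
		// keep only the k highest distinct temperatures (and all of their dates)
		if (tops.size() > k) {
			tops.remove(tops.firstKey());
		}
	}

	public TreeMap<Integer, List<String>> result() {
		return tops;
	}
}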
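To make the data flow concrete, here is a hypothetical /testtop.txt (the input path hard-coded in main) and the output the job writes to /outTop. The sample dates and temperatures, and the jar name topk.jar, are made up for illustration.

Sample /testtop.txt (date, one space, temperature):

20180901 28
20180902 31
20180903 35
20180904 29
20180905 33
20180906 30
20180907 36

Run the job:

hadoop jar topk.jar com.sl.hadoop.TopK

Contents of /outTop/part-r-00000, i.e. the K=5 hottest days, hottest first, tab-separated by the default TextOutputFormat:

20180907	36
20180903	35
20180905	33
20180902	31
20180906	30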
           
