天天看點

MapReduce解決在海量資料中求Top K

利用MapReduce求海量資料中最大的K個數   [java]  view plain copy

  1. package jtlyuan.csdn;  
  2. import java.io.IOException;  
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.conf.Configured;  
  5. import org.apache.hadoop.fs.Path;  
  6. import org.apache.hadoop.io.IntWritable;  
  7. import org.apache.hadoop.io.LongWritable;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapreduce.Job;  
  10. import org.apache.hadoop.mapreduce.Mapper;  
  11. import org.apache.hadoop.mapreduce.Reducer;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  16. import org.apache.hadoop.util.Tool;  
  17. import org.apache.hadoop.util.ToolRunner;  
  18. //利用MapReduce求最大值海量資料中的K個數  
  19. public class TopKNum extends Configured implements Tool {  
  20. public static class MapClass extends Mapper<LongWritable, Text, IntWritable, IntWritable> {  
  21. public static final int K = 100;  
  22. private int[] top = new int[K];  
  23. public void map(LongWritable key, Text value, Context context)  
  24. throws IOException, InterruptedException {  
  25. String[] str = value.toString().split(",", -2);  
  26. try {// 對于非數字字元我們忽略掉  
  27. int temp = Integer.parseInt(str[8]);  
  28. add(temp);  
  29. } catch (NumberFormatException e) {  
  30. }  
  31. }  
  32. private void add(int temp) {//實作插入  
  33. if(temp>top[0]){  
  34. top[0]=temp;  
  35. int i=0;  
  36. for(;i<99&&temp>top[i+1];i++){  
  37. top[i]=top[i+1];  
  38. }  
  39. top[i]=temp;  
  40. }  
  41. }  
  42. @Override  
  43. protected void cleanup(Context context) throws IOException,  
  44. InterruptedException {  
  45. for(int i=0;i<100;i++){  
  46. context.write(new IntWritable(top[i]), new IntWritable(top[i]));  
  47. }  
  48. }  
  49. }  
  50. public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {  
  51. public static final int K = 100;  
  52. private int[] top = new int[K];  
  53. public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)  
  54. throws IOException, InterruptedException {  
  55. for (IntWritable val : values) {  
  56. add(val.get());  
  57. }  
  58. }  
  59. private void add(int temp) {//實作插入if(temp>top[0]){  
  60. top[0]=temp;  
  61. int i=0;  
  62. for(;i<99&&temp>top[i+1];i++){  
  63. top[i]=top[i+1];  
  64. }  
  65. top[i]=temp;  
  66. }  
  67. }  
  68. @Override  
  69. protected void cleanup(Context context) throws IOException,  
  70. InterruptedException {  
  71. for(int i=0;i<100;i++){  
  72. context.write(new IntWritable(top[i]), new IntWritable(top[i]));  
  73. }  
  74. }  
  75. }  
  76. public int run(String[] args) throws Exception {  
  77. Configuration conf = getConf();  
  78. Job job = new Job(conf, "TopKNum");  
  79. job.setJarByClass(TopKNum.class);  
  80. FileInputFormat.setInputPaths(job, new Path(args[0]));  
  81. FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  82. job.setMapperClass(MapClass.class);  
  83. job.setCombinerClass(Reduce.class);  
  84. job.setReducerClass(Reduce.class);  
  85. job.setInputFormatClass(TextInputFormat.class);  
  86. job.setOutputFormatClass(TextOutputFormat.class);  
  87. job.setOutputKeyClass(IntWritable.class);  
  88. job.setOutputValueClass(IntWritable.class);  
  89. System.exit(job.waitForCompletion(true) ? 0 : 1);  
  90. return 0;  
  91. }  
  92. public static void main(String[] args) throws Exception {  
  93. int res = ToolRunner.run(new Configuration(), new TopKNum(), args);  
  94. System.exit(res);  
  95. }  
  96. }