MapReduce Sorting (Total Sort, Secondary Sort)

Sorting

Sorting is a core technique of MapReduce.

1. Preparation

Example: sort a weather dataset by the temperature field. Because temperature is a signed integer, the field cannot be stored as a Text object and sorted lexicographically. Instead, the data is stored in a SequenceFile whose IntWritable key is the temperature (so keys sort correctly) and whose Text value holds the rest of the record (the city, in the code below).

The MapReduce job consists of map tasks only: it filters the input (comma-separated city,temperature lines) and drops empty or invalid records. Each map task writes its output as a block-compressed SequenceFile.

The code is as follows:

package com.zhen.mapreduce.sort.preprocessor;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * @date 2018-09-09
 * Filters out invalid records and stores the data as a SequenceFile.
 */
public class SortDataPreprocessor extends Configured implements Tool{

  static class CleanerMapper extends Mapper<LongWritable, Text, IntWritable, Text>{
    private RecordParser recordParser = new RecordParser();
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
        throws IOException, InterruptedException {
      recordParser.parse(value.toString());
      if (recordParser.isValidTemperature()) {
        context.write(new IntWritable(recordParser.getTemperature()), new Text(recordParser.getCity()));
      }
    }
  }
  
  public int run(String[] args) throws Exception {
    
    Job job = Job.getInstance(getConf());
    job.setJobName("SortDataPreprocessor");
    job.setJarByClass(SortDataPreprocessor.class);
    
    job.setMapperClass(CleanerMapper.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    
    SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Compress the output as a block-compressed SequenceFile.
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
  
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    String[] params = new String[]{"hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/input","hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/output"};
    int exitCode = ToolRunner.run(new SortDataPreprocessor(), params);
    System.exit(exitCode);
  }
  
}

package com.zhen.mapreduce.sort.preprocessor;

import java.io.Serializable;

/**
 * @author FengZhen
 * @date 2018-09-09
 * Parses a record passed to the map function.
 */
public class RecordParser implements Serializable{

  private static final long serialVersionUID = 1L;

  /**
   * City
   */
  private String city;
  /**
   * Temperature
   */
  private Integer temperature;
  
  /**
   * Parse a "city,temperature" line.
   * @param value the raw input line
   */
  public void parse(String value) {
    // Reset state first: the parser instance is reused across records.
    city = null;
    temperature = null;
    String[] values = value.split(",");
    if (values.length >= 2) {
      city = values[0];
      try {
        temperature = Integer.valueOf(values[1]);
      } catch (NumberFormatException e) {
        temperature = null; // non-numeric temperature: treat the record as invalid
      }
    }
  }
  
  /**
   * Check whether a valid temperature was parsed.
   * @return true if the record contained a usable temperature
   */
  public boolean isValidTemperature() {
    return null != temperature;
  }
  
  
  public String getCity() {
    return city;
  }
  public void setCity(String city) {
    this.city = city;
  }
  public int getTemperature() {
    return temperature;
  }
  public void setTemperature(Integer temperature) {
    this.temperature = temperature;
  }
  
}

Build the jar, upload it to the server, and run:

scp /Users/FengZhen/Desktop/Hadoop/file/Sort.jar [email protected]:/usr/local/test/mr
hadoop jar Sort.jar com.zhen.mapreduce.sort.preprocessor.SortDataPreprocessor
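
The output is a block-compressed SequenceFile, so it cannot be inspected with a plain cat; hadoop fs -text decodes SequenceFiles and compressed data (part-m-00000 is the default name of the first map output file):

hadoop fs -text hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/output/part-m-00000 | head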

2. Partial Sort

With multiple reduce tasks, the job produces multiple sorted output files. Merging these files into a single, globally sorted file is not straightforward: simply concatenating them does not give a sorted result (see the merge sketch below).
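
Because each output file is itself sorted, the part files can be combined with a k-way merge. Below is a minimal client-side sketch, not part of the original job: the class and package names are made up, it assumes the IntWritable/Text SequenceFile outputs produced above, and it only makes sense when a single process can stream the merged result.

package com.zhen.mapreduce.sort.merge;

import java.io.IOException;
import java.util.PriorityQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

/**
 * k-way merge of sorted SequenceFile part files (illustrative sketch).
 */
public class SortedOutputMerger {

  /** One open reader together with its current record. */
  static class Entry implements Comparable<Entry> {
    SequenceFile.Reader reader;
    IntWritable key = new IntWritable();
    Text value = new Text();
    boolean advance() throws IOException { return reader.next(key, value); }
    public int compareTo(Entry o) { return Integer.compare(key.get(), o.key.get()); }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    PriorityQueue<Entry> queue = new PriorityQueue<Entry>();
    // Open every part file and prime the queue with its first record.
    for (FileStatus status : fs.globStatus(new Path(args[0], "part-*"))) {
      Entry entry = new Entry();
      entry.reader = new SequenceFile.Reader(fs, status.getPath(), conf);
      if (entry.advance()) {
        queue.add(entry);
      } else {
        entry.reader.close();
      }
    }
    // Repeatedly emit the smallest current key across all files.
    while (!queue.isEmpty()) {
      Entry smallest = queue.poll();
      System.out.println(smallest.key.get() + "\t" + smallest.value);
      if (smallest.advance()) {
        queue.add(smallest); // re-insert with its next record
      } else {
        smallest.reader.close();
      }
    }
  }
}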

3. Total Sort

How can Hadoop produce a globally sorted file? The simplest answer is to use a single partition. But this approach is hopelessly inefficient for large files: one machine has to process all of the output, throwing away the parallelism that MapReduce provides.

There is an alternative: first produce a set of sorted files, then concatenate them in order to form a single, globally sorted file. The key idea is to use a partitioner that respects the total order of the output.

Example: sorting by temperature.

Suppose a partitioner with four partitions: the first holds temperatures in the range 0-10, the second 11-20, the third 21-30, and the fourth 31-40.

This guarantees that every key in partition i is smaller than every key in partition i+1, so the concatenated output is totally ordered; the drawback is that the data may be spread very unevenly across the partitions. A fixed-range partitioner of this kind is sketched below.
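
A minimal sketch of such a fixed-range partitioner (illustrative only, not part of the original code; the class name is made up, the four ranges are hard-coded, and the job would have to run with exactly four reducers):

package com.zhen.mapreduce.sort.totalPartitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Fixed-range partitioner: every key in partition i is smaller than every
 * key in partition i+1, so the concatenated outputs are globally sorted.
 */
public class TemperatureRangePartitioner extends Partitioner<IntWritable, Text> {
  @Override
  public int getPartition(IntWritable key, Text value, int numPartitions) {
    int temperature = key.get();
    if (temperature <= 10) {
      return 0; // 0-10
    } else if (temperature <= 20) {
      return 1; // 11-20
    } else if (temperature <= 30) {
      return 2; // 21-30
    }
    return 3;   // 31-40
  }
}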

Knowing the temperature distribution in advance would make it possible to build a set of evenly loaded partitions, but obtaining it requires a full pass over the dataset, which is impractical. Instead, sampling the key space works well: by examining only a small subset of the keys, we obtain an approximation of the key distribution, from which the partition boundaries are built. Hadoop ships with several samplers.

The Sampler interface, nested inside the InputSampler class, has a single method, getSample(), which takes an InputFormat and a Job and returns a sample of keys.
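
RandomSampler, used in the code below, samples keys with a uniform probability. InputSampler also ships with SplitSampler, which takes the first numSamples records from at most maxSplitsSampled splits (cheap, but a poor fit for already-sorted input), and IntervalSampler, which picks keys at regular intervals. Their construction looks like this (the parameter values are arbitrary examples):

// First 10000 records, drawn from at most 10 splits.
InputSampler.Sampler<IntWritable, Text> splitSampler =
    new InputSampler.SplitSampler<IntWritable, Text>(10000, 10);

// Every key at frequency 0.1, from at most 10 splits.
InputSampler.Sampler<IntWritable, Text> intervalSampler =
    new InputSampler.IntervalSampler<IntWritable, Text>(0.1, 10);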

The code is as follows:

package com.zhen.mapreduce.sort.totalPartitioner;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * @date 2018-09-09
 * Total sort using TotalOrderPartitioner.
 */
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool{

  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    job.setJobName("SortByTemperatureUsingTotalOrderPartitioner");
    job.setJarByClass(SortByTemperatureUsingTotalOrderPartitioner.class);
    
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    
    SequenceFileInputFormat.setInputPaths(job, new Path(args[0]));
    SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Compress the output as a block-compressed SequenceFile.
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    
    job.setPartitionerClass(TotalOrderPartitioner.class);
    // The number of reduce tasks determines the number of partitions and must
    // be set before the partition file is written (4 here is an arbitrary example).
    job.setNumReduceTasks(4);
    /**
     * Sampling frequency: 0.1
     * Maximum number of samples: 10000
     * Maximum number of splits sampled: 10
     * (These are also the defaults when InputSampler runs as a standalone application.)
     * Sampling stops as soon as any one of these limits is satisfied.
     */
    InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
    InputSampler.writePartitionFile(job, sampler);
    
    // To share it with the tasks running on the cluster, the partition file
    // written by InputSampler must be added to the distributed cache.
    Configuration conf = job.getConfiguration();
    String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
    URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
    job.addCacheFile(partitionUri);
    job.createSymlink();
    
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    String[] params = new String[]{"hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/output","hdfs://fz/user/hdfs/MapReduce/data/sort/SortByTemperatureUsingTotalOrderPartitioner/output"};
    int exitCode = ToolRunner.run(new SortByTemperatureUsingTotalOrderPartitioner(), params);
    System.exit(exitCode);
  }
  
}

4. Secondary Sort

The MapReduce framework sorts records by key before they reach the reducer, but the values for a given key are not sorted. The standard trick for imposing an order on the values is a secondary sort: promote the value into a composite key, sort on the full composite key, and make the partitioner and grouping comparator consider only the natural key, so that all records with the same natural key still arrive in a single reduce call.

Example: find the maximum temperature for each year. The map output key is the composite (year, temperature); the sort comparator orders keys by year and then by temperature, both descending, and the grouping comparator groups by year, so the first key a reducer sees in each group carries that year's maximum temperature.

The code is as follows:

package com.zhen.mapreduce.sort.secondarySort;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * @author FengZhen
 * @date 2018-09-09
 * Secondary sort: ordering the values associated with each key by making
 * them part of a composite key.
 */
public class MaxTemperatureUsingSecondarySort extends Configured implements Tool{

  static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable>{
    
    private RecordParser recordParser = new RecordParser();
    
    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, IntPair, NullWritable>.Context context)
        throws IOException, InterruptedException {
      recordParser.parse(value.toString());
      if (recordParser.isValidTemperature()) {
        context.write(new IntPair(recordParser.getYear(), recordParser.getTemperature()), NullWritable.get());
      }
    }
  }
  
  /**
   * With grouping by year (see GroupComparator), reduce is called once per
   * year, and the key passed in is the group's first composite key, i.e. the
   * one with that year's maximum temperature; writing the key emits that maximum.
   */
  static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable>{
    @Override
    protected void reduce(IntPair key, Iterable<NullWritable> values,
        Reducer<IntPair, NullWritable, IntPair, NullWritable>.Context context)
        throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
  }
  
  /**
   * A custom partitioner that partitions by the first field of the
   * composite key (the year).
   * @author FengZhen
   *
   */
  public static class FirstPartitioner extends Partitioner<IntPair, NullWritable>{
    @Override
    public int getPartition(IntPair key, NullWritable value, int numPartitions) {
      return Math.abs(key.getFirstKey() * 127) % numPartitions;
    }
  }
  
  /**
   * Sort comparator: orders composite keys by year (descending) and then by
   * temperature (descending), so within a year the maximum temperature comes first.
   * @author FengZhen
   *
   */
  public static class KeyComparator extends WritableComparator{
    public KeyComparator() {
      super(IntPair.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      IntPair ip1 = (IntPair) a;
      IntPair ip2 = (IntPair) b;
      int cmp = IntPair.compare(ip1.getFirstKey(), ip2.getFirstKey());
      if (cmp != 0) {
        return cmp;
      }
      return IntPair.compare(ip1.getSecondKey(), ip2.getSecondKey());
    }
  }
  
  /**
   * Grouping comparator: groups composite keys by year only, so one reduce
   * call receives all records for a year.
   * @author FengZhen
   *
   */
  public static class GroupComparator extends WritableComparator {
    protected GroupComparator() {
      super(IntPair.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      IntPair ip1 = (IntPair) a;
      IntPair ip2 = (IntPair) b;
      return IntPair.compare(ip1.getFirstKey(), ip2.getFirstKey());
    }
  }
  
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf());
    job.setJobName("MaxTemperatureUsingSecondarySort");
    job.setJarByClass(MaxTemperatureUsingSecondarySort.class);
    
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    
    job.setPartitionerClass(FirstPartitioner.class);
    job.setSortComparatorClass(KeyComparator.class);
    job.setGroupingComparatorClass(GroupComparator.class);
    
    job.setOutputKeyClass(IntPair.class);
    job.setOutputValueClass(NullWritable.class);
    
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    return job.waitForCompletion(true) ? 0 : 1;
  }
  
  public static void main(String[] args) throws Exception {
    String[] params = new String[] {"hdfs://fz/user/hdfs/MapReduce/data/sort/MaxTemperatureUsingSecondarySort/input", "hdfs://fz/user/hdfs/MapReduce/data/sort/MaxTemperatureUsingSecondarySort/output"};
    int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), params);
    System.exit(exitCode);
  }
}

IntPair: the custom composite key

package com.zhen.mapreduce.sort.secondarySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom composite key: carries both sort fields (year and temperature)
 * so that the map output key defines the full sort order.
 */
public class IntPair implements WritableComparable<IntPair>{

  // Plain Java int fields, serialized manually in write/readFields.
  private int firstKey;
  private int secondKey;
  // A no-argument constructor is required so Hadoop can instantiate the key
  // during deserialization.
  public IntPair() {
  }
  public IntPair(int firstKey, int secondKey) {
    this.firstKey = firstKey;
    this.secondKey = secondKey;
  }
  public int getFirstKey() {
    return firstKey;
  }
  public void setFirstKey(int firstKey) {
    this.firstKey = firstKey;
  }
  public int getSecondKey() {
    return secondKey;
  }
  public void setSecondKey(int secondKey) {
    this.secondKey = secondKey;
  }

  public void readFields(DataInput in) throws IOException {
    firstKey = in.readInt();
    secondKey = in.readInt();
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(firstKey);
    out.writeInt(secondKey);
  }

  /**
   * Default ordering of map output keys: descending by the first key,
   * then descending by the second key.
   */
  public int compareTo(IntPair o) {
    int cmp = compare(firstKey, o.getFirstKey());
    if (cmp != 0) {
      return cmp;
    }
    return compare(secondKey, o.getSecondKey());
  }
  
  /**
   * Compare two ints in descending order: negative if a > b, zero if they
   * are equal, positive if a < b. Returning zero for equal values is what
   * lets the grouping comparator treat keys with the same year as one group.
   */
  public static int compare(int a, int b) {
    return a == b ? 0 : (a > b ? -1 : 1);
  }
  @Override
  public String toString() {
    return "IntPair [firstKey=" + firstKey + ", secondKey=" + secondKey + "]";
  }
  
}

RecordParser: parses each record

package com.zhen.mapreduce.sort.secondarySort;

import java.io.Serializable;

/**
 * @author FengZhen
 * @date 2018-09-09
 * Parses a record passed to the map function.
 */
public class RecordParser implements Serializable{

  private static final long serialVersionUID = 1L;

  /**
   * Year
   */
  private Integer year;
  /**
   * Temperature
   */
  private Integer temperature;
  
  /**
   * Parse a "year,temperature" line.
   * @param value the raw input line
   */
  public void parse(String value) {
    // Reset state first: the parser instance is reused across records.
    year = null;
    temperature = null;
    String[] values = value.split(",");
    if (values.length >= 2) {
      try {
        year = Integer.valueOf(values[0]);
        temperature = Integer.valueOf(values[1]);
      } catch (NumberFormatException e) {
        temperature = null; // malformed line: treat the record as invalid
      }
    }
  }
  
  /**
   * Check whether a valid record was parsed.
   * @return true if the line contained a usable year and temperature
   */
  public boolean isValidTemperature() {
    return null != temperature;
  }
  
  public Integer getYear() {
    return year;
  }

  public void setYear(Integer year) {
    this.year = year;
  }

  public int getTemperature() {
    return temperature;
  }
  public void setTemperature(Integer temperature) {
    this.temperature = temperature;
  }
}

The input data (year,temperature pairs):

1990,14
1980,12
1990,19
1960,11
1960,18
1980,17
1970,24
1970,23
1940,22
1940,35
1930,44
1920,43      

The sorted output: one record per year (the group's first, i.e. maximum, temperature), with years in descending order:

IntPair [firstKey=1990, secondKey=19]
IntPair [firstKey=1980, secondKey=17]
IntPair [firstKey=1970, secondKey=24]
IntPair [firstKey=1960, secondKey=18]
IntPair [firstKey=1940, secondKey=35]
IntPair [firstKey=1930, secondKey=44]
IntPair [firstKey=1920, secondKey=43]