Sorting
Sorting is a core technique of MapReduce.
1. Preparation
Example: sort the weather dataset by the temperature field. Because temperature is a signed integer, the field cannot be stored as Text and sorted lexicographically. Instead, the data is stored in a SequenceFile whose IntWritable key is the temperature (so the keys sort correctly) and whose Text value is the data line.
The MapReduce job is map-only: it filters the input and drops records that lack a valid temperature. Each map task creates and outputs a block-compressed SequenceFile.
The code is as follows:
package com.zhen.mapreduce.sort.preprocessor;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * @date September 9, 2018
 * Filters out invalid records and stores the data as a SequenceFile.
 */
public class SortDataPreprocessor extends Configured implements Tool {

    static class CleanerMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

        private RecordParser recordParser = new RecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            recordParser.parse(value.toString());
            if (recordParser.isValidTemperature()) {
                context.write(new IntWritable(recordParser.getTemperature()), new Text(recordParser.getCity()));
            }
        }
    }

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("SortDataPreprocessor");
        job.setJarByClass(SortDataPreprocessor.class);
        job.setMapperClass(CleanerMapper.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        // Map-only job: no reduce tasks.
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Compress the output as a block-compressed SequenceFile using gzip.
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] params = new String[]{
                "hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/input",
                "hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/output"};
        int exitCode = ToolRunner.run(new SortDataPreprocessor(), params);
        System.exit(exitCode);
    }
}
RecordParser: parses each input record

package com.zhen.mapreduce.sort.preprocessor;

import java.io.Serializable;

/**
 * @author FengZhen
 * @date September 9, 2018
 * Parses the records handed to the map function.
 */
public class RecordParser implements Serializable {

    private static final long serialVersionUID = 1L;

    /** City */
    private String city;

    /** Temperature */
    private Integer temperature;

    /**
     * Parse a line of the form "city,temperature".
     * @param value the raw input line
     */
    public void parse(String value) {
        String[] values = value.split(",");
        if (values.length >= 2) {
            city = values[0];
            temperature = Integer.valueOf(values[1]);
        }
    }

    /**
     * Check whether a valid temperature was parsed.
     */
    public boolean isValidTemperature() {
        return null != temperature;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }
}
Build the jar, upload it to the server, and run it:
scp /Users/FengZhen/Desktop/Hadoop/file/Sort.jar [email protected]:/usr/local/test/mr
hadoop jar Sort.jar com.zhen.mapreduce.sort.preprocessor.SortDataPreprocessor
2. Partial Sort
When a job has multiple reduce tasks, it produces multiple sorted output files. Merging these files into a single, globally sorted file, however, is not straightforward.
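Since the shuffle already sorts the keys delivered to each reducer, producing the partial results themselves needs no custom code. Below is a minimal sketch, assuming the input is the block-compressed SequenceFile produced in step 1; the package, class name, and reducer count are illustrative:

package com.zhen.mapreduce.sort.partial; // hypothetical package, mirroring the layout above

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Partial sort: identity map and reduce. The shuffle sorts the IntWritable
 * keys, so each reducer's output file is sorted internally, but the files
 * are not ordered relative to one another.
 */
public class SortByTemperaturePartial extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("SortByTemperaturePartial");
        job.setJarByClass(SortByTemperaturePartial.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        // No mapper or reducer is set, so Hadoop uses the identity classes.
        // Four reducers produce four sorted (but only partially ordered) files.
        job.setNumReduceTasks(4);
        SequenceFileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SortByTemperaturePartial(), args));
    }
}

Because the default HashPartitioner scatters keys across the reducers, each file is sorted on its own but the set of files has no overall order, which is exactly the limitation the next section addresses.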
3. Total Sort
How can Hadoop produce a globally sorted file? The simplest approach is to use a single partition, but that is hopelessly inefficient for large files: one machine has to process all of the output, throwing away the parallelism that the MapReduce architecture provides.
There is an alternative: first create a set of sorted files, then concatenate them, and the result is a globally sorted file. The main idea is to use a partitioner that respects the total order of the output.
Example: sorting by temperature again.
Suppose we have a partitioner with four partitions, where the first covers temperatures 0-10, the second 11-20, the third 21-30, and the fourth 31-40.
This guarantees that every key in partition i is smaller than every key in partition i+1, so concatenating the partitions in order yields a totally sorted result; the drawback is that the data may be distributed very unevenly across the partitions.
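A minimal sketch of such a fixed-range partitioner (a hypothetical class with the boundaries above hard-coded; the job in this section uses TotalOrderPartitioner instead):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Hypothetical fixed-range partitioner: temperatures up to 10 go to partition 0,
 * 11-20 to partition 1, 21-30 to partition 2, and everything above to partition 3.
 * Assumes the job runs with four reducers. Keys in partition i are always smaller
 * than keys in partition i+1, so concatenating the reducer outputs gives a total
 * order, but a skewed range turns its reducer into the bottleneck.
 */
public class FixedRangePartitioner extends Partitioner<IntWritable, Text> {
    @Override
    public int getPartition(IntWritable key, Text value, int numPartitions) {
        int temperature = key.get();
        if (temperature <= 10) return 0;
        if (temperature <= 20) return 1;
        if (temperature <= 30) return 2;
        return 3;
    }
}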
Knowing the temperature distribution would let us build a set of very evenly loaded partitions, but obtaining it means scanning the entire dataset, which is impractical. Instead, we can sample the key space: the core idea is to examine only a small fraction of the keys to approximate their distribution, and to build the partitions from that. Hadoop ships with several built-in samplers.
InputSampler defines a nested Sampler interface (implemented by its sampler classes) whose only method, getSample, takes two parameters, an InputFormat object and a Job, and returns a sample of keys.
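Besides the RandomSampler used in the job below, Hadoop ships two more built-in samplers. A quick sketch of how each is constructed, with illustrative parameter values:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;

public class SamplerExamples {
    public static void main(String[] args) {
        // RandomSampler: keeps each key with probability 0.1, up to 10000 samples,
        // reading at most 10 input splits (the sampler used by the job below).
        InputSampler.Sampler<IntWritable, Text> random =
                new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);

        // SplitSampler: takes the first keys from at most 10 splits, up to 10000
        // samples in total; cheap, but representative only if the input is unsorted.
        InputSampler.Sampler<IntWritable, Text> split =
                new InputSampler.SplitSampler<IntWritable, Text>(10000, 10);

        // IntervalSampler: picks keys at a fixed frequency (here 1 in 100) from at
        // most 10 splits; works well when the input is already sorted.
        InputSampler.Sampler<IntWritable, Text> interval =
                new InputSampler.IntervalSampler<IntWritable, Text>(0.01, 10);
    }
}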
The code is as follows:
package com.zhen.mapreduce.sort.totalPartitioner;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * @date September 9, 2018
 * Total sort using a sampled partitioner.
 */
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("SortByTemperatureUsingTotalOrderPartitioner");
        job.setJarByClass(SortByTemperatureUsingTotalOrderPartitioner.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        SequenceFileInputFormat.setInputPaths(job, new Path(args[0]));
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Compress the output as a block-compressed SequenceFile using gzip.
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        /**
         * Sampling rate 0.1,
         * at most 10000 samples,
         * at most 10 sampled splits.
         * These are also the defaults when InputSampler runs as an application.
         * Sampling stops as soon as any one of the limits is satisfied.
         */
        InputSampler.Sampler<IntWritable, Text> sampler =
                new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
        InputSampler.writePartitionFile(job, sampler);
        // InputSampler adds the partition file it wrote to the distributed cache,
        // so that it is shared with the tasks running on the cluster.
        Configuration conf = job.getConfiguration();
        String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
        URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
        job.addCacheFile(partitionUri);
        job.createSymlink(); // deprecated on newer Hadoop releases, where symlinks are created automatically
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] params = new String[]{
                "hdfs://fz/user/hdfs/MapReduce/data/sort/preprocessor/output",
                "hdfs://fz/user/hdfs/MapReduce/data/sort/SortByTemperatureUsingTotalOrderPartitioner/output"};
        int exitCode = ToolRunner.run(new SortByTemperatureUsingTotalOrderPartitioner(), params);
        System.exit(exitCode);
    }
}
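Assuming the same Sort.jar from step 1, the job can be launched analogously:
hadoop jar Sort.jar com.zhen.mapreduce.sort.totalPartitioner.SortByTemperatureUsingTotalOrderPartitioner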
4. Secondary Sort
The MapReduce framework sorts records by key before they reach the reducer, but the values belonging to one key are not sorted.
Example: sort the composite keys by year (ascending) and, within the same year, by temperature (descending), so that the first record of each year's group carries that year's maximum temperature.
The code is as follows:
package com.zhen.mapreduce.sort.secondarySort;

import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * @date September 9, 2018
 * Secondary sort: sort the values associated with each key.
 */
public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {

    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {

        private RecordParser recordParser = new RecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            recordParser.parse(value.toString());
            if (recordParser.isValidTemperature()) {
                context.write(new IntPair(recordParser.getYear(), recordParser.getTemperature()),
                        NullWritable.get());
            }
        }
    }

    static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {

        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive sorted by year (ascending) and temperature (descending),
            // so the first key of each group is the year's maximum temperature.
            context.write(key, NullWritable.get());
        }
    }

    /**
     * A custom partitioner that partitions on the first field of the
     * composite key (the year).
     */
    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            return Math.abs(key.getFirstKey() * 127) % numPartitions;
        }
    }

    /**
     * Sorts keys by year (ascending) and temperature (descending).
     */
    public static class KeyComparator extends WritableComparator {
        public KeyComparator() {
            super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            int cmp = IntPair.compare(ip1.getFirstKey(), ip2.getFirstKey());
            if (cmp != 0) {
                return cmp;
            }
            return -IntPair.compare(ip1.getSecondKey(), ip2.getSecondKey()); // negated: descending
        }
    }

    /**
     * Groups keys by year only, so one reduce call sees all records of a year.
     */
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            return IntPair.compare(ip1.getFirstKey(), ip2.getFirstKey());
        }
    }

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("MaxTemperatureUsingSecondarySort");
        job.setJarByClass(MaxTemperatureUsingSecondarySort.class);
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setSortComparatorClass(KeyComparator.class);
        job.setGroupingComparatorClass(GroupComparator.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        String[] params = new String[]{
                "hdfs://fz/user/hdfs/MapReduce/data/sort/MaxTemperatureUsingSecondarySort/input",
                "hdfs://fz/user/hdfs/MapReduce/data/sort/MaxTemperatureUsingSecondarySort/output"};
        int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), params);
        System.exit(exitCode);
    }
}
IntPair: the custom composite key

package com.zhen.mapreduce.sort.secondarySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom composite key, used as the sort key for map output.
 */
public class IntPair implements WritableComparable<IntPair> {

    // Plain Java primitives for the two key fields.
    private int firstKey;
    private int secondKey;

    // A no-arg constructor is required so Hadoop can instantiate the key by reflection.
    public IntPair() {
    }

    public IntPair(int firstKey, int secondKey) {
        this.firstKey = firstKey;
        this.secondKey = secondKey;
    }

    public int getFirstKey() {
        return firstKey;
    }

    public void setFirstKey(int firstKey) {
        this.firstKey = firstKey;
    }

    public int getSecondKey() {
        return secondKey;
    }

    public void setSecondKey(int secondKey) {
        this.secondKey = secondKey;
    }

    public void readFields(DataInput in) throws IOException {
        firstKey = in.readInt();
        secondKey = in.readInt();
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(firstKey);
        out.writeInt(secondKey);
    }

    /**
     * Natural ordering of the composite key: first key, then second key, both
     * ascending. (The job overrides this with KeyComparator, which reverses
     * the order of the second key.)
     */
    public int compareTo(IntPair other) {
        int cmp = compare(firstKey, other.firstKey);
        if (cmp != 0) {
            return cmp;
        }
        return compare(secondKey, other.secondKey);
    }

    /**
     * Compare two int values in ascending order. Returning 0 on equality is
     * essential: the grouping comparator relies on it to put all records of
     * the same year into one reduce group.
     */
    public static int compare(int a, int b) {
        return (a < b) ? -1 : ((a == b) ? 0 : 1);
    }

    @Override
    public String toString() {
        return "IntPair [firstKey=" + firstKey + ", secondKey=" + secondKey + "]";
    }
}
RecordParser: parses each record

package com.zhen.mapreduce.sort.secondarySort;

import java.io.Serializable;

/**
 * @author FengZhen
 * @date September 9, 2018
 * Parses the records handed to the map function.
 */
public class RecordParser implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Year */
    private Integer year;

    /** Temperature */
    private Integer temperature;

    /**
     * Parse a line of the form "year,temperature".
     * @param value the raw input line
     */
    public void parse(String value) {
        String[] values = value.split(",");
        if (values.length >= 2) {
            year = Integer.valueOf(values[0]);
            temperature = Integer.valueOf(values[1]);
        }
    }

    /**
     * Check whether a valid temperature was parsed.
     */
    public boolean isValidTemperature() {
        return null != temperature;
    }

    public Integer getYear() {
        return year;
    }

    public void setYear(Integer year) {
        this.year = year;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }
}
The raw input data is as follows:
1990,14
1980,12
1990,19
1960,11
1960,18
1980,17
1970,24
1970,23
1940,22
1940,35
1930,44
1920,43
Expected output: one record per year, carrying that year's maximum temperature.
IntPair [firstKey=1920, secondKey=43]
IntPair [firstKey=1930, secondKey=44]
IntPair [firstKey=1940, secondKey=35]
IntPair [firstKey=1960, secondKey=18]
IntPair [firstKey=1970, secondKey=24]
IntPair [firstKey=1980, secondKey=17]
IntPair [firstKey=1990, secondKey=19]