新增的Java MapReduce API
Hadoop的版本0.20.0包含有一個新的 Java MapReduce API,有時也稱為"上下文對象"(context object),旨在使API在今後更容易擴充。新的API 在類型上不相容先前的API,是以,需要重寫以前的應用程式才能使新的API發揮作用。
新增的API 和舊的API 之間,有下面幾個明顯的差別。
新的API 傾向于使用虛類,而不是接口,因為這更容易擴充。例如,可以無需修改類的實作而在虛類中添加一個方法(即用預設的實作)。在新的API 中, mapper 和reducer現在都是虛類。
新的API 放在org.apache.hadoop.mapreduce 包(和子包)中。之前版本的API 依舊放在org.apache.hadoop.mapred中。
新的API充分使用上下文對象,使使用者代碼能與MapReduce系統通信。例如,MapContext 基本具備了JobConf、OutputCollector和Reporter的功能。
新的API 同時支援"推"(push)和"拉"(pull)式的疊代。這兩類API,均可以将鍵/值對記錄推給mapper,但除此之外,新的API 也允許把記錄從map()方法中拉出。對reducer來說是一樣的。"拉"式處理資料的好處是可以實作資料的批量處理,而非逐條記錄地處理。
新增的API實作了配置的統一。舊API 通過一個特殊的JobConf 對象配置作業,該對象是Hadoop配置對象的一個擴充 (用于配置守護程序,詳情請參見第130頁的"API配置"小節)。在新的API 中,我們丢棄這種區分,所有作業的配置均通過Configuration 來完成。
新API中作業控制由Job類實作,而非JobClient類,新API中删除了JobClient類。
輸出檔案的命名方式稍有不同。map的輸出檔案名為part-m-nnnnn,而reduce的輸出為part-r-nnnnn(其中nnnnn表示分塊序号,為整數,且從0開始算)。
例2-6 顯示了使用新API 重寫的MaxTemperature應用。不同之處已加粗顯示。
将舊API寫的Mapper和Reducer類轉換為新API時,記住将map()和reduce()的簽名轉換為新形式。如果隻是将類的繼承修改為對新的Mapper和Reducer類的繼承,編譯的時候也不會報錯或顯示警告資訊,因為新的Mapper和Reducer類同樣也提供了等價的map()和reduce()函數。但是,自己寫的mapper或reducer代碼是不會被調用的,這會導緻難以診斷的錯誤。
例2-6. 用新上下文對象MapReduce API重寫的MaxTemperature應用
public class NewMaxTemperature {
static class NewMaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
public void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
static class NewMaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: NewMaxTemperature
<input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(NewMaxTemperature.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(NewMaxTemperatureMapper.class);
job.setReducerClass(NewMaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
原來MapReduce代碼可在《Hadoop權威指南》内找到,大家可進行對比。
又一例子:Hadoop in Action中第四章:
package com;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class tt extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//split的作用是将該字元串裡面的變量指派給citation這個字元串數組當中。
String[] citation = value.toString().split(",");
//使用新的API取代了collect相關的API,将map中的key和value進行了互換。
context.write(new Text(citation[1]), new Text(citation[0]));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text> { //前兩個參數設定是輸入參數,後兩個參數是輸出參數。
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String csv ="";
//Text類型是類似于String類型的文本格式,但是在處理編碼上還是和String有差别,與記憶體序列化有關,是hadoop經過封裝之後的新類。
for (Text val:values) {
if (csv.length() > 0) csv += ",";
csv += val.toString();
}
context.write(key, new Text(csv));
}
}
public int run(String[] args) throws Exception { //由hadoop本身調用該程式
Configuration conf = getConf();
Job job = new Job(conf, "tt"); //利用job取代了jobclient
job.setJarByClass(tt.class);
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); //此處如果不進行設定,系統會抛出異常,還要記住新舊API不能混用
System.exit(job.waitForCompletion(true)?0:1);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new tt(), args); //調用新的類的方法免除配置的相關瑣碎的細節
System.exit(res);
}
}