天天看點

Hadoop MapReduce新舊API差別

新增的Java MapReduce API

Hadoop的版本0.20.0包含有一個新的 Java MapReduce API,有時也稱為"上下文對象"(context object),旨在使API在今後更容易擴充。新的API 在類型上不相容先前的API,是以,需要重寫以前的應用程式才能使新的API發揮作用。

新增的API 和舊的API 之間,有下面幾個明顯的差別。

新的API 傾向于使用虛類,而不是接口,因為這更容易擴充。例如,可以無需修改類的實作而在虛類中添加一個方法(即用預設的實作)。在新的API 中, mapper 和reducer現在都是虛類。

新的API 放在org.apache.hadoop.mapreduce 包(和子包)中。之前版本的API 依舊放在org.apache.hadoop.mapred中。

新的API充分使用上下文對象,使使用者代碼能與MapReduce系統通信。例如,MapContext 基本具備了JobConf、OutputCollector和Reporter的功能。

新的API 同時支援"推"(push)和"拉"(pull)式的疊代。這兩類API,均可以将鍵/值對記錄推給mapper,但除此之外,新的API 也允許把記錄從map()方法中拉出。對reducer來說是一樣的。"拉"式處理資料的好處是可以實作資料的批量處理,而非逐條記錄地處理。

新增的API實作了配置的統一。舊API 通過一個特殊的JobConf 對象配置作業,該對象是Hadoop配置對象的一個擴充 (用于配置守護程序,詳情請參見第130頁的"API配置"小節)。在新的API 中,我們丢棄這種區分,所有作業的配置均通過Configuration 來完成。

新API中作業控制由Job類實作,而非JobClient類,新API中删除了JobClient類。

輸出檔案的命名方式稍有不同。map的輸出檔案名為part-m-nnnnn,而reduce的輸出為part-r-nnnnn(其中nnnnn表示分塊序号,為整數,且從0開始算)。

例2-6 顯示了使用新API 重寫的MaxTemperature應用。不同之處已加粗顯示。

将舊API寫的Mapper和Reducer類轉換為新API時,記住将map()和reduce()的簽名轉換為新形式。如果隻是将類的繼承修改為對新的Mapper和Reducer類的繼承,編譯的時候也不會報錯或顯示警告資訊,因為新的Mapper和Reducer類同樣也提供了等價的map()和reduce()函數。但是,自己寫的mapper或reducer代碼是不會被調用的,這會導緻難以診斷的錯誤。

例2-6. 用新上下文對象MapReduce API重寫的MaxTemperature應用

public class NewMaxTemperature {  
 
	static class NewMaxTemperatureMapper  
		extends Mapper<LongWritable, Text, Text, IntWritable> {  
 
	    	private static final int MISSING = 9999;  
	    	public void map(LongWritable key, Text value,Context context)  
			throws IOException, InterruptedException {               
	 
	    		String line = value.toString();         
	    		String year = line.substring(15, 19);         
	    		int airTemperature;         
	    		
			if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs          
	      			airTemperature = Integer.parseInt(line.substring(88, 92));         
	    		} else {           
	      			airTemperature = Integer.parseInt(line.substring(87, 92));         
		   	}         
		    	
			String quality = line.substring(92, 93);  
		    	if (airTemperature != MISSING && quality.matches("[01459]")) {  
		      		context.write(new Text(year), new IntWritable(airTemperature));         
		    	}       
	  	}     
	}     
   
  	static class NewMaxTemperatureReducer  
    		extends Reducer<Text, IntWritable, Text, IntWritable> {         
    	
		public void reduce(Text key, Iterable<IntWritable> values, Context context)           
        		throws IOException, InterruptedException {               
 
		      	int maxValue = Integer.MIN_VALUE;         
		      	for (IntWritable value : values) {           
				maxValue = Math.max(maxValue, value.get());         
		      	}         
		      	context.write(key, new IntWritable(maxValue));       
    		}     
  	}  
 
	public static void main(String[] args) throws Exception {       
		if (args.length != 2) {         
			System.err.println("Usage: NewMaxTemperature
			<input path> <output path>");     
			System.exit(-1);  
		}           

		Job job = new Job();       
		job.setJarByClass(NewMaxTemperature.class);  

		FileInputFormat.addInputPath(job, new Path(args[0]));  
		FileOutputFormat.setOutputPath(job, new Path(args[1]));           

		job.setMapperClass(NewMaxTemperatureMapper.class);       
		job.setReducerClass(NewMaxTemperatureReducer.class);  
		job.setOutputKeyClass(Text.class);  
		job.setOutputValueClass(IntWritable.class);           

		System.exit(job.waitForCompletion(true) ? 0 : 1);     
	}  
    }  
           

   原來MapReduce代碼可在《Hadoop權威指南》内找到,大家可進行對比。

  又一例子:Hadoop in Action中第四章:

package com;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;                 
import org.apache.hadoop.util.ToolRunner;


public class tt extends Configured implements Tool {
	
	public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
		public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
			//split的作用是将該字元串裡面的變量指派給citation這個字元串數組當中。
			String[] citation = value.toString().split(",");
			//使用新的API取代了collect相關的API,将map中的key和value進行了互換。
			context.write(new Text(citation[1]), new Text(citation[0]));  
		}
	}
	
	public static class Reduce extends Reducer<Text, Text, Text, Text> {  //前兩個參數設定是輸入參數,後兩個參數是輸出參數。
		
		public void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
			String csv ="";
			
			//Text類型是類似于String類型的文本格式,但是在處理編碼上還是和String有差别,與記憶體序列化有關,是hadoop經過封裝之後的新類。
			for (Text val:values) {
				if (csv.length() > 0) csv += ",";
				csv += val.toString();
			}
		
			context.write(key, new Text(csv));
		}
	}
	
	public int run(String[] args) throws Exception {  //由hadoop本身調用該程式
		Configuration conf = getConf();
		Job job = new Job(conf, "tt"); //利用job取代了jobclient
		job.setJarByClass(tt.class);
		
		Path in = new Path(args[0]);
		Path out = new Path(args[1]);
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		
		job.setMapperClass(MapClass.class);
		job.setReducerClass(Reduce.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);  //此處如果不進行設定,系統會抛出異常,還要記住新舊API不能混用
		
		System.exit(job.waitForCompletion(true)?0:1);
		return 0;
	}
	
	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new tt(), args);    //調用新的類的方法免除配置的相關瑣碎的細節
		System.exit(res);
	}
}
           

繼續閱讀