天天看點

MapReduce-Counter使用-快速實作大檔案行數的統計

在普通的java程式中我們可以定義一個全局的靜态變量,然後我們可以在各個類中去使用,實作累加器的功能,然而在mapruduce中怎麼實作這一功能呢,各個map可能運作在不同的JVM中(這裡不考慮JVM重用的情況),然而我們可以借助MapReduce提供的Counter功能來實作這一功能,下面我們通過一個執行個體來說明這一個用法。

實驗要求:快速實作檔案行數,以及其中錯誤記錄的統計

實驗資料:

1

2

error

3

4

5

error

6

7

8

9

10

error

11

12

13

14

error

15

16

17

18

19

解決思路:

定義一個枚舉類型,每次調用map函數時,對值進行判斷,把判斷的結果分别寫入不同的Counter,最後輸出Counter的值

根據以上步驟下面是實作代碼:

map階段:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
	/**
	 * 定義一個枚舉類型
	 * @date 2016年3月25日 下午3:29:44 
	 * @{tags}
	 */
	public static enum FileRecorder{
		ErrorRecorder,
		TotalRecorder
	}
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		if("error".equals(value.toString())){
			/**
			 * 把counter實作累加
			 */
			context.getCounter(FileRecorder.ErrorRecorder).increment(1);
		}
		/**
		 * 把counter實作累加
		 */
		context.getCounter(FileRecorder.TotalRecorder).increment(1);
	}
}
           

啟動函數:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.seven.mapreduce.counter.MyMapper.FileRecorder;
public class JobMain {
	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		/**
		 * 使NLineInputFormat來分割一個小檔案,近而模拟分布式大檔案的處理
		 */
		configuration.setInt("mapreduce.input.lineinputformat.linespermap", 5); 
		Job job = new Job(configuration, "counter-job");
		job.setInputFormatClass(NLineInputFormat.class);  
		job.setJarByClass(JobMain.class);
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		Path outputDir = new Path(args[1]);
		FileSystem fs = FileSystem.get(configuration);
		if( fs.exists(outputDir)) {
			fs.delete(outputDir ,true);
		}
		FileOutputFormat.setOutputPath(job, outputDir);
		if(job.waitForCompletion(true) ? true: false) {
			System.out.println("Error num:" + job.getCounters().findCounter(FileRecorder.ErrorRecorder).getValue());
			System.out.println("Total num:" + job.getCounters().findCounter(FileRecorder.TotalRecorder).getValue());
		}
	}
}
           

運作結果:

MapReduce-Counter使用-快速實作大檔案行數的統計
MapReduce-Counter使用-快速實作大檔案行數的統計

總結:

由上可以看出總共跑了5個map任務,而且通過Counter實作了不同JVM中的全局累加器的功能。關于除自定義Counter以外的其它Counter的含義可以參考《MapReduce-Counters含義》

繼續閱讀