在普通的java程式中我們可以定義一個全局的靜态變量,然後我們可以在各個類中去使用,實作累加器的功能,然而在mapruduce中怎麼實作這一功能呢,各個map可能運作在不同的JVM中(這裡不考慮JVM重用的情況),然而我們可以借助MapReduce提供的Counter功能來實作這一功能,下面我們通過一個執行個體來說明這一個用法。
實驗要求:快速實作檔案行數,以及其中錯誤記錄的統計
實驗資料:
1
2
error
3
4
5
error
6
7
8
9
10
error
11
12
13
14
error
15
16
17
18
19
解決思路:
定義一個枚舉類型,每次調用map函數時,對值進行判斷,把判斷的結果分别寫入不同的Counter,最後輸出Counter的值
根據以上步驟下面是實作代碼:
map階段:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
/**
* 定義一個枚舉類型
* @date 2016年3月25日 下午3:29:44
* @{tags}
*/
public static enum FileRecorder{
ErrorRecorder,
TotalRecorder
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
if("error".equals(value.toString())){
/**
* 把counter實作累加
*/
context.getCounter(FileRecorder.ErrorRecorder).increment(1);
}
/**
* 把counter實作累加
*/
context.getCounter(FileRecorder.TotalRecorder).increment(1);
}
}
啟動函數:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.seven.mapreduce.counter.MyMapper.FileRecorder;
public class JobMain {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
/**
* 使NLineInputFormat來分割一個小檔案,近而模拟分布式大檔案的處理
*/
configuration.setInt("mapreduce.input.lineinputformat.linespermap", 5);
Job job = new Job(configuration, "counter-job");
job.setInputFormatClass(NLineInputFormat.class);
job.setJarByClass(JobMain.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputDir = new Path(args[1]);
FileSystem fs = FileSystem.get(configuration);
if( fs.exists(outputDir)) {
fs.delete(outputDir ,true);
}
FileOutputFormat.setOutputPath(job, outputDir);
if(job.waitForCompletion(true) ? true: false) {
System.out.println("Error num:" + job.getCounters().findCounter(FileRecorder.ErrorRecorder).getValue());
System.out.println("Total num:" + job.getCounters().findCounter(FileRecorder.TotalRecorder).getValue());
}
}
}
運作結果:
總結:
由上可以看出總共跑了5個map任務,而且通過Counter實作了不同JVM中的全局累加器的功能。關于除自定義Counter以外的其它Counter的含義可以參考《MapReduce-Counters含義》