1.指定多個輸入
在單個操作中處理一批檔案,這是很常見的需求。比如說處理日志的MapReduce作業可能需要分析一個月内包含在大量目錄中的日志檔案。在一個表達式中使用通配符在比對多個檔案時比較友善的,無需列舉每個檔案和目錄來指定輸入。hadoop為執行通配提供了兩個FileSystem方法:
public FileStatus[] globStatus(Path pathPattern) throw IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throw IOException
PS:
- globStatus()方法傳回與路徑想比對的所有檔案的FileStatus對象數組,并按路徑排序。hadoop所支援的通配符與Unix bash相同。
- 第二個方法傳了一個PathFilter對象作為參數,PathFilter可以進一步對比對進行限制。PathFilter是一個接口,裡面隻有一個方法accept(Path path)。
PathFilter執行個體
RegexExcludePathFilter.java
class RegexExcludePathFilter implements PathFilter{
private final String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
return !path.toString().matches(regex);
}
}
PS:該類實作了PathFilter接口,重寫了accept方法
使用這個過濾器:
//通配符的使用
public static void list() throws IOException{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
//PathFilter是過濾布符合置頂表達式的路徑,下列就是把以txt結尾的過濾掉
FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"),new RegexExcludePathFilter(".*txt"));
//FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"));
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
如果沒有過濾器,
FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"));
則輸出結果如下:
hdfs://master:/user/hadoop/test/a.txt
hdfs://master:/user/hadoop/test/b.txt
hdfs://master:/user/hadoop/test/c.aaa
hdfs://master:/user/hadoop/test/c.txt
hdfs://master:/user/hadoop/test/cc.aaa
如果使用了過濾器
FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"),new RegexExcludePathFilter(".*txt"));
則輸出結果如下:
hdfs://master:/user/hadoop/test/c.aaa
hdfs://master:/user/hadoop/test/cc.aaa
由此可見,PathFilter就是在比對前面條件之後再加以限制,将比對PathFilter的路徑去除掉。
其實由accept方法裡面的
return !path.toString().matches(regex);
可以看出來,就是将比對的全部去除掉,如果改為
return path.toString().matches(regex);
就是将比對regex的Path輸出,将不比對的去除。
PathFilter執行個體2
public int run(String[] args) throws Exception {
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
Job job = Job.getInstance(conf);
//通過過濾器過濾掉不要的檔案
FileStatus[] status = fs.globStatus(new Path(args[]),new RegexExcludePathFilter(".*txt"));
Path[] listedPaths = FileUtil.stat2Paths(status);
job.setJarByClass(this.getClass());
job.setJobName("SumStepByTool");
job.setInputFormatClass(TextInputFormat.class); //這個是預設的輸入格式
job.setMapperClass(SumStepByToolMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBeanMy.class);
job.setReducerClass(SumStepByToolReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBeanMy.class);
//job.setNumReduceTasks();
//對不同的輸入檔案使用不同的Mapper進行處理
// MultipleInputs.addInputPath(job, new Path(args[]), TextInputFormat.class, SumStepByToolMapper.class);
// MultipleInputs.addInputPath(job, new Path(args[]), TextInputFormat.class, SumStepByToolWithCommaMapper.class);
FileInputFormat.setInputPaths(job, listedPaths);
FileOutputFormat.setOutputPath(job, new Path(args[]));
return job.waitForCompletion(true) ? :-;
}