天天看點

hadoop 之 PathFilter -- 輸入檔案過濾器

1.指定多個輸入

在單個操作中處理一批檔案,這是很常見的需求。比如說處理日志的MapReduce作業可能需要分析一個月内包含在大量目錄中的日志檔案。在一個表達式中使用通配符在比對多個檔案時比較友善的,無需列舉每個檔案和目錄來指定輸入。hadoop為執行通配提供了兩個FileSystem方法:

public FileStatus[] globStatus(Path pathPattern) throw IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throw IOException
           

PS:

  • globStatus()方法傳回與路徑想比對的所有檔案的FileStatus對象數組,并按路徑排序。hadoop所支援的通配符與Unix bash相同。
  • 第二個方法傳了一個PathFilter對象作為參數,PathFilter可以進一步對比對進行限制。PathFilter是一個接口,裡面隻有一個方法accept(Path path)。

PathFilter執行個體

RegexExcludePathFilter.java

class RegexExcludePathFilter implements PathFilter{
    private final String regex;
    public RegexExcludePathFilter(String regex) {
        this.regex = regex;
    }
    @Override
    public boolean accept(Path path) {
        return !path.toString().matches(regex);
    }
}
           

PS:該類實作了PathFilter接口,重寫了accept方法

使用這個過濾器:

//通配符的使用
public static void list() throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        //PathFilter是過濾布符合置頂表達式的路徑,下列就是把以txt結尾的過濾掉
        FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"),new RegexExcludePathFilter(".*txt"));
        //FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"));
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }
           

如果沒有過濾器,

FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"));
           

則輸出結果如下:

hdfs://master:/user/hadoop/test/a.txt
hdfs://master:/user/hadoop/test/b.txt
hdfs://master:/user/hadoop/test/c.aaa
hdfs://master:/user/hadoop/test/c.txt
hdfs://master:/user/hadoop/test/cc.aaa
           

如果使用了過濾器

FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"),new RegexExcludePathFilter(".*txt"));
           

則輸出結果如下:

hdfs://master:/user/hadoop/test/c.aaa
hdfs://master:/user/hadoop/test/cc.aaa
           

由此可見,PathFilter就是在比對前面條件之後再加以限制,将比對PathFilter的路徑去除掉。

其實由accept方法裡面的

return !path.toString().matches(regex);

可以看出來,就是将比對的全部去除掉,如果改為

return path.toString().matches(regex);

就是将比對regex的Path輸出,将不比對的去除。

PathFilter執行個體2

public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        //通過過濾器過濾掉不要的檔案
        FileStatus[] status = fs.globStatus(new Path(args[]),new RegexExcludePathFilter(".*txt"));
        Path[] listedPaths = FileUtil.stat2Paths(status);

        job.setJarByClass(this.getClass());
        job.setJobName("SumStepByTool");
        job.setInputFormatClass(TextInputFormat.class); //這個是預設的輸入格式

        job.setMapperClass(SumStepByToolMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBeanMy.class);

        job.setReducerClass(SumStepByToolReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBeanMy.class);
        //job.setNumReduceTasks();

        //對不同的輸入檔案使用不同的Mapper進行處理
//      MultipleInputs.addInputPath(job, new Path(args[]), TextInputFormat.class, SumStepByToolMapper.class);
//      MultipleInputs.addInputPath(job, new Path(args[]), TextInputFormat.class, SumStepByToolWithCommaMapper.class);
        FileInputFormat.setInputPaths(job, listedPaths);
        FileOutputFormat.setOutputPath(job, new Path(args[]));


        return job.waitForCompletion(true) ? :-;
    }
           

繼續閱讀