天天看点

hadoop 之 PathFilter -- 输入文件过滤器

1.指定多个输入

在单个操作中处理一批文件,这是很常见的需求。比如说处理日志的MapReduce作业可能需要分析一个月内包含在大量目录中的日志文件。在一个表达式中使用通配符在匹配多个文件时比较方便的,无需列举每个文件和目录来指定输入。hadoop为执行通配提供了两个FileSystem方法:

public FileStatus[] globStatus(Path pathPattern) throw IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throw IOException
           

PS:

  • globStatus()方法返回与路径想匹配的所有文件的FileStatus对象数组,并按路径排序。hadoop所支持的通配符与Unix bash相同。
  • 第二个方法传了一个PathFilter对象作为参数,PathFilter可以进一步对匹配进行限制。PathFilter是一个接口,里面只有一个方法accept(Path path)。

PathFilter实例

RegexExcludePathFilter.java

class RegexExcludePathFilter implements PathFilter{
    private final String regex;
    public RegexExcludePathFilter(String regex) {
        this.regex = regex;
    }
    @Override
    public boolean accept(Path path) {
        return !path.toString().matches(regex);
    }
}
           

PS:该类实现了PathFilter接口,重写了accept方法

使用这个过滤器:

//通配符的使用
public static void list() throws IOException{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        //PathFilter是过滤布符合置顶表达式的路径,下列就是把以txt结尾的过滤掉
        FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"),new RegexExcludePathFilter(".*txt"));
        //FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"));
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }
           

如果没有过滤器,

FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"));
           

则输出结果如下:

hdfs://master:/user/hadoop/test/a.txt
hdfs://master:/user/hadoop/test/b.txt
hdfs://master:/user/hadoop/test/c.aaa
hdfs://master:/user/hadoop/test/c.txt
hdfs://master:/user/hadoop/test/cc.aaa
           

如果使用了过滤器

FileStatus[] status = fs.globStatus(new Path("hdfs://master:9000/user/hadoop/test/*"),new RegexExcludePathFilter(".*txt"));
           

则输出结果如下:

hdfs://master:/user/hadoop/test/c.aaa
hdfs://master:/user/hadoop/test/cc.aaa
           

由此可见,PathFilter就是在匹配前面条件之后再加以限制,将匹配PathFilter的路径去除掉。

其实由accept方法里面的

return !path.toString().matches(regex);

可以看出来,就是将匹配的全部去除掉,如果改为

return path.toString().matches(regex);

就是将匹配regex的Path输出,将不匹配的去除。

PathFilter实例2

public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf);

        //通过过滤器过滤掉不要的文件
        FileStatus[] status = fs.globStatus(new Path(args[]),new RegexExcludePathFilter(".*txt"));
        Path[] listedPaths = FileUtil.stat2Paths(status);

        job.setJarByClass(this.getClass());
        job.setJobName("SumStepByTool");
        job.setInputFormatClass(TextInputFormat.class); //这个是默认的输入格式

        job.setMapperClass(SumStepByToolMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBeanMy.class);

        job.setReducerClass(SumStepByToolReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(InfoBeanMy.class);
        //job.setNumReduceTasks();

        //对不同的输入文件使用不同的Mapper进行处理
//      MultipleInputs.addInputPath(job, new Path(args[]), TextInputFormat.class, SumStepByToolMapper.class);
//      MultipleInputs.addInputPath(job, new Path(args[]), TextInputFormat.class, SumStepByToolWithCommaMapper.class);
        FileInputFormat.setInputPaths(job, listedPaths);
        FileOutputFormat.setOutputPath(job, new Path(args[]));


        return job.waitForCompletion(true) ? :-;
    }
           

继续阅读