天天看点

Hadoop-MapReduce-OutputFormat数据输出

OutPutFormat数据输出

实现接口如下:

Hadoop-MapReduce-OutputFormat数据输出

自定义OutputFormat使用场景以及步骤

Hadoop-MapReduce-OutputFormat数据输出

实操:

目的:过滤文件,将包含caocao的行输入到caocao.txt中,其他的数据输出到other.txt中

输入数据:

caocao shi wei wu di
liubei shi shu zhao lie di
liubei shi shu guo de
caocao shi wei guo de 
caozhi shi caocao de er zi
           

Mapper如下

public class DefinedOutputFormatMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    Text k = new Text();
    String line = "\r\n";

    @Override
    protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String keyStr = value.toString();
		// 这里不加\r\n的话,不会自动换行
        k.set(keyStr + line);

        context.write(k, NullWritable.get());
    }
}
           

Reducer如下:

public class DefinedOutputFormatReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce (Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        // 防止有重复的数据
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}
           

自定义OutputFormat实现类如下

public class DefinedOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter (TaskAttemptContext context) throws IOException, InterruptedException {

        return new DefinedOutputRecordWriter(context);
    }
}
           

自定义RecordWriter如下

public class DefinedOutputRecordWriter extends RecordWriter<Text, NullWritable> {

    FSDataOutputStream fosCaocao;
    FSDataOutputStream fosOther;

    public DefinedOutputRecordWriter (TaskAttemptContext context) {

        try {
            // 1、获取文件系统
            FileSystem fileSystem = FileSystem.get(context.getConfiguration());

            // 2、创建输出到caocao.txt的输出流
            fosCaocao = fileSystem.create(new Path("/home/lxj/hadoop-data/output/definedOutput/caocao.txt"));

            // 3、创建输出到other.txt的输出流
            fosOther = fileSystem.create(new Path("/home/lxj/hadoop-data/output/definedOutput/other.txt"));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write (Text text, NullWritable nullWritable) throws IOException, InterruptedException {

        // 判断key中是否有caocao,有则写入caocao.txt,否则写入other.txt
        if (text.toString().contains("caocao")) {
            fosCaocao.write(text.toString().getBytes());
        } else {
            fosOther.write(text.toString().getBytes());
        }
    }

    @Override
    public void close (TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        IOUtils.closeStream(fosCaocao);
        IOUtils.closeStream(fosOther);
    }

           

Driver加入

// 设置OutputFormat
job.setOutputFormatClass(DefinedOutputFormat.class);
           

运行结果如下:

Hadoop-MapReduce-OutputFormat数据输出

继续阅读