
Hadoop-MapReduce-OutputFormat Data Output

OutputFormat Data Output

The implementing classes are shown below:

[Figure: OutputFormat implementation classes]

Use cases and steps for a custom OutputFormat

[Figure: custom OutputFormat use cases and steps]

Hands-on example:

Goal: filter the input file, writing every line that contains caocao to caocao.txt and all other lines to other.txt.

Input data:

caocao shi wei wu di
liubei shi shu zhao lie di
liubei shi shu guo de
caocao shi wei guo de 
caozhi shi caocao de er zi
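
The intended split can be checked without Hadoop. The following is only a plain-Java sketch, not part of the job: the class name `CaocaoSplitSketch` and its `route` method are hypothetical, and the code merely mirrors the `contains("caocao")` test that the custom RecordWriter applies further down. It keeps the input order, whereas in the real job the keys reach the reducer in sorted order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only: mirrors the job's routing rule in plain Java.
class CaocaoSplitSketch {

    // Returns a map from target file name to the lines routed to that file.
    static Map<String, List<String>> route(List<String> lines) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        out.put("caocao.txt", new ArrayList<>());
        out.put("other.txt", new ArrayList<>());
        for (String line : lines) {
            // Substring check, the same test the RecordWriter uses
            String target = line.contains("caocao") ? "caocao.txt" : "other.txt";
            out.get(target).add(line);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "caocao shi wei wu di",
                "liubei shi shu zhao lie di",
                "liubei shi shu guo de",
                "caocao shi wei guo de",
                "caozhi shi caocao de er zi");
        route(input).forEach((file, lines) -> System.out.println(file + " <- " + lines));
    }
}
```

Three of the five sample lines end up in caocao.txt. The `caozhi` line is among them because the rule is a substring match, not a match on the first word. The two `liubei` lines go to other.txt.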
           

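The Mapper is as follows:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DefinedOutputFormatMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Reused output key, so no new Text is allocated per record
    private final Text k = new Text();
    private static final String LINE_SEPARATOR = "\r\n";

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String keyStr = value.toString();
        // The custom RecordWriter writes the key bytes as-is, so the line
        // separator must be appended here or the output records run together
        k.set(keyStr + LINE_SEPARATOR);

        context.write(k, NullWritable.get());
    }
}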
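
Why the Mapper has to append "\r\n" can be seen in isolation. The default TextOutputFormat writes a newline after each record, but a RecordWriter that writes raw bytes to a stream adds nothing of its own. The sketch below is plain Java with a `ByteArrayOutputStream` standing in for the HDFS stream. The class `RawWriteSketch` and its `writeAll` method are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a raw-byte RecordWriter: plain Java, no Hadoop classes involved.
class RawWriteSketch {

    // Writes each record's bytes exactly as given and returns everything that was written.
    static String writeAll(String[] records) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            sink.write(bytes, 0, bytes.length);
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // No separator: both records end up on a single line
        System.out.println(writeAll(new String[] {"liubei shi shu guo de", "liubei shi shu zhao lie di"}));
        // Separator appended to each record, as the Mapper does: one record per line
        System.out.print(writeAll(new String[] {"liubei shi shu guo de\r\n", "liubei shi shu zhao lie di\r\n"}));
    }
}
```

An alternative design, not the one used here, would be to keep the key clean in the Mapper and have the RecordWriter write the separator after each record.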
           

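The Reducer is as follows:

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DefinedOutputFormatReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

        // Identical lines are grouped under one key; write the key once per
        // value so that duplicate lines are not lost
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}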
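
The effect of looping over the values can be illustrated outside Hadoop. The following is a rough simulation under stated assumptions, not Hadoop code: `ReduceLoopSketch` and its `reduce` method are hypothetical, a `TreeMap` stands in for the shuffle's grouping and sorting, and `String` ordering stands in for `Text` ordering (the two agree for ASCII input such as the sample data).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of shuffle + reduce in plain Java.
class ReduceLoopSketch {

    // Groups identical lines under one key, then emits each key either once per
    // grouped value (perValue = true) or exactly once (perValue = false).
    static List<String> reduce(List<String> mapOutput, boolean perValue) {
        Map<String, Integer> grouped = new TreeMap<>(); // sorted keys, value = number of occurrences
        for (String key : mapOutput) {
            grouped.merge(key, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : grouped.entrySet()) {
            int copies = perValue ? entry.getValue() : 1;
            for (int i = 0; i < copies; i++) {
                out.add(entry.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> mapOutput = Arrays.asList("b line", "a line", "b line");
        System.out.println(reduce(mapOutput, true));  // duplicates kept
        System.out.println(reduce(mapOutput, false)); // duplicates collapsed
    }
}
```

With `perValue` set to `true`, which corresponds to the loop in the Reducer, the repeated line appears twice in the result. Writing the key only once per `reduce` call would silently deduplicate the input.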
           

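The custom OutputFormat implementation is as follows:

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DefinedOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        // Hand every record of this task to the custom RecordWriter
        return new DefinedOutputRecordWriter(context);
    }
}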
           

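The custom RecordWriter is as follows:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class DefinedOutputRecordWriter extends RecordWriter<Text, NullWritable> {

    private final FSDataOutputStream fosCaocao;
    private final FSDataOutputStream fosOther;

    // Let an IOException propagate instead of swallowing it; otherwise a failed
    // create() would only surface later as a NullPointerException in write()
    public DefinedOutputRecordWriter(TaskAttemptContext context) throws IOException {

        // 1. Get the file system
        FileSystem fileSystem = FileSystem.get(context.getConfiguration());

        // The paths are fixed and create() overwrites existing files, so this
        // is only safe with a single reduce task

        // 2. Create the output stream for caocao.txt
        fosCaocao = fileSystem.create(new Path("/home/lxj/hadoop-data/output/definedOutput/caocao.txt"));

        // 3. Create the output stream for other.txt
        fosOther = fileSystem.create(new Path("/home/lxj/hadoop-data/output/definedOutput/other.txt"));
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {

        // Route by key: lines containing "caocao" go to caocao.txt, all others to other.txt
        String line = text.toString();
        byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
        if (line.contains("caocao")) {
            fosCaocao.write(bytes);
        } else {
            fosOther.write(bytes);
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {

        IOUtils.closeStream(fosCaocao);
        IOUtils.closeStream(fosOther);
    }
}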
           

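Add the following to the Driver:

// Set the custom OutputFormat
job.setOutputFormatClass(DefinedOutputFormat.class);

Because DefinedOutputFormat extends FileOutputFormat, the Driver must still set an output directory with FileOutputFormat.setOutputPath, even though the RecordWriter writes to its own paths. The job's _SUCCESS marker is written to that directory.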
           

The result of the run is as follows:

[Figure: run result]
