
hadoop_MapReduce: Custom OutputFormat

Custom OutputFormat

        • Applicable scenario
        • Analysis
        • Implementation approach
        • Code implementation:
          • Define a custom OutputFormat
          • Define the RecordWriter class
          • Define the Mapper
          • Define the Driver (main class)

Applicable scenario

Split the final output data into different directories depending on its content.

Analysis

The key point is that a single MapReduce job must route two classes of results to different directories depending on the content of each record. This kind of flexible output requirement can be implemented with a custom OutputFormat.

Implementation approach

Key points:

  • 1. Access external resources (here, HDFS output streams) from within the MapReduce job
  • 2. Implement a custom OutputFormat, supply its RecordWriter, and override the write() method that actually emits the data

Code implementation:

Define a custom OutputFormat
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class DiyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Open one output stream per target directory; the RecordWriter decides which one each record goes to
        FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration());
        FSDataOutputStream goodReview = fileSystem.create(new Path("output path for good reviews"));
        FSDataOutputStream badReview = fileSystem.create(new Path("output path for bad reviews"));
        return new DiyRecordWriter(goodReview, badReview);
    }
}
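A possible variation (not part of the original post): instead of hard-coding the two file paths, they can be derived from the job's configured output directory plus the task attempt ID, which keeps parallel tasks from writing to the same file. A minimal sketch of the body of getRecordWriter under that assumption, with made-up "good"/"bad" sub-directory names:

// Sketch only: per-task output files under the job's output directory.
// The "good"/"bad" sub-directory names are assumptions for illustration.
Path outputDir = FileOutputFormat.getOutputPath(taskAttemptContext);
String taskId = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
FSDataOutputStream goodReview = fileSystem.create(new Path(outputDir, "good/" + taskId));
FSDataOutputStream badReview = fileSystem.create(new Path(outputDir, "bad/" + taskId));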
           
Define the RecordWriter class
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class DiyRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream goodReview;
    private FSDataOutputStream badReview;

    public DiyRecordWriter(FSDataOutputStream goodReview, FSDataOutputStream badReview) {
        this.goodReview = goodReview;
        this.badReview = badReview;
    }

    // Routing logic; adapt it to your own data layout.
    // Here each line is tab-separated, and records whose 10th field (index 9) equals "0" go to the goodReview stream.
    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        String[] split = text.toString().trim().split("\\t");
        if (split[9].equals("0")) {
            goodReview.write(text.toString().getBytes());
            goodReview.write("\r\n".getBytes());
        } else {
            badReview.write(text.toString().getBytes());
            badReview.write("\r\n".getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Close both streams so all buffered data is flushed
        if (goodReview != null) {
            goodReview.close();
        }
        if (badReview != null) {
            badReview.close();
        }
    }
}
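To sanity-check the routing logic without a cluster, the writer can be exercised against the local file system. The sketch below is not from the original post; the /tmp paths and the sample line are made-up assumptions, and the only real requirement is that the line has at least ten tab-separated fields:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class DiyRecordWriterLocalTest {
    public static void main(String[] args) throws Exception {
        // Use the local file system instead of HDFS for a quick check
        FileSystem fs = FileSystem.getLocal(new Configuration());
        DiyRecordWriter writer = new DiyRecordWriter(
                fs.create(new Path("/tmp/good.txt")),
                fs.create(new Path("/tmp/bad.txt")));

        // Hypothetical tab-separated line with ten fields; the last one is the review flag
        String line = String.join("\t", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "0");
        writer.write(new Text(line), NullWritable.get());   // routed to /tmp/good.txt
        writer.close(null);   // this close() ignores the context, so null is acceptable here
    }
}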

           
Define the Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class DiyMap extends Mapper<LongWritable, Text, Text, NullWritable> {
    // Forward every input line unchanged; the routing to the two outputs happens in DiyRecordWriter
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}
           
Define the Driver (main class)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class DiyDriver {
    public static void main(String[] args) throws Exception {

        Job job = Job.getInstance(new Configuration(), "DiyOutputFormat");

        job.setJarByClass(DiyDriver.class);
        job.setMapperClass(DiyMap.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("input data path"));

        // The path below only receives the _SUCCESS marker;
        // the real data files are the ones created inside DiyOutputFormat
        job.setOutputFormatClass(DiyOutputFormat.class);
        DiyOutputFormat.setOutputPath(job, new Path("output path for the _SUCCESS marker"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
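One optional tweak that is not in the original post: the job has no reduce logic, so it can be declared map-only, in which case mapper output skips the shuffle and is handed straight to the custom OutputFormat:

// Optional: make the job map-only so records go directly from DiyMap to DiyOutputFormat
job.setNumReduceTasks(0);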
           
