Hadoop MapReduce: OutputFormat Data Output

OutputFormat Interface Implementations

OutputFormat is the base class for MapReduce output; every MapReduce output implementation implements the OutputFormat interface. Below are several common OutputFormat implementations.

  1. Text output: TextOutputFormat

    The default output format is TextOutputFormat, which writes every record as a line of text. Its keys and values may be of any type, because TextOutputFormat calls toString() to convert them to strings (a driver sketch after this list shows how both built-in formats are selected).

  2. SequenceFileOutputFormat

    Using SequenceFileOutputFormat output as the input of a downstream MapReduce job is a good choice, because the format is compact and compresses well.

  3. Custom OutputFormat

    Implement your own output format according to the user's requirements.
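
A minimal sketch of how a driver selects between these built-in formats (the class name OutputFormatChoiceDriver and the path argument are illustrative assumptions, not from the original article):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class OutputFormatChoiceDriver {
	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration());
		// TextOutputFormat is already the default; this call just makes the choice explicit
		job.setOutputFormatClass(TextOutputFormat.class);
		// Alternative: compact, compressible SequenceFiles for a downstream job
		// job.setOutputFormatClass(SequenceFileOutputFormat.class);
		// SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
		FileOutputFormat.setOutputPath(job, new Path(args[0]));
	}
}

Switching between the two is a driver-only change; the map and reduce code is unaffected.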

Custom OutputFormat: Use Cases and Steps

  1. Use cases

    Define a custom OutputFormat when you need to control the paths and format of the final output files.

    For example: in a single MapReduce job, write two classes of records to different directories depending on their content. Flexible output requirements like this can be met with a custom OutputFormat.

  2. Steps to define a custom OutputFormat

    1) Define a class that extends FileOutputFormat.

    2) Override RecordWriter, in particular the write() method that emits each record. The worked example below walks through both steps.

    Custom OutputFormat: worked example

    Filter an input log file: write lines whose URL contains "liujh" to e:/liujh.log, and lines that do not contain "liujh" to e:/other.log.

    Input data

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.liujh.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
           

Implementation

1) Write the FilterMapper class, which passes each input line straight through as the output key

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Emit each input line as the key; NullWritable carries no value data
		context.write(value, NullWritable.get());
	}
}
           

2) Write the FilterReducer class

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
	Text k = new Text();

	@Override
	protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
		// 1 Get the line (the URL itself is the key)
		String line = key.toString();

		// 2 Append a line terminator, because the custom RecordWriter writes raw bytes
		line = line + "\r\n";

		// 3 Set the output key
		k.set(line);

		// 4 Emit
		context.write(k, NullWritable.get());
	}
}
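
Because the URL line is the reducer's key, identical lines arrive as a single key group and are written once; to preserve duplicates, the reducer would have to emit once per element of the values iterable.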
           

3) Define a custom OutputFormat class

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{
	@Override
	public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
		// Create and return the custom RecordWriter
		return new FilterRecordWriter(job);
	}
}
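
getRecordWriter() is the only override needed here; the FileOutputFormat base class supplies the rest of the output machinery, such as the output committer and the output-path check.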
           

4) Write the RecordWriter class

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
	FSDataOutputStream liujhOut = null;
	FSDataOutputStream otherOut = null;
	public FilterRecordWriter(TaskAttemptContext job) {
		// 1 Get the file system
		FileSystem fs;
		try {
			fs = FileSystem.get(job.getConfiguration());

			// 2 Create the output file paths
			Path liujhPath = new Path("e:/liujh.log");
			Path otherPath = new Path("e:/other.log");

			// 3 Open the output streams
			liujhOut = fs.create(liujhPath);
			otherOut = fs.create(otherPath);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void write(Text key, NullWritable value) throws IOException, InterruptedException {
		// Route lines containing "liujh" to one file and everything else to the other
		if (key.toString().contains("liujh")) {
			liujhOut.write(key.toString().getBytes());
		} else {
			otherOut.write(key.toString().getBytes());
		}
	}

	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
		// Close the streams
		IOUtils.closeStream(liujhOut);
		IOUtils.closeStream(otherOut);
	}
}
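
Opening the streams in the constructor and closing them in close() ties their lifetime to the task attempt. Note also that write() emits raw bytes with no record separator, which is why the reducer above appends "\r\n" itself.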
           

5) Write the FilterDriver class

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FilterDriver {
	public static void main(String[] args) throws Exception {
		// Set the input and output paths to match the actual paths on your machine
		args = new String[] { "e:/input/inputoutputformat", "e:/output2" };
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(FilterDriver.class);
		job.setMapperClass(FilterMapper.class);
		job.setReducerClass(FilterReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// Plug the custom output format component into the job
		job.setOutputFormatClass(FilterOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// Although we defined a custom OutputFormat, it extends FileOutputFormat,
		// and FileOutputFormat writes a _SUCCESS file, so an output directory must still be specified
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}
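
If the job completes successfully against the sample input, e:/liujh.log should contain the single line http://www.liujh.com, e:/other.log the remaining URLs, and e:/output2 only the _SUCCESS marker, since the custom RecordWriter bypasses the usual part files.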
           

Jianshu: https://www.jianshu.com/u/0278602aea1d

CSDN: https://blog.csdn.net/u012387141