
Hadoop_MapReduce: Merging Small Files with a Custom InputFormat

Custom InputFormat

        • Applicable scenario
        • Analysis
        • Implementation approach
        • Code implementation:
          • Custom InputFormat
          • Custom RecordReader
          • Define the Mapper
          • Define the driver class

Applicable scenario

  • Small files hurt efficiency in both HDFS and MapReduce: every file adds metadata the NameNode must keep in memory, and each small file tends to spawn its own map task. In practice, processing large numbers of small files is often unavoidable, so a concrete solution is needed.

Analysis

Small files can be optimized in essentially the following ways:

  • 1. Merge small files, or small batches of data, into larger files at collection time, before they are uploaded to HDFS.
  • 2. Before the business processing step, run a MapReduce program on HDFS to merge the small files.
  • 3. At MapReduce processing time, use CombineTextInputFormat to improve efficiency (a minimal sketch follows this list).
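
As an illustration of option 3, here is a minimal driver sketch using Hadoop's built-in CombineTextInputFormat, which packs many small files into a few large splits so far fewer map tasks are launched. The class name CombineDriver, the split-size value, and both paths are placeholders chosen for this sketch, not part of the tutorial's code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "CombineTextInputFormat_demo");
        job.setJarByClass(CombineDriver.class);

        // identity mapper, no reducer: the only point of this sketch is the input format
        job.setMapperClass(Mapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // pack many small files into fewer splits (one map task per split),
        // here capped at roughly 4 MB per split
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);

        CombineTextInputFormat.addInputPath(job, new Path("input path"));
        FileOutputFormat.setOutputPath(job, new Path("output path"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that this approach only reduces the number of map tasks; unlike the custom InputFormat below, it does not physically rewrite the small files into one merged file on HDFS.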

Implementation approach

The core mechanism of the program:

  • Define a custom InputFormat
  • Override the RecordReader so that one call reads a complete file and wraps it in a BytesWritable
  • On the output side, use SequenceFileOutputFormat to write the merged file

Code implementation:

Custom InputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class DiyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        DiyRecordReader diyRecordReader = new DiyRecordReader();
        diyRecordReader.initialize(inputSplit, taskAttemptContext);
        return diyRecordReader;
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // never split: each small file must be read as one whole record
        return false;
    }
}
           
Custom RecordReader
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class DiyRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private boolean next = false;
    private FileSplit fileSplit;
    private Configuration configuration;
    private BytesWritable bytesWritable = new BytesWritable();

    // initialization: remember the split and the job configuration
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        configuration = taskAttemptContext.getConfiguration();
    }

    // get the next record
    // by default a RecordReader returns one line at a time; for this use case the
    // whole file is read in a single call and handed over as one BytesWritable
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!next) {
            // get the file system and open the file backing this split
            FileSystem fileSystem = FileSystem.get(configuration);
            FSDataInputStream in = fileSystem.open(fileSplit.getPath());
            byte[] buf = new byte[(int) fileSplit.getLength()];
            try {
                // read the entire file into the buffer
                IOUtils.readFully(in, buf, 0, buf.length);
            } finally {
                IOUtils.closeStream(in);
            }
            // copy the buffer into the BytesWritable handed to the mapper
            bytesWritable.set(buf, 0, buf.length);
            next = true;
            return true;
        }
        return false;
    }

    // get the current key: the file name is emitted by the mapper, so no key is needed here
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    // get the current value: the complete file contents
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }

    // report task progress: the single record has either been read or not
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return next ? 1.0f : 0.0f;
    }

    // release resources; nothing is held open between calls
    @Override
    public void close() throws IOException {

    }
}
           
Define the Mapper
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class DiyMap extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // get the name of the source file from the input split and emit it as the key
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        context.write(new Text(inputSplit.getPath().getName()), value);

    }
}
           
Define the driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class DiyDriver {
    public static void main(String[] args) throws Exception {

        Job job = Job.getInstance(new Configuration(), "DiyInputFormat_001");
        job.setJarByClass(DiyDriver.class);
        job.setMapperClass(DiyMap.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        
        // set the input format to the custom one defined above
        job.setInputFormatClass(DiyInputFormat.class);
        // set the path the data is read from
        DiyInputFormat.addInputPath(job, new Path("input path"));
        // set the output format
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // set the output path
        SequenceFileOutputFormat.setOutputPath(job, new Path("output path"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
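
To check the result, the merged output can be read back with SequenceFile.Reader. The sketch below is only for verification and is not part of the tutorial's code; the class name MergedFileChecker and the output path are hypothetical, and part-r-00000 is assumed because the job above runs the default single reducer. It prints each original file name together with the number of bytes stored for it:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MergedFileChecker {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // first output file of the job above, assuming the default single reducer
        Path merged = new Path("output path/part-r-00000");

        try (SequenceFile.Reader reader = new SequenceFile.Reader(
                conf, SequenceFile.Reader.file(merged))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            // each record is (original file name, full file contents)
            while (reader.next(key, value)) {
                System.out.println(key + " -> " + value.getLength() + " bytes");
            }
        }
    }
}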
           