Custom Input Formats in MapReduce
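Concept:
- The ordinary input format is sometimes not enough. It passes each line of a file to the map phase as one value. Some jobs need something different, such as using rows from a database as the input or treating an entire file as a single input record.
Example:
- Suppose we need to read all the small files in a directory and pass the full content of each file in as one value. The map output should use the file name as the key and the whole file content as the value.
Source code analysis:
- Source code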
public abstract class InputFormat<K, V> {
/**
* Logically split the set of input files for the job.
*
* <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
* for processing.</p>
*
* <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
* input files are not physically split into chunks. For e.g. a split could
* be <i><input-file-path, start, offset></i> tuple. The InputFormat
* also creates the {@link RecordReader} to read the {@link InputSplit}.
*
* @param context job configuration.
* @return an array of {@link InputSplit}s for the job.
*/
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;
/**
* Create a record reader for a given split. The framework will call
* {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
* the split is used.
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
* @throws IOException
* @throws InterruptedException
*/
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;
}
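- Explanation:
- This is the base InputFormat class. It declares two abstract methods: getSplits() and createRecordReader().
- getSplits(): returns the split information for the input files. It must be implemented.
- createRecordReader(): defines how the data in a split is actually read. It must also be implemented.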
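To make the idea of a logical split concrete, here is a minimal plain-Java sketch. It does not use Hadoop: `LogicalSplitSketch`, `Split`, and `computeSplits` are hypothetical names invented for this illustration, and Hadoop's own FileInputFormat applies further rules when it sizes splits. The sketch only shows that a split is a description of a byte range (path, start, length), not a copy of the data.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of logical splits; this is not Hadoop code. */
final class LogicalSplitSketch {

    /** Hypothetical stand-in for an input split: a path plus a byte range. */
    static final class Split {
        final String path;
        final long start;
        final long length;

        Split(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return path + ":" + start + "+" + length;
        }
    }

    /**
     * Describes a file of fileLength bytes as byte ranges of at most splitSize bytes.
     * No data is copied or moved. A non-splittable file yields exactly one split.
     */
    static List<Split> computeSplits(String path, long fileLength, long splitSize, boolean splittable) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<Split> splits = new ArrayList<>();
        if (!splittable) {
            splits.add(new Split(path, 0, fileLength));
            return splits;
        }
        for (long start = 0; start < fileLength; start += splitSize) {
            splits.add(new Split(path, start, Math.min(splitSize, fileLength - start)));
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // A 300 MB file with a 128 MB split size is described by three ranges.
        System.out.println(computeSplits("big.log", 300 * mb, 128 * mb, true));
        // With splitting disabled, the same file is described by a single range.
        System.out.println(computeSplits("big.log", 300 * mb, 128 * mb, false));
    }
}
```

Each split goes to one mapper, so turning splitting off means one mapper sees the whole file.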
Implementation
Create the test files:
...........
1.txt
one
...........
2.txt
tow
...........
3.txt
three
...........
Write the custom file input format
- Create the new format class CusFileInputFormat
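import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * @description Input format that delivers each file as one record
 * @author: LuoDeSong
 * @create: 2019-06-19 11:06:09
 **/
public class CusFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    // Whether an input file may be split. The files here are smaller than 128 MB
    // and each one must reach the mapper whole, so this returns false.
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // The framework calls initialize(split, context) on the reader before using it.
        return new CusRecordReader();
    }
}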
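* Note: there is no need to change the existing file-splitting rules, so we extend FileInputFormat, an implementation of InputFormat that already provides them. All that remains is to define how a file is read, which means writing a new RecordReader class for createRecordReader() to return.
* Type parameters: the value is the whole file, so no byte offset is needed as the key and the key type is NullWritable. The file is read in full as bytes, so the value type is BytesWritable.
- Create the new RecordReader class CusRecordReader: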
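import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * @description Record reader that returns a whole file as a single record
 * @author: LuoDeSong
 * @create: 2019-06-19 11:10:19
 **/
public class CusRecordReader extends RecordReader<NullWritable, BytesWritable> {
    // Job configuration
    private Configuration conf;
    // The split assigned to this reader (here, one whole file)
    private FileSplit split;
    // True once the single record has been read
    private boolean processed;
    // Holds the file content that is handed to the mapper
    private final BytesWritable bytesWritable = new BytesWritable();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split; // downcast
        this.conf = context.getConfiguration(); // job configuration
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (processed) {
            return false;
        }
        // 1. Allocate a buffer the size of the split, which is the whole file
        byte[] data = new byte[(int) split.getLength()];
        // 2. Get the file system that owns the file
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream input = null;
        try {
            // 3. Read the whole file into the buffer. An IOException propagates
            //    and fails the task instead of emitting an empty record.
            input = fs.open(path);
            IOUtils.readFully(input, data, 0, data.length);
            bytesWritable.set(data, 0, data.length);
        } finally {
            IOUtils.closeStream(input);
        }
        processed = true;
        return true;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }

    // Read progress: 0 before the file is read, 1 afterwards
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Nothing to do: the stream is closed in nextKeyValue()
    }
}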
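* Note: extend RecordReader, the base class for reading records, and override its required methods with your own reading logic. The steps are annotated in the code comments.
* Type parameters: the same as explained for the CusFileInputFormat class.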
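The key behaviour of this reader is that nextKeyValue() returns true exactly once per file. The following plain-Java sketch imitates that contract without Hadoop so it can be run on its own. `WholeStreamReaderSketch` and `countRecords` are hypothetical names for this illustration, and an in-memory stream stands in for the HDFS file. The loop in `countRecords` mirrors how a framework would keep asking for records until the reader returns false.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Plain-Java analogue of a read-once record reader; this is not Hadoop code. */
final class WholeStreamReaderSketch {

    private final InputStream input;
    private boolean processed;          // true once the single record was produced
    private byte[] value = new byte[0]; // the current record value

    WholeStreamReaderSketch(InputStream input) {
        this.input = input;
    }

    /** Loads the entire stream as one value; returns true only on the first call. */
    boolean nextKeyValue() {
        if (processed) {
            return false;
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        try {
            int n;
            while ((n = input.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        value = buffer.toByteArray();
        processed = true;
        return true;
    }

    byte[] getCurrentValue() {
        return value;
    }

    /** Counts how many records a framework-style loop would pass to map(). */
    static int countRecords(InputStream input) {
        WholeStreamReaderSketch reader = new WholeStreamReaderSketch(input);
        int records = 0;
        while (reader.nextKeyValue()) {
            records++;
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] content = "line 1\nline 2\n".getBytes(StandardCharsets.UTF_8);
        WholeStreamReaderSketch reader = new WholeStreamReaderSketch(new ByteArrayInputStream(content));
        while (reader.nextKeyValue()) {
            // Both lines arrive together as a single value.
            System.out.print(new String(reader.getCurrentValue(), StandardCharsets.UTF_8));
        }
    }
}
```

A line-oriented reader would return one record per line for the same input. Here the number of lines does not matter, because the whole content is one record.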
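- Create the Mapper class FileMapper:
import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * @description Emits (file name, whole file content)
 * @author: LuoDeSong
 * @create: 2019-06-19 11:36:06
 **/
public class FileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text newKey = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Each split is one file, so the file name can be taken once per task
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().getName();
        newKey.set(name);
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        context.write(newKey, value);
    }
}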
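- Create the Reducer class FileReducer:
import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @description Writes out the file content for each file name
 * @author: LuoDeSong
 * @create: 2019-06-19 11:42:22
 **/
public class FileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Only the first value is written for each file name
        context.write(key, values.iterator().next());
    }
}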
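- Create the Driver class:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Driver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Driver.class);
        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        // Use the custom input format instead of the default one
        job.setInputFormatClass(CusFileInputFormat.class);
        // args[0] is the input directory, args[1] is the output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}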
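One thing to expect when inspecting the result: the Driver sets no output format, so the job should fall back to the default text output, which writes each value through its toString() method. For BytesWritable that is a string of hex byte pairs separated by spaces, not the original text. The sketch below is plain Java with hypothetical names (`HexValueDecoderSketch`, `decodeHex`, `decodeLine`) and assumes output lines of the form key, tab, hex pairs. It shows one way to turn such a line back into readable text.

```java
import java.nio.charset.StandardCharsets;

/** Plain-Java helper for reading hex-encoded output lines; this is not Hadoop code. */
final class HexValueDecoderSketch {

    /** Turns space-separated hex pairs such as "6f 6e 65" into bytes. */
    static byte[] decodeHex(String hexPairs) {
        String trimmed = hexPairs.trim();
        if (trimmed.isEmpty()) {
            return new byte[0];
        }
        String[] parts = trimmed.split("\\s+");
        byte[] out = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            out[i] = (byte) Integer.parseInt(parts[i], 16);
        }
        return out;
    }

    /** Splits a line at the first tab and decodes the value as UTF-8 text. */
    static String decodeLine(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            throw new IllegalArgumentException("expected key<TAB>value: " + line);
        }
        String key = line.substring(0, tab);
        String text = new String(decodeHex(line.substring(tab + 1)), StandardCharsets.UTF_8);
        return key + " -> " + text;
    }

    public static void main(String[] args) {
        // Assumed output line for a file 1.txt whose content is exactly "one".
        System.out.println(decodeLine("1.txt\t6f 6e 65")); // prints: 1.txt -> one
    }
}
```

If the file ends with a newline, the hex string ends with `0a` and the decoded text keeps that newline.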
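Summary:
- Customizing the input format is a skill you will need. The whole process here comes down to tracing the Hadoop source code and modelling our own classes on it. I hope it helps you go further on your big data journey.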