
MapReduce: multiple inputs

1. Multiple input paths

1) Call FileInputFormat.addInputPath several times, once for each path to load

FileInputFormat.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"));

FileInputFormat.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path2"));

2) Call FileInputFormat.addInputPaths once, passing all paths as a single comma-separated string

FileInputFormat.addInputPaths(job, "hdfs://RS5-112:9000/cs/path1,hdfs://RS5-112:9000/cs/path2");
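When the list of input directories is only known at run time, the comma-separated argument for addInputPaths can be assembled with String.join. The following is a minimal plain-Java sketch; the class InputPathsSketch and the helper joinPaths are hypothetical names, not part of Hadoop, and the two paths are simply the ones used above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Hadoop: builds the comma-separated
// string that FileInputFormat.addInputPaths(job, String) expects.
class InputPathsSketch {

    static String joinPaths(List<String> paths) {
        // No spaces around the commas, matching the form used above.
        return String.join(",", paths);
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList(
                "hdfs://RS5-112:9000/cs/path1",
                "hdfs://RS5-112:9000/cs/path2");
        // The result would be passed as the second argument of addInputPaths.
        System.out.println(joinPaths(paths));
        // prints hdfs://RS5-112:9000/cs/path1,hdfs://RS5-112:9000/cs/path2
    }
}
```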

2. Multiple input types

MultipleInputs can load input files from different paths, and each path can be given its own InputFormat and its own Mapper

MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class);

MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);
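To make the effect of per-path mappers concrete, here is a plain-Java model of the idea. It is not Hadoop code and does not reproduce how the framework schedules work; it only shows that each source is parsed by its own function while all emitted pairs are grouped by key, so one reducer sees values from both sources. The sample records and the ';' delimiter of the second source are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java model of the idea, NOT Hadoop code: each input source has its
// own parser ("mapper"), and all emitted pairs are grouped by key afterwards.
class MultipleInputsModel {

    // Parses the lines of one source into {key, value} pairs and groups them by key.
    static void mapSource(List<String> lines,
                          Function<String, String[]> mapper,
                          Map<String, List<String>> groups) {
        for (String line : lines) {
            String[] kv = mapper.apply(line);
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
    }

    static Map<String, List<String>> run() {
        // Hypothetical sample records; the real file contents are not given.
        List<String> source1 = Arrays.asList("a|1", "b|2"); // '|'-delimited
        List<String> source3 = Arrays.asList("a;3", "c;4"); // ';'-delimited (assumed)

        Map<String, List<String>> groups = new TreeMap<>();
        mapSource(source1, line -> line.split("\\|"), groups);
        mapSource(source3, line -> line.split(";"), groups);
        return groups;
    }

    public static void main(String[] args) {
        // Key "a" collects values from both sources, as a shared reducer would see them.
        System.out.println(run()); // prints {a=[1, 3], b=[2], c=[4]}
    }
}
```

In a real job the order of values within one key is not guaranteed, so a reducer should not rely on which source's value arrives first.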

Example:

package example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Multi-type file input
 * @author lijl
 */

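public class MultiTypeFileInputMR {

    // Mapper for path1: each line is split on '|' into key and value.
    public static class MultiTypeFileInput1Mapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split("\\|");
            context.write(new Text(str[0]), new Text(str[1]));
        }
    }

    // Mapper for path3. The body is truncated in the source; it is assumed
    // here to mirror the path1 mapper with a different delimiter.
    public static class MultiTypeFileInput3Mapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // NOTE: the delimiter is empty in the source (probably lost);
            // replace "" with the actual separator used by the path3 files.
            String[] str = value.toString().split("");
            context.write(new Text(str[0]), new Text(str[1]));
        }
    }

    // Reducer: writes every (key, value) pair through unchanged.
    public static class MultiTypeFileInputReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        // Separate key and value with a comma in the text output.
        conf.set("mapred.textoutputformat.separator", ",");
        Job job = Job.getInstance(conf, "MultiPathFileInput");
        job.setJarByClass(MultiTypeFileInputMR.class);
        // Bind each input path to its own mapper.
        MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"), TextInputFormat.class, MultiTypeFileInput1Mapper.class);
        MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"), TextInputFormat.class, MultiTypeFileInput3Mapper.class);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://RS5-112:9000/cs/path6"));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setReducerClass(MultiTypeFileInputReducer.class);
        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}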
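The mapper for path1 splits on "\\|" rather than "|" because String.split takes a regular expression, and an unescaped | is the alternation operator. A small plain-Java sketch of the difference follows; the class name and the sample record "k1|v1" are made up for illustration.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

class SplitDelimiterSketch {

    // Splits on a literal '|' by escaping it for the regex engine.
    static String[] splitEscaped(String line) {
        return line.split("\\|");
    }

    // Equivalent: let Pattern.quote do the escaping for any delimiter.
    static String[] splitQuoted(String line, String delimiter) {
        return line.split(Pattern.quote(delimiter));
    }

    public static void main(String[] args) {
        String line = "k1|v1"; // hypothetical record
        System.out.println(Arrays.toString(splitEscaped(line)));     // prints [k1, v1]
        System.out.println(Arrays.toString(splitQuoted(line, "|"))); // prints [k1, v1]
        // An unescaped "|" matches the empty string, so the line is cut
        // between characters instead of at the delimiter.
        System.out.println(line.split("|").length);
    }
}
```

Pattern.quote is the safer choice when the delimiter comes from configuration, since it works for any character without manual escaping.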
