天天看點

hadoop實戰多表關聯

多表關聯處理擷取結果,大緻意思把資料切割成左右表

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class STjoin {
	public static int time = 0;

	public static class Map extends Mapper<Object, Text, Text, Text> {

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			String childname = new String();
			String parentname = new String();
			String type = new String();
			String line = value.toString();
			int i = 0;
			while (line.charAt(i) != ' ') {
				i++;
			}

			String[] values = { line.substring(0, i), line.substring(i + 1) };
			if (values[0].compareTo("child") != 0) {
				childname = values[0];
				parentname = values[1];
				type = "1";
				context.write(new Text(values[1]), new Text(type + "+"
						+ childname + "+" + parentname));

				type = "2";
				context.write(new Text(values[0]), new Text(type + "+"
						+ childname + "+" + parentname));

			}
		}
	}

	public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			if (time == 0) {
				context.write(new Text("grandchild"), new Text("grandparent"));
				time++;
			}
			
			int grandchildnum = 0;
			String grandchild[] = new String[10];
			int grandparentnum = 0;
			String grandparent[] = new String[10];
			Iterator ite = values.iterator();
			while (ite.hasNext()) {
				String record = ite.next().toString();
				int len = record.length();
				int i = 2;
				if (len == 0)
					continue;
				char type = record.charAt(0);
				String childname = new String();
				String parentname = new String();
				System.out.println("------------------" + record);
				while (record.charAt(i) != '+') {
					childname = childname + record.charAt(i);
					i++;
					// System.out.println("childname" + childname);
				}
				i = i + 1;

				while (i < len) {
					parentname = parentname + record.charAt(i);
					i++;
					// System.out.println("parentname" + parentname);
				}
				if (type == '1') {
					grandchild[grandchildnum] = childname;
					grandchildnum++;
				} else {
					grandparent[grandparentnum] = parentname;
					grandparentnum++;
				}
			}
			if (grandparentnum != 0 && grandchildnum != 0) {
				for (int i = 0; i < grandchildnum; i++) {
					for (int j = 0; j < grandparentnum; j++) {
						context.write(new Text(grandchild[i]), new Text(
								grandparent[j]));
					}
				}
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "stsort");
		job.setJarByClass(STjoin.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}