
MapReduce (4): Single-Table Join Query

This article performs a single-table join on the child and parent fields of the input file to derive which users are in a child-grandparent relationship.

1. Input Data Analysis

Sample input file data:

child parent
Tom Jack 
Jack Alice
Jack Jesse

The first column is the child and the second column is the parent. From the child-parent relationships we need to derive the child-grandparent relationships. For example, Tom's parent is Jack, and Jack's parents are Alice and Jesse, so Tom's grandparents are {Alice, Jesse}.
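Before turning to MapReduce, the join itself can be stated compactly: build a child-to-parents index and, for every (child, parent) record, look up that parent's own parents. The following minimal in-memory Java sketch (the class name GrandparentDemo is only for illustration) applies this logic to the sample data to make the target output concrete.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GrandparentDemo {
	public static void main(String[] args) {
		// the three sample (child, parent) records from the input file
		String[][] records = { {"Tom", "Jack"}, {"Jack", "Alice"}, {"Jack", "Jesse"} };

		// index every child to its list of parents
		Map<String, List<String>> parentsOf = new HashMap<>();
		for (String[] r : records) {
			parentsOf.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
		}

		// a child's grandparents are the parents of its parent
		for (String[] r : records) {
			for (String grandparent : parentsOf.getOrDefault(r[1], new ArrayList<>())) {
				System.out.println(r[0] + "\t" + grandparent);  // prints Tom Alice and Tom Jesse
			}
		}
	}
}

The MapReduce job below expresses the same lookup as a reduce-side self-join, because on a distributed dataset there is no single in-memory index to consult.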

2. Map Phase

First, the default TextInputFormat class processes the input file, giving the map function the byte offset and content of each line. The map function splits each line into child and parent and then outputs the record twice: once in its original order as the right table (key = child, value = parent) and once reversed as the left table (key = parent, value = child). Note that each output value must carry a marker distinguishing the left table from the right table; in the implementation below, the left-table marker is the string "child" and the right-table marker is the string "parent", as shown in the intermediate key/value pairs below.

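For the three sample records, the map function emits the following intermediate key/value pairs (derived directly from the map code below):

key	value
Tom	parent Jack	(right table: Tom's parent is Jack)
Jack	child Tom	(left table: Jack's child is Tom)
Jack	parent Alice	(right table)
Alice	child Jack	(left table)
Jack	parent Jesse	(right table)
Jesse	child Jack	(left table)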

3. Reduce Phase

The reduce function iterates over the values of each input <key, values> pair, for example <"Lucy", ["child Tom", "parent Mary", "parent Ben"]>, and extracts from each value the marker and the user ID (e.g. "child Tom" yields Tom). IDs carrying the left-table marker ("child") are added to the grandChild set and IDs carrying the right-table marker ("parent") are added to the grandParent set. Finally, the Cartesian product of the grandChild and grandParent sets yields the child-grandparent relations, which are written to the output, as traced below.

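Concretely, after the shuffle the key Jack receives the value list ["child Tom", "parent Alice", "parent Jesse"] (the order within the list is not guaranteed). The reducer therefore collects grandChild = {Tom} and grandParent = {Alice, Jesse}, and their Cartesian product gives the final output:

Tom	   Alice
Tom	   Jesse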
package com.mr5;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.mr1.EJob;


/*
 * Sample input file data:
child parent
Tom Jack 
Jack Alice
Jack Jesse
 * 
 * Output:
 * Tom	   Alice
 * Tom	   Jesse
 * 
 * 
 * */


public class SingletonTableJoin {
	
	public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
		Text child=new Text();
		Text parent=new Text();
		String line="";
		@Override
		protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
			if (value!=null) {
				line=value.toString();
				String[] values=line.split(" ");// columns are separated by a single space
				if (values.length>=2) {
					// forward output (right table): key = child, value = "parent <parent>"
					child.set(values[0]);
					parent.set("parent "+values[1]);
					context.write(child, parent);
					// reverse output (left table): key = parent, value = "child <child>"
					parent.set(values[1]);
					child.set("child "+values[0]);
					context.write(parent, child);
				}
			}
		}	
	}
	
	public static class JoinReducer extends Reducer<Text,Text,Text,Text>{
		
		String value="";
		HashSet<String> childs=new HashSet<>();   // grandChild set: users the key is a parent of
		HashSet<String> parents=new HashSet<>();  // grandParent set: users the key is a child of
		Text child=new Text();
		Text parent=new Text();
		@Override
		protected void reduce(Text key, Iterable<Text> values,Context context)throws IOException, InterruptedException {
			// split every value into marker + user ID and put the ID into the matching set
			for (Text text : values) {
				value=text.toString();
				if (value.contains("child")) {
					childs.add(value.split(" ")[1]);
				}
				if (value.contains("parent")) {
					parents.add(value.split(" ")[1]);
				}
			}
			
			// the Cartesian product of the two sets gives the child-grandparent pairs
			if (childs.size()!=0&&parents.size()!=0) {
				for (String str1 : childs) {
					child.set(str1);
					for (String str2 : parents) {
						parent.set("   "+str2);
						context.write(child, parent);
					}
				}
			}
			// the sets are reused across keys, so clear them before the next key is processed
			childs.clear();
			parents.clear();
		}
		
	}
		public static void main(String[] args) throws Exception {
			File jarFile = EJob.createTempJar("bin");
			ClassLoader classLoader = EJob.getClassLoader();
			Thread.currentThread().setContextClassLoader(classLoader);
			
			Configuration conf = new Configuration(true);
			conf.set("fs.default.name", "hdfs://192.168.56.111:9000");
			conf.set("hadoop.job.user", "root");
			conf.set("mapreduce.framework.name", "yarn");
			conf.set("mapreduce.jobtracker.address", "192.168.56.111:9001");
			conf.set("yarn.resourcemanager.hostname", "192.168.56.111");
			conf.set("yarn.resourcemanager.admin.address", "192.168.56.111:8033");
			conf.set("yarn.resourcemanager.address", "192.168.56.111:8032");
			conf.set("yarn.resourcemanager.resource-tracker.address","192.168.56.111:8031");
			conf.set("yarn.resourcemanager.scheduler.address","192.168.56.111:8030");
			conf.setBoolean("mapreduce.app-submission.cross-platform", true);
			String[] otherArgs = new String[2];
			otherArgs[0] = "hdfs://192.168.56.111:9000/testdata/SingletonTableJoin.txt";// input path: the file must be uploaded to HDFS in advance
			String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
			otherArgs[1] = "hdfs://192.168.56.111:9000/output/output" + time;// output directory: it must be different for every run, so a timestamp is appended
			
			Job job = new Job(conf, "SingletonTableJoin job");
			job.setJarByClass(SingletonTableJoin.class);
			((JobConf) job.getConfiguration()).setJar(jarFile.toString());// lets the MapReduce job be submitted directly from Eclipse; if you package this class into a jar yourself, comment this line out, otherwise the jar will not be found at run time
			System.out.println("Job start!");
			FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
			FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
			
			job.setMapperClass(JoinMapper.class);
			job.setReducerClass(JoinReducer.class);
			
			 //set the reduce output key and value types
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			
			
			if (job.waitForCompletion(true)) {
				System.out.println("ok!");
			} else {
				System.out.println("error!");
				System.exit(1);// non-zero exit code on failure
			}
		}
		
	}
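If the class is packaged into a jar instead of being submitted from Eclipse (with the setJar(...) line commented out, as noted in the code), the job can be launched with the standard hadoop jar command; the jar name here is only an example:

hadoop jar SingletonTableJoin.jar com.mr5.SingletonTableJoin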
