天天看點

mapreduce(四):單表關聯查詢

本文主要通過輸入檔案中的child字段和parent字段進行單表關聯查詢,推導出哪些使用者具有child與grandparent關系。

1、 輸入資料分析

輸入檔案資料示例:

child parent
Tom Jack 
Jack Alice
Jack Jesse
           

第1清單示child,第2清單示parent,我們需要根據child和parent的關系得出child和grantparent的關系。比如說Tom的parent是Jack,Jack的parent是Alice和Jesse,由此我們可以得出Tom的grantparent是{Alice,Jesse}。

2、 Map過程

首先使用預設的TextInputFormat類對輸入檔案進行處理,得到文本中每行的偏移量及其内容。Map過程首先将輸入分割成child和parent,然後正序輸出一次作為右表,反序輸出一次作為左表,需要注意的是在輸出的value中必須加上左右表差別标志,其中左表辨別符為1,右表辨別符為2,如圖所示。

mapreduce(四):單表關聯查詢

3、 Reduce過程

Reduce過程首先對輸入< key,values >即<”Lucy”,[“1 Tom”,”2 Mary”,”2 Ben”]>的values值進行周遊擷取到單元資訊(例如”1 Tom”),然後将單元資訊中的使用者ID(例如Tom)按照左表、右表辨別符分别存入到grandChild集合和grandParent集合,最後對grandChild集合和grandParent集合進行笛卡爾積運算得到child與grandParent的關系,并進行輸出,如圖所示。

mapreduce(四):單表關聯查詢
<pre name="code" class="java">package com.mr5;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;

import com.mr1.EJob;
import com.mr3.CombinationKey;
import com.mr3.DefinedComparator;
import com.mr3.DefinedGroupSort;
import com.mr3.DefinedPartition;
import com.mr3.SecondSortMR;
import com.mr3.SecondSortMR.SortMapper;
import com.mr3.SecondSortMR.SortReducer;


/*
 * 輸入檔案資料示例:
child parent
Tom Jack 
Jack Alice
Jack Jesse
 * 
 * 輸出結果:
 * Tom	   Alice
 * Tom	   Jesse
 * 
 * 
 * */


public class SingletonTableJoin {
	
	public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
		Text child=new Text();
		Text parent=new Text();
		String line="";
		@Override
		protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
			if (value!=null) {
				line=value.toString();
				String[] values=line.split(" ");
				if (values.length>=2) {
					child.set(values[0]);
					parent.set("parent "+values[1]);
					context.write(child, parent);
					parent.set(values[1]);
					child.set("child "+values[0]);
					context.write(parent, child);
				}
			}
		}	
	}
	
	public static class JoinReducer extends Reducer<Text,Text,Text,Text>{
		
		String value="";
		HashSet<String> childs=new HashSet<>();
		HashSet<String> parents=new HashSet<>();
		Text child=new Text();
		Text parent=new Text();
		@Override
		protected void reduce(Text key, Iterable<Text> values,Context context)throws IOException, InterruptedException {
			for (Text text : values) {
				value=text.toString();
				if (value.contains("child")) {
					childs.add(value.split(" ")[1]);
				}
				if (value.contains("parent")) {
					parents.add(value.split(" ")[1]);
				}
			}
			
			if (childs.size()!=0&&parents.size()!=0) {
				for (String str1 : childs) {
					child.set(str1);
					for (String str2 : parents) {
						parent.set("   "+str2);
						context.write(child, parent);
					}
				}
			}
			childs.clear();
			parents.clear();
		}
		
	}
		public static void main(String[] args) throws Exception {
			File jarFile = EJob.createTempJar("bin");
			ClassLoader classLoader = EJob.getClassLoader();
			Thread.currentThread().setContextClassLoader(classLoader);
			
			Configuration conf = new Configuration(true);
			conf.set("fs.default.name", "hdfs://192.168.56.111:9000");
			conf.set("hadoop.job.user", "root");
			conf.set("mapreduce.framework.name", "yarn");
			conf.set("mapreduce.jobtracker.address", "192.168.56.111:9001");
			conf.set("yarn.resourcemanager.hostname", "192.168.56.111");
			conf.set("yarn.resourcemanager.admin.address", "192.168.56.111:8033");
			conf.set("yarn.resourcemanager.address", "192.168.56.111:8032");
			conf.set("yarn.resourcemanager.resource-tracker.address","192.168.56.111:8031");
			conf.set("yarn.resourcemanager.scheduler.address","192.168.56.111:8030");
			conf.setBoolean("mapreduce.app-submission.cross-platform", true);
			String[] otherArgs = new String[2];
			otherArgs[0] = "hdfs://192.168.56.111:9000/testdata/SingletonTableJoin.txt";// 計算原檔案目錄,需提前在裡面存入檔案
			String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
			otherArgs[1] = "hdfs://192.168.56.111:9000/output/output" + time;// 計算後的計算結果存儲目錄,每次程式執行的結果目錄不能相同,是以添加時間标簽
			
			Job job = new Job(conf, "SingletonTableJoin job");
			job.setJarByClass(SingletonTableJoin.class);
			((JobConf) job.getConfiguration()).setJar(jarFile.toString());// 環境變量調用,添加此句則可在eclipse中直接送出mapreduce任務,如果将該java檔案打成jar包,需要将該句注釋掉,否則在執行時反而找不到環境變量
			System.out.println("Job start!");
			FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
			FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
			
			job.setMapperClass(JoinMapper.class);
			job.setReducerClass(JoinReducer.class);
			
			 //設定reduce的輸出key和value類型 
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			
			
			if (job.waitForCompletion(true)) {
				System.out.println("ok!");
			} else {
				System.out.println("error!");
				System.exit(0);
			}
		}
		
	}
           

繼續閱讀