本文主要通過輸入檔案中的child字段和parent字段進行單表關聯查詢,推導出哪些使用者具有child與grandparent關系。
1、 輸入資料分析
輸入檔案資料示例:
child parent
Tom Jack
Jack Alice
Jack Jesse
第1清單示child,第2清單示parent,我們需要根據child和parent的關系得出child和grantparent的關系。比如說Tom的parent是Jack,Jack的parent是Alice和Jesse,由此我們可以得出Tom的grantparent是{Alice,Jesse}。
2、 Map過程
首先使用預設的TextInputFormat類對輸入檔案進行處理,得到文本中每行的偏移量及其内容。Map過程首先将輸入分割成child和parent,然後正序輸出一次作為右表,反序輸出一次作為左表,需要注意的是在輸出的value中必須加上左右表差別标志,其中左表辨別符為1,右表辨別符為2,如圖所示。
3、 Reduce過程
Reduce過程首先對輸入< key,values >即<”Lucy”,[“1 Tom”,”2 Mary”,”2 Ben”]>的values值進行周遊擷取到單元資訊(例如”1 Tom”),然後将單元資訊中的使用者ID(例如Tom)按照左表、右表辨別符分别存入到grandChild集合和grandParent集合,最後對grandChild集合和grandParent集合進行笛卡爾積運算得到child與grandParent的關系,并進行輸出,如圖所示。
<pre name="code" class="java">package com.mr5;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import com.mr1.EJob;
import com.mr3.CombinationKey;
import com.mr3.DefinedComparator;
import com.mr3.DefinedGroupSort;
import com.mr3.DefinedPartition;
import com.mr3.SecondSortMR;
import com.mr3.SecondSortMR.SortMapper;
import com.mr3.SecondSortMR.SortReducer;
/*
* 輸入檔案資料示例:
child parent
Tom Jack
Jack Alice
Jack Jesse
*
* 輸出結果:
* Tom Alice
* Tom Jesse
*
*
* */
public class SingletonTableJoin {
public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
Text child=new Text();
Text parent=new Text();
String line="";
@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
if (value!=null) {
line=value.toString();
String[] values=line.split(" ");
if (values.length>=2) {
child.set(values[0]);
parent.set("parent "+values[1]);
context.write(child, parent);
parent.set(values[1]);
child.set("child "+values[0]);
context.write(parent, child);
}
}
}
}
public static class JoinReducer extends Reducer<Text,Text,Text,Text>{
String value="";
HashSet<String> childs=new HashSet<>();
HashSet<String> parents=new HashSet<>();
Text child=new Text();
Text parent=new Text();
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)throws IOException, InterruptedException {
for (Text text : values) {
value=text.toString();
if (value.contains("child")) {
childs.add(value.split(" ")[1]);
}
if (value.contains("parent")) {
parents.add(value.split(" ")[1]);
}
}
if (childs.size()!=0&&parents.size()!=0) {
for (String str1 : childs) {
child.set(str1);
for (String str2 : parents) {
parent.set(" "+str2);
context.write(child, parent);
}
}
}
childs.clear();
parents.clear();
}
}
public static void main(String[] args) throws Exception {
File jarFile = EJob.createTempJar("bin");
ClassLoader classLoader = EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
Configuration conf = new Configuration(true);
conf.set("fs.default.name", "hdfs://192.168.56.111:9000");
conf.set("hadoop.job.user", "root");
conf.set("mapreduce.framework.name", "yarn");
conf.set("mapreduce.jobtracker.address", "192.168.56.111:9001");
conf.set("yarn.resourcemanager.hostname", "192.168.56.111");
conf.set("yarn.resourcemanager.admin.address", "192.168.56.111:8033");
conf.set("yarn.resourcemanager.address", "192.168.56.111:8032");
conf.set("yarn.resourcemanager.resource-tracker.address","192.168.56.111:8031");
conf.set("yarn.resourcemanager.scheduler.address","192.168.56.111:8030");
conf.setBoolean("mapreduce.app-submission.cross-platform", true);
String[] otherArgs = new String[2];
otherArgs[0] = "hdfs://192.168.56.111:9000/testdata/SingletonTableJoin.txt";// 計算原檔案目錄,需提前在裡面存入檔案
String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
otherArgs[1] = "hdfs://192.168.56.111:9000/output/output" + time;// 計算後的計算結果存儲目錄,每次程式執行的結果目錄不能相同,是以添加時間标簽
Job job = new Job(conf, "SingletonTableJoin job");
job.setJarByClass(SingletonTableJoin.class);
((JobConf) job.getConfiguration()).setJar(jarFile.toString());// 環境變量調用,添加此句則可在eclipse中直接送出mapreduce任務,如果将該java檔案打成jar包,需要将該句注釋掉,否則在執行時反而找不到環境變量
System.out.println("Job start!");
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
//設定reduce的輸出key和value類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
if (job.waitForCompletion(true)) {
System.out.println("ok!");
} else {
System.out.println("error!");
System.exit(0);
}
}
}