天天看點

HADOOP小練習——多表關聯

輸入檔案

address.txt 與 factory.txt（前四行為 address.txt，其餘為 factory.txt）:
1 Beijing Beijing Red Star 1
2 Guangzhou Shenzhen Thunder 3
3 Shenzhen Guangzhou Honda 2
4 Xian Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Back of Beijing 1

輸出檔案

factory city
Beijing Red Star Beijing
Shenzhen Thunder Shenzhen
Guangzhou Honda Guangzhou
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Tencent Shenzhen
Back of Beijing Beijing

設計思路

1:建立兩個list,其中一個list存放factory,另一個存放city。

2:利用雙重for循環,以編号相等的條件下,逐一篩選。

3:打出表頭,輸出。

代碼

mapper

package FindConnection;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Find2mapper extends Mapper<LongWritable, Text, Text, Text> {

	/**
	 * Tags each input line by the table it came from so the reducer can
	 * separate them: address lines ("&lt;id&gt; &lt;city&gt;") start with a digit and
	 * are tagged "1"; factory lines ("&lt;factory name&gt; &lt;id&gt;") start with a
	 * letter and are tagged "2". The whole line is emitted as the key.
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// TextInputFormat delivers exactly one line per call, so the original
		// split("\n") was a no-op and has been removed.
		String line = value.toString();
		if (line.isEmpty()) {
			// Guard against blank lines: charAt(0) would otherwise throw
			// StringIndexOutOfBoundsException.
			return;
		}
		// Character.isDigit includes '0' and '9'; the original exclusive
		// comparison (at > '0' && at < '9') wrongly rejected both.
		if (Character.isDigit(line.charAt(0))) {
			context.write(new Text(line), new Text("1")); // address record
		} else {
			context.write(new Text(line), new Text("2")); // factory record
		}
	}

}


           

reducer

package FindConnection;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Find2Reducer extends Reducer<Text, Text, Text, Text> {

	// Address lines ("<id> <city>"), tagged "1" by the mapper.
	private final List<String> addresses = new ArrayList<String>();
	// Factory lines ("<factory name> <id>"), tagged "2" by the mapper.
	private final List<String> factories = new ArrayList<String>();

	/** Emits the header row once, before any data is reduced. */
	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		context.write(new Text("city---"), new Text("---factory"));
	}

	/**
	 * Buffers each record into the address or factory list according to its
	 * tag. The actual join happens in {@link #cleanup}, which removes the
	 * original dependency on a hard-coded record count (x == 11) that only
	 * worked for exactly 11 input lines.
	 */
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// Each distinct line carries a single tag; the loop keeps the last
		// value seen, matching the original behavior.
		int flag = 0;
		for (Text value : values) {
			flag = Integer.parseInt(value.toString());
		}
		if (flag == 1) {
			addresses.add(key.toString());
		} else {
			factories.add(key.toString());
		}
	}

	/**
	 * Joins the two buffered tables on the shared id: the first token of an
	 * address line must equal the last token of a factory line.
	 */
	@Override
	protected void cleanup(Context context) throws IOException, InterruptedException {
		for (String address : addresses) {
			String[] addressParts = address.split(" ");
			// City name = everything after the leading id.
			String city = joinRange(addressParts, 1, addressParts.length);
			for (String factory : factories) {
				String[] factoryParts = factory.split(" ");
				if (addressParts[0].equals(factoryParts[factoryParts.length - 1])) {
					// Factory name = everything before the trailing id.
					String name = joinRange(factoryParts, 0, factoryParts.length - 1);
					context.write(new Text(city + "---"), new Text("---" + name));
				}
			}
		}
	}

	/**
	 * Joins parts[from..to) with single spaces. Unlike the original
	 * concatenation, this does not prepend a stray leading space.
	 */
	private static String joinRange(String[] parts, int from, int to) {
		StringBuilder sb = new StringBuilder();
		for (int i = from; i < to; i++) {
			if (sb.length() > 0) {
				sb.append(' ');
			}
			sb.append(parts[i]);
		}
		return sb.toString();
	}

}

           

main

package FindConnection;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class Find2Main {

	/**
	 * Configures and submits the multi-table join job.
	 * args[0] = input directory, args[1] = output directory.
	 */
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
		if (args.length < 2) {
			System.err.println("Usage: Find2Main <input path> <output path>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(Find2Main.class);

		job.setMapperClass(Find2mapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(Find2Reducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Propagate the job result to the shell; the original discarded it,
		// so failed jobs still exited with status 0.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}


           

繼續閱讀