天天看點

mapreduce操作單表關聯資料

mapreduuce操作單表關聯資料,資料結構如下

                                                             child                                    parent

                                                             Tom                                     Lucy

                                                             Tom                                     Jack

                                                             Jone                                    Lucy

                                                             Jone                                   Jack

                                                             Lucy                                      Mary

                                                             Lucy                                       Ben

                                                             Jack                                      Alice

                                                             Jack                                    Jesse

                                                             Terry                                    Alice

                                                             Terry                                     Jesse

                                                             Philip                                    Terry

                                                             Philip                                   Alma

                                                             Mark                                     Terry

                                                             Mark                                       Alma

要求:

          要求從 給出的資料中 尋找所 關心的資料,它是對 原始資料所包含資訊的 挖掘。下面進入這個執行個體。

          執行個體中給出 child-parent(孩子——父母)表,要求輸出 grandchild-grandparent(孫子——爺奶)表

圖解:

mapreduce操作單表關聯資料

設計思路:

          分析這個執行個體,顯然需要進行單表連接配接,連接配接的是 左表的 parent 列和 右表的 child 列,且 左表和 右表是

         同一個表。連接配接結果中 除去連接配接的兩列就是所需要的結果 “grandchild--grandparent”表。MapReduce 解

         決這個執行個體, 首先應該考慮如何實作 表的 自連接配接; 其次就是 連接配接列的 設定;最後是 結果的 整理。考慮

         到 MapReduce 的 shuffle 過程會将相同的 key 會連接配接在一起,是以可以将 map 結果的 key 設定成 待連接配接

        的 列,然後列中相同的值就自然會連接配接在一起了。再與最開始的分析聯系起來:要連接配接的是左表的 parent

        列和右表的 child 列,且左表和右表是同一個表,是以在 map階段将 讀入資料 分割成 child 和 parent 之後,

        會将 parent 設定成 key,child 設定成 value進行輸出,并作為 左表;再将對 同一對 child 和 parent 中的 child

       設定成 key,parent 設定成value 進行輸出,作為 右表。為了 區分輸出中的 左右表,需要在輸出的 value 中再

       再加上左右表左右表的 資訊,比如在 value 的 String 最開始處加上符 字元 1 表示 左表,加上符 字元 2 表示 右

      表。這樣在 map 的結果中就形成了左表和右表,然後在 shuffle 過程中完成連接配接。reduce 接收到連接配接的結果,

      其中每個 key 的 value-list 就包含了“grandchild--grandparent”關系。取出每個key 的 value-list 進行解析,将 左表

      中的 child 放入一個 數組, 右表中的 parent 放入一個 數組,然後對 兩個數組求笛卡爾積就是最後的結果了。

代碼:

package com.hebut.mr;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class STjoin {

	public static class STjoinMap extends Mapper<Object, Text, Text, Text> {

		@Override
		protected void map(Object key, Text value,
				Mapper<Object, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {

			String childname = new String();
			String parentname = new String();
			String relationtype = new String();

			//  輸入的一行預處理文本
			StringTokenizer itr = new StringTokenizer(value.toString());
			String[] values = new String[2];
			int i = 0;
			while (itr.hasMoreTokens()) {
				values[i] = itr.nextToken();
				i++;
			}

			if (values[0].compareTo("child") != 0) {
				childname = values[0];
				parentname = values[1];
				//  輸出左表
				relationtype = "1";
				context.write(new Text(values[1]), new Text(relationtype + "+"
						+ childname + "+" + parentname));
				//  輸出右表
				relationtype = "2";
				context.write(new Text(values[0]), new Text(relationtype + "+"
						+ childname + "+" + parentname));
			}

		}
	}

	public static int time = 0;

	public static class STjoinReduce extends Reducer<Text, Text, Text, Text> {

		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			//  輸出表頭
			if(0==time){
				context.write(new Text("grandchild"), new Text("grandparent"));
				time++;
				
			}
			int grandchildnum = 0;
			String[] grandchild = new String[10];
			int grandparentnum = 0;
			String[] grandparent = new String[10];
			Iterator<Text> ite = values.iterator();
			while (ite.hasNext()) {
				String record = ite.next().toString();
				int len = record.length();
				int i = 2;
				if (0 == len) {
					continue;
				}
				//  取得左右表辨別
				char relationtype = record.charAt(0);
				//  定義孩子和父母變量
				String childname = new String();
				String parentname = new String();
				//  擷取 value‐list 中 value 的 child
				while (record.charAt(i) != '+') {
					childname += record.charAt(i);
					i++;
				}
				i = i + 1;
				//  擷取 value‐list 中 value 的 parent
				while (i < len) {
					parentname += record.charAt(i);
					i++;
				}
				//  左表,取出child放入grandchildren
				if ('1' == relationtype) {
					grandchild[grandchildnum] = childname;
					grandchildnum++;
				}
				//  右表,取出 parent 放入 grandparent
				if ('2' == relationtype) {
					grandparent[grandparentnum] = parentname;
					grandparentnum++;
				}
			
			}
			 if (0 != grandchildnum && 0 != grandparentnum) {
				 for (int m = 0; m < grandchildnum;m++) {
					 for(int n=0;n<grandparentnum;n++){

						 context.write(new Text(grandchild[m]), new Text(grandparent[n]));
						 
					 }
				 }
			 }		 
					 
			 } 
				 
			 }
		        
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {

		Job job = new Job();
		job.setJarByClass(STjoin.class);
		job.setJobName("STjoin");
		FileInputFormat.addInputPath(job, new Path(
				"hdfs://hadoop:9000/user/hadoop/STjoin_in/"));
		FileOutputFormat.setOutputPath(job, new Path(
				"hdfs://hadoop:9000/user/hadoop/STjoin_out/"));
		//  設定 Map 和 Reduce 處理類
		job.setMapperClass(STjoin.STjoinMap.class);
		job.setReducerClass(STjoin.STjoinReduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}
}
           

繼續閱讀