天天看點

尋找共同好友(hadoop解決方案)

測試資料

100,200 300 400 500 600 700 800
200,100 300 400 500 700
300,100 200 400 500 600 700
400,100 200 300 700 800
500,100 200 300 700 800
600,100 300
700,100 200 300 400 500 800
800,100 400 500 700
           

注意:測試資料中,逗号“,”前為一個使用者的辨別,逗号“,”後為該使用者的好友。而且,使用者與好友之間是互為好友的,例如:100的使用者中有好友200,則200的使用者中一定有好友100

分析

要找到共同好友,需要把每兩個使用者的好友都進行比較。map階段就是把每兩個使用者的好友都找出來;reducer階段就是找出每兩個使用者的共同好友。

map階段:把每一行的資料中,使用者和他的一個好友作為key,以他的所有好友作為value,例如使用者100,他的map輸出為:

100,200 200 300 400 500 600 700 800
100,300 200 300 400 500 600 700 800
100,400 200 300 400 500 600 700 800
100,500 200 300 400 500 600 700 800
100,600 200 300 400 500 600 700 800
100,700 200 300 400 500 600 700 800
100,800 200 300 400 500 600 700 800
           

最終,map的輸出為:(key值的兩個使用者辨別按從小到大的順序排列)

,       
,       
,       
,       
,       
,       
,       

,     
,     
,     
,     
,     

,      
,      
,      
,      
,      
,      
.
..
...
           

此時可以看出,存在相同的key值,而對應的value分别是key值中兩個使用者的好友,這樣就建構起兩個使用者的好友在一條資料裡,此時,進行shuffle過程中的分組,合并,不需要我們來寫代碼完成,預設的方式即可。這樣,reduce的最終輸入為:

, (      ),(    )
, (      ),(     )
.
..
...
           

reduce階段:比較每一行資料中的兩組值,找出相同辨別的使用者,即為key值中兩個使用者的共同好友

解決方案

map階段

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SameFrMapper extends Mapper<LongWritable, Text, Text, Text>{

    @Override
    protected void map(LongWritable key, Text value,Context context)
        throws IOException, InterruptedException {
        String line = value.toString();
        String[] st = line.split(",");
        String[] friends = st[].split("\t");
        String friendsline = st[].toString();
        for (int i = ; i < friends.length; i++) {
            context.write(bulidSortedKey(st[],friends[i]), new Text(friendsline));
        }
    }
    //将比較的兩個使用者由小到大排序
    protected Text bulidSortedKey(String user1,String user2){
        int u1 = Integer.parseInt(user1);
        int u2 = Integer.parseInt(user2);
        if(u1 < u2){
            return new Text(user1 + "," + user2);
        }else{
            return new Text(user2 + "," + user1);
        }
    }
}
           

reduce階段

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SameFrReducer extends Reducer<Text, Text, Text, Text>{

    @Override
    protected void reduce(Text key, Iterable<Text> value,Context context) throws IOException,
        InterruptedException {
         List<String> list = new ArrayList<String>();
         Set<String> user1set = new TreeSet<String>();
         Set<String> user2set = new TreeSet<String>();
         for (Text t : value) {
             list.add(t.toString());
        }
        String[] user1list = list.get().split("\t");
        String[] user2list = list.get().split("\t");
        for (int i = ; i < user1list.length; i++) {
            user1set.add(user1list[i]);
        }
        for (int i = ; i < user2list.length; i++) {
            user2set.add(user2list[i]);
        }
        Set<String> result = intersect(user1set, user2set);
        Iterator<String> it = result.iterator();
        StringBuffer sb = new StringBuffer();
        while (it.hasNext()) {
            sb.append(it.next().toString()+"\t");           
        }
        context.write(key,new Text(sb.toString()));
    }

    //疊代比較尋找共同好友
    protected Set<String> intersect(Set<String> smallset,Set<String> largeset){
            Set<String> result = new TreeSet<String>();
            //疊代處理小集合來提高性能
            for (String x : smallset) {
                if(largeset.contains(x)){
                    result.add(x);
                }
            }
            return result;
        }
}
           

eclipse內建hadoop插件測試

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.bj.samefriends.SameFrMapper;
import com.bj.samefriends.SameFrReducer;

public class JobsRun {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://node1:9000");
        conf.set("mapred.job.tracker", "node1:9001");
        conf.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\sameFriends.jar");
        try {
            Job job = new Job(conf);
            job.setJarByClass(JobsRun.class);
            job.setMapperClass(SameFrMapper.class);
            job.setReducerClass(SameFrReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks();

            FileInputFormat.addInputPath(job, new Path("/user/root/input/"));
            FileOutputFormat.setOutputPath(job, new Path("/user/root/output/"));

            System.exit(job.waitForCompletion(true) ?  : );
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
           

輸出結果為:

100,200 300 400 500 700 
100,400 200 300 700 800 
100,600 300 
100,800 400 500 700 
200,300 100 400 500 700 
200,500 100 300 700 
200,700 100 300 400 500 
300,400 100 200 700 
300,600 100 
400,700 100 200 300 800 
500,800 100 700 
700,800 100 400 500 
100,300 200 400 500 600 700 
100,500 200 300 700 800 
100,700 200 300 400 500 800 
200,400 100 300 700 
300,500 100 200 700 
300,700 100 200 400 500 
400,800 100 700 
500,700 100 200 300 800 
           

繼續閱讀