Problem: below is a list of users' friend relationships; each line is one user followed by that user's friend list. Find the common friends between every pair of users (the friendship is one-way, i.e. user 1's friend list may contain 2 while 2's friend list does not necessarily contain 1).
1 2,3,4,5,6
3 1,5,6,7,9
2 3,5,7,9,11,12
Approach:
1. First take each user as the value and each friend from the split friend list as the key, similar to building an inverted index, which gives the following (a minimal Spark sketch follows this list):
2 1
3 1
4 1
5 1
6 1
1 3
5 3
6 3
7 3
9 3
3 2
5 2
7 2
9 2
11 2
12 2
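A minimal Spark RDD sketch of this inversion step (the `sc` handle, the variable names, and the file path here are placeholders for illustration, not taken from the code further down):

val lines = sc.textFile("test_friend.txt")
val inverted = lines.flatMap { line =>
  val parts = line.split("\t")
  // one (friend, user) pair per friend, e.g. ("2", "1"), ("3", "1"), ...
  parts(1).split(",").map(friend => (friend, parts(0)))
}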
2. Do a reduceByKey, sorting the value list, so that any two users inside a value have the key user as a common friend (sketch after the list):
11 2
1 3
7 2,3
9 2,3
4 1
6 1,3
3 1,2
5 1,2,3
2 1
12 2
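Continuing the sketch, step 2 groups the inverted pairs and sorts each user list (this builds on the `inverted` RDD from the previous sketch):

val grouped = inverted
  .reduceByKey(_ + "," + _)                      // e.g. "5" -> "1,3,2" (arrival order)
  .mapValues(_.split(",").sorted.mkString(","))  // e.g. "5" -> "1,2,3"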
3. For pairwise common friends, run a double for loop over the value to produce the keys of the next step; common friends of three users work the same way, just with a triple for loop (sketch after the list):
1_2 3
1_2 5
2_3 9
2_3 7
1_3 6
1_3 5
2_3 5
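A sketch of the pair generation; because the user list was sorted in step 2, each pair key comes out in one canonical order such as 1_2 rather than 2_1:

val pairs = grouped.flatMap { case (friend, users) =>
  val u = users.split(",")
  for {
    i <- 0 until u.length - 1
    j <- i + 1 until u.length
  } yield (u(i) + "_" + u(j), friend)            // e.g. ("1_2", "5")
}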
4. Do reduceByKey once more to obtain the common friends (sketch after the list).
1_3 5,6
2_3 7,5,9
1_2 3,5
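And the final aggregation as a sketch, collapsing the pairs emitted in step 3:

val commonFriends = pairs.reduceByKey(_ + "," + _)   // e.g. ("1_3", "5,6")
commonFriends.collect().foreach(println)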
MapReduce implementation code (two chained jobs):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FindFrients1 {
public static class FindFrients1Mapper extends Mapper<LongWritable, Text, Text, Text> {
/**
* map output key: Text - a friend id
* map output value: Text - the user whose friend list contains that friend
*/
Text keyOut = new Text();
Text valueOut = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String uid = line.split("\t")[0];
String[] friends = line.split("\t")[1].split(",");
for (String fre: friends) {
context.write(new Text(fre),new Text(uid));
}
}
}
public static class FindFrients1Reducer extends Reducer<Text, Text, Text, Text> {
/**
* reduce output key: Text - a friend id
* reduce output value: Text - comma-joined list of users who have this friend
*/
Text valueOut = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// e.g. key 5 with values [1,3,2] ---> 5 \t 1,3,2
StringBuilder sb = new StringBuilder();
for (Text fre: values) {
sb.append(fre.toString()).append(",");
}
String re=sb.substring(0,sb.length()-1);
context.write(key,new Text(re));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "FindFrients1");
job.setMapperClass(FindFrients1Mapper.class);
job.setReducerClass(FindFrients1Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
FileInputFormat.addInputPath(job, new Path("E:\\tmp\\badou\\data\\hadoop_test\\findFriend\\test_friend.txt"));
Path outputPath = new Path("E:\\tmp\\badou\\data\\hadoop_test\\findFriend\\output1");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true); // delete recursively
System.out.println("outputpath:【" + outputPath.toString() + "】 delete success!");
}
FileOutputFormat.setOutputPath(job, outputPath);
boolean isSuccess = job.waitForCompletion(true);
int status = isSuccess ? 0 : 1;
System.exit(status);
}
}
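The second job, FindFrients2, reads the output of the first job: its mapper sorts each user list and emits every ordered pair of users with the shared friend as the value, and its reducer joins the shared friends per pair.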
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Arrays;
public class FindFrients2 {
public static class FindFrients2Mapper extends Mapper<LongWritable, Text, Text, Text> {
/**
* map output key: Text - user pair "a_b" (two users who share the same friend, sorted)
* map output value: Text - the shared friend
*/
Text keyOut = new Text();
Text valueOut = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// input line from the first job, e.g.: 5 \t 1,3,2
String line = value.toString();
String[] lines= line.split("\t");
// the shared friend (the first job's key)
String uid = lines[0];
// the users who all have this friend
String[] fri = lines[1].split(",");
// sort the users so each pair key always comes out in the same order
// e.g. [3,1,2] -> [1,2,3], then emit 1_2, 1_3, 2_3 with the shared friend as the value
Arrays.sort(fri);
for (int i = 0; i <fri.length-1 ; i++) {
for (int j = i+1; j <fri.length ; j++) {
context.write(new Text(fri[i]+"_"+fri[j]),new Text(uid));
}
}
}
}
public static class FindFrients2Reducer extends Reducer<Text, Text, Text, Text> {
/**
* reduce output key: Text - user pair "a_b"
* reduce output value: Text - comma-joined list of their common friends
*/
Text valueOut = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// e.g. 1_2 [3,5] ---> 1_2 \t 3,5
StringBuilder sb = new StringBuilder();
for (Text fre:values) {
sb.append(fre+",");
}
String re = sb.substring(0,sb.length()-1);
context.write(new Text(key),new Text(re));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "FindFrients2");
job.setMapperClass(FindFrients2Mapper.class);
job.setReducerClass(FindFrients2Reducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
FileInputFormat.addInputPath(job, new Path("E:\\tmp\\badou\\data\\hadoop_test\\findFriend\\output1\\"));
Path outputPath = new Path("E:\\tmp\\badou\\data\\hadoop_test\\findFriend\\output2");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true); // delete recursively
System.out.println("outputpath:【" + outputPath.toString() + "】 delete success!");
}
FileOutputFormat.setOutputPath(job, outputPath);
boolean isSuccess = job.waitForCompletion(true);
int status = isSuccess ? 0 : 1;
System.exit(status);
}
}
Spark implementation code:

package com.badou.function
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
object Friends {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession
.builder()
.master("local[*]")
.appName("feat_eg")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// .config("hive.metastore.uris",
// "thrift://"+"192.168.10.42"+":9083")
// .config("spark.storage.memoryFraction",0.6)
.enableHiveSupport()
.getOrCreate()
val orcPath = "E:\\tmp\\badou\\data\\hadoop_test\\findFriend\\test_friend.txt"
var userLogDF: DataFrame = sparkSession.read.format("csv").option("sep", "\t").load(orcPath)
userLogDF.createOrReplaceTempView("user_friend")
userLogDF = sparkSession.sql("SELECT explode(split(_c1,',')) AS friend,_c0 AS uid FROM user_friend")
userLogDF.createOrReplaceTempView("user_friends")
userLogDF = sparkSession.sql("SELECT friend,concat_ws(',',collect_list(uid)) AS uid FROM user_friends GROUP BY friend")
// userLogDF.show()
val rdd = userLogDF.rdd
val rdd2 = rdd.map(f => {
val str = f.getString(0)
val uidSort = f.getString(1).split(",").sortWith(_ < _)
var sb1 =new StringBuffer()
for(uid <- uidSort){
sb1.append(uid+",")
}
val uStr = sb1.substring(0,sb1.length()-1)
// println("map:"+str+" uid:"+sb1)
(str,uStr)
})
// rdd2.foreach(f=>println(f._1+" "+f._2))
val value = rdd2.mapPartitions(f => {
val list = f.toList
var sb2 = new ListBuffer[(String,String)]
for (one <- list) {
val value1 = one._1
val list1 = one._2.split(",").toList
if(list1.length>1){
for (i <- 0 to list1.length - 2) {
for (j <- i + 1 to list1.length - 1) {
// j starts at i + 1, so each pair is emitted exactly once and always in sorted order
sb2.append((list1(i) + "_" + list1(j), value1))
}
}
}
}
sb2.toIterator
})
value.foreach(f=>println(f._1+" "+f._2))
import sparkSession.implicits._
var friendDF = value.toDF("friends", "same_friend")
friendDF.createOrReplaceTempView("user_friends")
friendDF = sparkSession.sql("SELECT friends,concat_ws(',',collect_list(same_friend)) AS same_friend FROM user_friends GROUP BY friends")
friendDF.show(false)
}
}
5. Second-degree friends: when two people share one or more common friends, the friend network can count those common friends and show the number (a friend of a friend, but not a direct friend). The relationship chain is: you -> friend -> stranger.
Taking the current user's friend list and excluding the pairwise common friends gives the second-degree friends.
The raw friend lists are first turned into pair-keyed records (for each key A_B, the value is A's full friend list):
1_3 2,3,4,5,6
1_2 2,3,4,5,6
3_1 1,5,6,7,9
3_2 1,5,6,7,9
2_1 3,5,7,9,11,12
2_3 3,5,7,9,11,12
The step-4 common-friend result is expanded so each pair appears under both key orders:
1_3 5,6
2_3 7,5,9
1_2 3,5
2_1 3,5
3_2 7,5,9
3_1 5,6
Then, keyed identically, removing the common friends from the friend list leaves the second-degree friends (see the sketch after this list):
1_3 2,3,4
1_2 2,4,6
3_1 1,7,9
3_2 1,6
2_1 7,9,11,12
2_3 3,11,12
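A minimal sketch of this exclusion step, assuming two pair-keyed RDDs shaped like the lists above; `pairAllFriends` and `pairCommon` are hypothetical names, and the full program below builds the same data with DataFrames and a broadcast variable instead:

val secondDegree = pairAllFriends.join(pairCommon).mapValues { case (all, common) =>
  val commonSet = common.split(",").toSet
  // keep only A's friends that are not common friends of A and B, e.g. "1_3" -> "2,3,4"
  all.split(",").filterNot(commonSet.contains).mkString(",")
}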
Complete Spark code for both common friends and second-degree friends:

import java.util
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.collection.mutable.ListBuffer
object Friends {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession
.builder()
.master("local[*]")
.appName("feat_eg")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// .config("hive.metastore.uris",
// "thrift://"+"192.168.10.42"+":9083")
// .config("spark.storage.memoryFraction",0.6)
.enableHiveSupport()
.getOrCreate()
val orcPath = "E:\\tmp\\badou\\data\\hadoop_test\\findFriend\\test_friend.txt"
var userLogDF: DataFrame = sparkSession.read.format("csv").option("sep", "\t").load(orcPath)
userLogDF.createOrReplaceTempView("user_friend")
var friendODF: DataFrame = sparkSession.sql("SELECT _c0 AS uid,_c1 AS friend FROM user_friend")
friendODF.createOrReplaceTempView("user_friend")
userLogDF = sparkSession.sql("SELECT explode(split(friend,',')) AS friend,uid AS uid FROM user_friend")
userLogDF.createOrReplaceTempView("user_friends")
userLogDF = sparkSession.sql("SELECT friend,concat_ws(',',collect_list(uid)) AS uid FROM user_friends GROUP BY friend")
// userLogDF.show()
val rdd = userLogDF.rdd
val rdd2 = rdd.map(f => {
val str = f.getString(0)
val uidSort = f.getString(1).split(",").sortWith(_ < _)
var sb1 =new StringBuffer()
for(uid <- uidSort){
sb1.append(uid+",")
}
val uStr = sb1.substring(0,sb1.length()-1)
// println("map:"+str+" uid:"+sb1)
(str,uStr)
})
// rdd2.foreach(f=>println(f._1+" "+f._2))
val value = rdd2.mapPartitions(f => {
val list = f.toList
var sb2 = new ListBuffer[(String,String)]
for (one <- list) {
val value1 = one._1
val list1 = one._2.split(",").toList
if(list1.length>1){
for (i <- 0 to list1.length - 2) {
for (j <- i + 1 to list1.length - 1) {
// emit each pair in both orders so the second-degree join below can look up either direction
sb2.append((list1(i) + "_" + list1(j), value1))
sb2.append((list1(j) + "_" + list1(i), value1))
}
}
}
}
sb2.toIterator
})
value.foreach(f=>println(f._1+" "+f._2))
import sparkSession.implicits._
var commonFriendDF = value.toDF("friends", "same_friend")
commonFriendDF.createOrReplaceTempView("user_friends")
commonFriendDF = sparkSession.sql("SELECT friends,concat_ws(',',collect_list(same_friend)) AS same_friend FROM user_friends GROUP BY friends")
commonFriendDF.show(false)
// second-degree friends are computed below
friendODF.show(false)
friendODF.createOrReplaceTempView("user_friend")
var allUidDF =sparkSession.sql("SELECT uid,friend,'1' AS cout FROM user_friend ")
allUidDF.createOrReplaceTempView("user_friend")
allUidDF = sparkSession.sql("SELECT concat_ws(',',collect_list(uid)) AS all_uid FROM user_friend GROUP BY cout")
friendODF.createOrReplaceTempView("user_friend")
allUidDF.createOrReplaceTempView("all_uid")
allUidDF = sparkSession.sql("SELECT t1.*,t2.* FROM user_friend t1 CROSS JOIN all_uid t2 ")
allUidDF.show(false)
val allUidRdd = allUidDF.rdd
var value1 = allUidRdd.mapPartitions(f => {
val list = f.toList
var sb2 = new ListBuffer[(String, String)]
for (one <- list) {
val uid = one.getString(0)
val friend = one.getString(1)
val all_uid = one.getString(2).split(",")
for(oneid <- all_uid){
if(!uid.equals(oneid)){
sb2.append((uid + "_" + oneid , friend))
}
}
}
sb2.toIterator
})
value1.foreach(f=>println(f._1+"allUidRdd"+f._2))
// import sparkSession.implicits._
// var friendsDF = value1.toDF("uids", "friends")
// friendsDF.createOrReplaceTempView("all_uid")
val commonFriendBro = commonFriendDF.collectAsList()
val commonFriend = sparkSession.sparkContext.broadcast(commonFriendBro)
val value2 = value1.map(f => {
var key =""
var values =""
var sb1 =new StringBuffer()
var commonFriendValue = commonFriend.value
import scala.collection.JavaConversions._ // implicit conversions for the Java collections below
for (oneSet <- commonFriendValue) {
val str0 = oneSet.getString(0)
val commonFriends = oneSet.getString(1)
if (f._1.equals(str0)) {
val stringArr = f._2.split(",")
var list1 = new util.ArrayList[String]()
for (str <- stringArr) {
list1.add(str)
}
val commonFriArr = commonFriends.split(",")
var listCom = new util.ArrayList[String]()
for (arr <- commonFriArr) {
listCom.add(arr)
}
list1.removeAll(listCom);
for (li <- list1) {
sb1.append(li+",")
}
key=str0
values = sb1.substring(0,sb1.length()-1)
}
}
(key,values)
})
value2.foreach(f=>println(f._1+" "+f._2))
}
}