
Hadoop MapReduce Programming: API Primer Series, FOF (Friend of Friend)


The code (all in package zhouls.bigdata.myMapReduce.friend): the custom key Fof, the custom WritableComparable User, the sort and grouping comparators FoFSort and FoFGroup, and the two-job driver RunJob.

Fof.java, the custom key type that represents an unordered (f1, f2) pair:

package zhouls.bigdata.myMapReduce.friend;

import org.apache.hadoop.io.Text;

public class Fof extends Text { // custom key: represents the relationship between f1 and f2

    public Fof() { // no-arg constructor
        super();
    }

    public Fof(String a, String b) { // two-arg constructor
        super(getFof(a, b));
    }

    // Order the two names before joining, so (a, b) and (b, a) map to the same key.
    public static String getFof(String a, String b) {
        int r = a.compareTo(b);
        if (r < 0) {
            return a + "\t" + b;
        } else {
            return b + "\t" + a;
        }
    }
}
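Because getFof sorts the two names before joining them, the pair key is symmetric, which is what lets the reducer later see all markers for one pair together. A minimal sanity check of that property (a hypothetical standalone snippet, not part of the original post; the class name FofDemo is made up):

public class FofDemo {
    public static void main(String[] args) {
        // Both orderings normalize to the same "name1\tname2" key.
        Fof x = new Fof("小明", "老王");
        Fof y = new Fof("老王", "小明");
        System.out.println(x.equals(y)); // true: Text compares the underlying bytes
        System.out.println(x);           // prints the normalized tab-separated pair
    }
}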

User.java, a WritableComparable carrying a user name and a common-friend count:

package zhouls.bigdata.myMapReduce.friend;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Implementing WritableComparable takes more methods than a plain Writable:
// readFields deserializes, write serializes, and compareTo orders keys.
public class User implements WritableComparable<User> {

    private String uname;
    private int friendsCount;

    public String getUname() {
        return uname;
    }

    public void setUname(String uname) {
        this.uname = uname;
    }

    public int getFriendsCount() {
        return friendsCount;
    }

    public void setFriendsCount(int friendsCount) {
        this.friendsCount = friendsCount;
    }
    // The getters and setters above can be auto-generated: right-click > Source > Generate Getters and Setters.

    public User() { // no-arg constructor
    }

    public User(String uname, int friendsCount) { // two-arg constructor
        this.uname = uname;
        this.friendsCount = friendsCount;
    }

    public void write(DataOutput out) throws IOException { // serialization
        out.writeUTF(uname);
        out.writeInt(friendsCount);
    }

    public void readFields(DataInput in) throws IOException { // deserialization
        this.uname = in.readUTF();
        this.friendsCount = in.readInt();
    }

    public int compareTo(User o) { // the core of the ordering
        int result = this.uname.compareTo(o.getUname());
        if (result == 0) {
            return Integer.compare(this.friendsCount, o.getFriendsCount());
        }
        return result;
    }
}
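write and readFields must mirror each other field for field; a quick way to convince yourself the pair is consistent is a round trip through Hadoop's in-memory buffers. A minimal sketch, assuming the User class above is on the classpath (the class name UserRoundTrip is made up for illustration):

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class UserRoundTrip {
    public static void main(String[] args) throws Exception {
        User original = new User("小明", 3);

        // Serialize: write both fields into an in-memory buffer.
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialize: read the same bytes back into a fresh instance.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        User copy = new User();
        copy.readFields(in);

        System.out.println(copy.getUname() + ":" + copy.getFriendsCount()); // 小明:3
    }
}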

FoFSort.java, the sort comparator (the secondary-sort half):

package zhouls.bigdata.myMapReduce.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FoFSort extends WritableComparator {

    public FoFSort() { // register the custom User type
        super(User.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) { // the core of the sort
        User u1 = (User) a;
        User u2 = (User) b;
        int result = u1.getUname().compareTo(u2.getUname());
        if (result == 0) {
            // Same user: rank the candidate with more common friends first.
            return -Integer.compare(u1.getFriendsCount(), u2.getFriendsCount());
        }
        return result;
    }
}

FoFGroup.java, the grouping comparator:

package zhouls.bigdata.myMapReduce.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class FoFGroup extends WritableComparator {

    public FoFGroup() { // register the custom User type
        super(User.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) { // the core of the grouping
        User u1 = (User) a;
        User u2 = (User) b;
        // Group by user name only, so every candidate for one user reaches a single reduce() call.
        return u1.getUname().compareTo(u2.getUname());
    }
}
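The division of labor between the two comparators is the usual secondary-sort pattern: FoFSort decides the order in which records reach the reducer (same uname sorted by descending friendsCount), while FoFGroup decides which consecutive records are handed to a single reduce() call (same uname only). A small illustration of that contract (a hypothetical snippet, not from the original post; ComparatorDemo is a made-up name):

public class ComparatorDemo {
    public static void main(String[] args) {
        User a = new User("如花", 3);
        User b = new User("如花", 1);

        // Sort comparator: same uname, so the larger friendsCount sorts first.
        System.out.println(new FoFSort().compare(a, b));  // negative: a comes before b

        // Grouping comparator: only uname matters, so a and b share one reduce() call.
        System.out.println(new FoFGroup().compare(a, b)); // 0
    }
}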

RunJob.java, the driver that chains the two jobs, plus the mapper and reducer classes:

package zhouls.bigdata.myMapReduce.friend;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

public class RunJob {

    // Sample input, one user per line: user \t tab-separated friend list.
    // 小明    老王 如花 林志玲
    // 老王    小明 鳳姐                     (sorting is handled in FoFSort.java)
    // 如花    小明 李剛 鳳姐
    // 林志玲  小明 李剛 鳳姐 郭美美          (grouping is handled in FoFGroup.java)
    // 李剛    如花 鳳姐 林志玲
    // 郭美美  鳳姐 林志玲
    // 鳳姐    如花 老王 林志玲 郭美美

    public static void main(String[] args) {
        Configuration config = new Configuration();
        // config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
        // config.set("yarn.resourcemanager.hostname", "HadoopMaster");
        // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        // config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ","); // the default key/value separator is the tab "\t"; override it here, e.g. with ","
        if (run1(config)) {
            run2(config); // two chained MapReduce jobs: run2 starts only if run1 succeeds
        }
    }

    public static void run2(Configuration config) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("fof2");
            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReducer.class);
            job.setSortComparatorClass(FoFSort.class);
            job.setGroupingComparatorClass(FoFGroup.class);
            job.setMapOutputKeyClass(User.class);
            job.setMapOutputValueClass(User.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);

            // // Input for this job (HDFS variant):
            // FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/f1"));
            // // Output directory for the results; it must not already exist:
            // Path outputPath = new Path("hdfs://HadoopMaster:9000/out/f2");

            // Input for this job (the output of run1):
            FileInputFormat.addInputPath(job, new Path("./out/f1"));
            // Output directory for the results; it must not already exist:
            Path outputPath = new Path("./out/f2");
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job executed successfully");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static boolean run1(Configuration config) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("friend");
            job.setMapperClass(FofMapper.class);
            job.setReducerClass(FofReducer.class);
            job.setMapOutputKeyClass(Fof.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class); // split each line into key/value at the first tab

            // FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/friend/friend.txt")); // the directory contains friend.txt
            // Path outpath = new Path("hdfs://HadoopMaster:9000/out/f1");

            FileInputFormat.addInputPath(job, new Path("./data/friend/friend.txt")); // the directory contains friend.txt
            Path outpath = new Path("./out/f1");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);

            boolean f = job.waitForCompletion(true);
            return f;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    static class FofMapper extends Mapper<Text, Text, Fof, IntWritable> {

        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String user = key.toString();
            String[] friends = StringUtils.split(value.toString(), '\t');
            for (int i = 0; i < friends.length; i++) {
                String f1 = friends[i];
                // Direct friendship: mark it with 0 so the reducer can exclude the pair.
                Fof ofof = new Fof(user, f1);
                context.write(ofof, new IntWritable(0));
                for (int j = i + 1; j < friends.length; j++) {
                    String f2 = friends[j];
                    // f1 and f2 share the friend "user": one friend-of-friend co-occurrence.
                    Fof fof = new Fof(f1, f2);
                    context.write(fof, new IntWritable(1));
                }
            }
        }
    }

    static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable> {

        protected void reduce(Fof arg0, Iterable<IntWritable> arg1, Context arg2)
                throws IOException, InterruptedException {
            int sum = 0;
            boolean f = true;
            for (IntWritable i : arg1) {
                if (i.get() == 0) {
                    // A 0 marker means the pair are already direct friends; drop the pair.
                    f = false;
                    break;
                }
                sum = sum + i.get();
            }
            if (f) {
                arg2.write(arg0, new IntWritable(sum)); // pair of potential friends plus their common-friend count
            }
        }
    }

    static class SortMapper extends Mapper<Text, Text, User, User> {

        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line from job 1: name1 \t name2 \t commonFriendCount.
            String[] args = StringUtils.split(value.toString(), '\t');
            String other = args[0];
            int friendsCount = Integer.parseInt(args[1]);
            // Emit in both directions so each user of the pair gets the recommendation.
            context.write(new User(key.toString(), friendsCount), new User(other, friendsCount));
            context.write(new User(other, friendsCount), new User(key.toString(), friendsCount));
        }
    }

    static class SortReducer extends Reducer<User, User, Text, Text> {

        protected void reduce(User arg0, Iterable<User> arg1, Context arg2)
                throws IOException, InterruptedException {
            String user = arg0.getUname();
            StringBuffer sb = new StringBuffer();
            // Thanks to FoFGroup, this single call sees every candidate for "user",
            // already ordered by FoFSort (highest common-friend count first).
            for (User u : arg1) {
                sb.append(u.getUname() + ":" + u.getFriendsCount());
                sb.append(",");
            }
            arg2.write(new Text(user), new Text(sb.toString()));
        }
    }
}
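To try the two jobs locally, the driver expects the sample friend list at ./data/friend/friend.txt and writes to ./out/f1 and ./out/f2. The sketch below materializes the input from the comment block in RunJob and launches the driver; it assumes Hadoop local mode and that the class sits in the same package as RunJob (the class name LocalRun is made up for illustration):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class LocalRun {
    public static void main(String[] args) throws Exception {
        // Recreate the sample input: user \t tab-separated friend list.
        String data =
            "小明\t老王\t如花\t林志玲\n" +
            "老王\t小明\t鳳姐\n" +
            "如花\t小明\t李剛\t鳳姐\n" +
            "林志玲\t小明\t李剛\t鳳姐\t郭美美\n" +
            "李剛\t如花\t鳳姐\t林志玲\n" +
            "郭美美\t鳳姐\t林志玲\n" +
            "鳳姐\t如花\t老王\t林志玲\t郭美美\n";
        Files.createDirectories(Paths.get("./data/friend"));
        Files.write(Paths.get("./data/friend/friend.txt"), data.getBytes(StandardCharsets.UTF_8));

        // Run job 1 (count common friends) and, if it succeeds, job 2 (rank recommendations).
        RunJob.main(new String[0]);
    }
}

Job 1's output under ./out/f1 consists of lines of the form name1 \t name2 \t commonFriendCount, with pairs that are already direct friends filtered out; job 2 regroups those lines into one recommendation list per user, sorted by descending common-friend count.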

This article is reproduced from the cnblogs blog 大資料躺過的坑. Original link: http://www.cnblogs.com/zlslch/p/6166095.html. For reprinting, please contact the original author.