天天看點

Mapreduce之購物籃分析

Mapreduce之購物籃分析

購物籃分析是一個流行的資料挖掘技術,在市場營銷中這個技術可以揭示不同商品或商品組之間的相似度

MBA原理

通過MapReduce架構,設計相關的解決方案生成交易的關聯規則,進而查找最常見的商品對

應用領域

  • 信用卡交易分析
  • 電話呼叫模式分析
  • 欺詐識别
  • 電信服務交易分析
  • 大型線上零售商的每日/每周交易分析

樣例輸入

crackers,bread,banana
crackers,coke,butter,coffee
crackers,bread
crackers,bread
crackers,bread
crackers,bread,coffee
butter,coke
butter,coke,bread,crackers
           

樣例輸出

Mapreduce之購物籃分析

mapper階段任務

maper階段的map()函數根據購物籃子中的商品生成如下鍵-值對

[<crackers,icecream>,1]

[<crackers,coke.,1]

但是在程式自動分類過程中會和出現如下現象

購物籃T1:crackers,icecream,coke

購物籃T2:icecream,coke,crackers

根據關聯規則,對于T1會生成如下規則

[(crackers,icecream),1]

[(crackers,coke),1]

[(icecream,coke),1]

對于T2則會生成如下規則

[(icecream,coke),1]

[(icecream,crackers),1]

[(coke,crackers),1]

從中我們可以看到,有六對規則,但是我們發現(crackers,icecream)和(icecream,crackers)是一樣的,在這裡會被分成不同的規則,是以在生成規則之前需要對商品按照字母順序進行排序,就可以避免這個問題

mapper階段編碼

在這個過程中使用快速排序算法對商品進行排序

package com.deng.MBA;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MBAMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
	int numberOfPairs;
    public static final int DEFAULT_NUMBER_OF_PAIRS=2;
    protected void setup(Context context) throws IOException,InterruptedException{
        this.numberOfPairs=context.getConfiguration().getInt("number.of.pairs",DEFAULT_NUMBER_OF_PAIRS);
    }
    public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
        String line=value.toString();
        List<String> items=convertItems(line);
        if((items==null)||items.isEmpty()){
            return ;
        }
        generateMapperOutput(numberOfPairs,items,context);
    }

    private static List<String> convertItems(String line){
        if((line==null)||line.length()==0){
            return null;
        }
        String[] tokens=line.split(",");
        if((tokens==null)||(tokens.length==0)){
            return null;
        }
        List<String> items=new ArrayList<String>();
        for(String token:tokens){
            if(token!=null){
                items.add(token.trim());
            }
        }
        return items;
    }
    private void generateMapperOutput(int numberOfPairs, List<String> items, Context context)
            throws IOException, InterruptedException{
        List<List<String>> sortedCombinations =Combination.findSortedCombinations(items,numberOfPairs);
        for(List<String> itemsList: sortedCombinations){
            context.write(new Text(itemsList.toString()),new IntWritable(1));
        }
    }
}

           

其中combinations是一個簡單的工具,使用Combinations.generateCombinations(s1,s2,…,sn)方法可以生成給定的集合,如下所示

假設購物籃為{a,b,c}

假設生成具有兩個商品的規則,則分類結果如下所示

[a,b],[a,c],[b,c]

Combination編碼

package com.deng.MBA;

import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

public class Combination {
   public static <T extends Comparable<? super T>> List<List<T>>
    findSortedCombinations(Collection<T> elements,int n){
       List<List<T>> result =new ArrayList<List<T>>();
       if(n==0){
           result.add(new ArrayList<T>());
           return result;
       }
       List<List<T>> combinations=findSortedCombinations(elements,n-1);
       for(List<T> combination:combinations){
           for(T element:elements){
               if(combination.contains(element)){
                   continue;
               }
               List<T> list=new ArrayList<T>();
               list.addAll(combination);

               if(list.contains(element)){
                   continue;
               }

               list.add(element);
               Collections.sort(list);
               if(result.contains(list)){
                   continue;
               }
               result.add(list);
           }
       }
       return result;
   }
    public static void main(String[] args) {
        List<String> elements = Arrays.asList("a", "b", "c", "d", "e");
        List<List<String>> combinations = findSortedCombinations(elements, 2);
        System.out.println(combinations);
    }
}

           

reduce階段任務

這個階段就是對規則的支援度進行統計

reduce階段編碼

package com.deng.MBA;
import java.io.IOException;
import java.io.WriteAbortedException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MBAReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
    public void reduce(Text key,Iterable<IntWritable> values,Context context)
        throws IOException,InterruptedException{
        int sum=0;
        for(IntWritable value:values){
            sum+=value.get();
        }
        context.write(key,new IntWritable(sum));
    }
}

           

完整代碼

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.*;

public class MBADriver  {
    public static class Combination {
        public static <T extends Comparable<? super T>> List<List<T>>
        findSortedCombinations(Collection<T> elements, int n){
            List<List<T>> result =new ArrayList<List<T>>();
            if(n==0){
                result.add(new ArrayList<T>());
                return result;
            }
            List<List<T>> combinations=findSortedCombinations(elements,n-1);
            for(List<T> combination:combinations){
                for(T element:elements){
                    if(combination.contains(element)){
                        continue;
                    }
                    List<T> list=new ArrayList<T>();
                    list.addAll(combination);

                    if(list.contains(element)){
                        continue;
                    }

                    list.add(element);
                    Collections.sort(list);
                    if(result.contains(list)){
                        continue;
                    }
                    result.add(list);
                }
            }
            return result;
        }
    }

    public static class MBAMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
        int numberOfPairs;
    	public static final int DEFAULT_NUMBER_OF_PAIRS=2;
    	protected void setup(Context context) throws IOException,InterruptedException{
        	this.numberOfPairs=context.getConfiguration().getInt("number.of.pairs",DEFAULT_NUMBER_OF_PAIRS);
    	}
        public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
            String line=value.toString();
            List<String> items=convertItems(line);
            if((items==null)||items.isEmpty()){
                return ;
            }
            generateMapperOutput(numberOfPairs,items,context);
        }

        private static List<String> convertItems(String line){
            if((line==null)||line.length()==0){
                return null;
            }
            String[] tokens=line.split(",");
            if((tokens==null)||(tokens.length==0)){
                return null;
            }
            List<String> items=new ArrayList<String>();
            for(String token:tokens){
                if(token!=null){
                    items.add(token.trim());
                }
            }
            return items;
        }
        private void generateMapperOutput(int numberOfPairs, List<String> items, Context context)
                throws IOException, InterruptedException{
            List<List<String>> sortedCombinations = com.deng.MBA.Combination.findSortedCombinations(items,numberOfPairs);
            for(List<String> itemsList: sortedCombinations){
                context.write(new Text(itemsList.toString()),new IntWritable(1));
            }
        }
    }

    public static class MBAReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
        public void reduce(Text key,Iterable<IntWritable> values,Context context)
                throws IOException,InterruptedException{
            int sum=0;
            for(IntWritable value:values){
                sum+=value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }
   public static void main(String[] args) throws Exception{
        FileUtil.deleteDirs("output");
        Configuration conf=new Configuration();
        String[] otherArgs=new String[]{"input/MBA","output","3"};
        if(otherArgs.length!=3){
            System.out.println("參數錯誤");
        }
        Job job=new Job(conf,"MBA");
        job.getConfiguration().setInt("number.of.pairs",Integer.parseInt(otherArgs[2]));
        FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
        job.setJarByClass(MBADriver.class);
        job.setMapperClass(MBAMapper.class);
        job.setReducerClass(MBAReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setCombinerClass(MBAReduce.class);
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
           

寫在最後

代碼是個很神奇的東西,多看看别人優秀的代碼才能提升自己的代碼風格

繼續閱讀