MapReduce之購物籃分析
購物籃分析(Market Basket Analysis, MBA)是一種流行的資料挖掘技術,在市場營銷中,這項技術可以揭示不同商品或商品組合之間的關聯性。
MBA原理
通過MapReduce框架設計相關的解決方案,從交易資料中生成關聯規則,進而找出最常被一起購買的商品對。
應用領域
- 信用卡交易分析
- 電話呼叫模式分析
- 欺詐識別
- 電信服務交易分析
- 大型線上零售商的每日/每周交易分析
樣例輸入
crackers,bread,banana
crackers,coke,butter,coffee
crackers,bread
crackers,bread
crackers,bread
crackers,bread,coffee
butter,coke
butter,coke,bread,crackers
樣例輸出
mapper階段任務
mapper階段的map()函數根據購物籃中的商品生成如下鍵-值對:
[<crackers,icecream>,1]
[<crackers,coke>,1]
但是在程式自動生成商品組合的過程中,會出現如下現象:
購物籃T1:crackers,icecream,coke
購物籃T2:icecream,coke,crackers
根據關聯規則,對於T1會生成如下規則:
[(crackers,icecream),1]
[(crackers,coke),1]
[(icecream,coke),1]
對於T2則會生成如下規則:
[(icecream,coke),1]
[(icecream,crackers),1]
[(coke,crackers),1]
從中我們可以看到共有六條規則,但是(crackers,icecream)和(icecream,crackers)、(crackers,coke)和(coke,crackers)其實是同一組商品,在這裡卻會被分成不同的規則。所以在生成規則之前,需要先將每個組合中的商品按照字母順序排序,就可以避免這個問題。
mapper階段編碼
在這個過程中使用Collections.sort()按字母順序對每個商品組合進行排序(Java對物件列表採用的是穩定的合併排序,而非快速排序)。
package com.deng.MBA;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MBAMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
int numberOfPairs;
public static final int DEFAULT_NUMBER_OF_PAIRS=2;
protected void setup(Context context) throws IOException,InterruptedException{
this.numberOfPairs=context.getConfiguration().getInt("number.of.pairs",DEFAULT_NUMBER_OF_PAIRS);
}
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String line=value.toString();
List<String> items=convertItems(line);
if((items==null)||items.isEmpty()){
return ;
}
generateMapperOutput(numberOfPairs,items,context);
}
private static List<String> convertItems(String line){
if((line==null)||line.length()==0){
return null;
}
String[] tokens=line.split(",");
if((tokens==null)||(tokens.length==0)){
return null;
}
List<String> items=new ArrayList<String>();
for(String token:tokens){
if(token!=null){
items.add(token.trim());
}
}
return items;
}
private void generateMapperOutput(int numberOfPairs, List<String> items, Context context)
throws IOException, InterruptedException{
List<List<String>> sortedCombinations =Combination.findSortedCombinations(items,numberOfPairs);
for(List<String> itemsList: sortedCombinations){
context.write(new Text(itemsList.toString()),new IntWritable(1));
}
}
}
其中Combination是一個簡單的工具類,呼叫Combination.findSortedCombinations(elements, n)可以從給定集合{s1,s2,…,sm}中生成所有包含n個元素、且組合內已排序的組合,如下所示:
假設購物籃為{a,b,c}
假設要生成包含兩個商品的規則(n=2),則結果如下所示:
[a,b],[a,c],[b,c]
Combination編碼
package com.deng.MBA;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
/**
 * Utility for enumerating the item combinations used by market basket analysis.
 */
public class Combination {
    /**
     * Returns every distinct combination of {@code n} distinct elements drawn from
     * {@code elements}, with each combination sorted in natural order.
     *
     * <p>Duplicate input elements are collapsed, so {@code [a, a, b]} with {@code n = 2} yields
     * only {@code [[a, b]]}. Combinations are returned in the order they are first generated. For
     * sorted, duplicate-free input this is lexicographic order, e.g. {@code [a, b, c]} with
     * {@code n = 2} yields {@code [[a, b], [a, c], [b, c]]}. If {@code n} exceeds the number of
     * distinct elements the result is empty; {@code n = 0} yields a single empty combination.
     *
     * @param elements the items to combine
     * @param n the number of items in each combination; must be non-negative
     * @return a new mutable list of sorted combinations
     * @throws IllegalArgumentException if {@code n} is negative
     */
    public static <T extends Comparable<? super T>> List<List<T>>
    findSortedCombinations(Collection<T> elements, int n) {
        if (n < 0) {
            // Without this guard the recursion on n - 1 never reaches n == 0 and overflows the stack.
            throw new IllegalArgumentException("n must be non-negative, got " + n);
        }
        if (n == 0) {
            List<List<T>> result = new ArrayList<List<T>>();
            result.add(new ArrayList<T>());
            return result;
        }
        // LinkedHashSet gives O(1) duplicate detection while iterating in first-insertion order,
        // so the returned order matches a linear "contains" scan without its quadratic cost.
        Set<List<T>> unique = new LinkedHashSet<List<T>>();
        for (List<T> combination : findSortedCombinations(elements, n - 1)) {
            for (T element : elements) {
                // Only extend with elements not already present, so items stay distinct.
                if (combination.contains(element)) {
                    continue;
                }
                List<T> candidate = new ArrayList<T>(combination.size() + 1);
                candidate.addAll(combination);
                candidate.add(element);
                // Sorting canonicalizes the combination so [b, a] and [a, b] collapse together.
                Collections.sort(candidate);
                unique.add(candidate);
            }
        }
        return new ArrayList<List<T>>(unique);
    }

    /** Small demo: prints every 2-item combination of a..e. */
    public static void main(String[] args) {
        List<String> elements = Arrays.asList("a", "b", "c", "d", "e");
        List<List<String>> combinations = findSortedCombinations(elements, 2);
        System.out.println(combinations);
    }
}
reduce階段任務
這個階段負責統計每條規則(商品組合)的支持度,也就是該組合在所有購物籃中出現的次數。
reduce階段編碼
package com.deng.MBA;
import java.io.IOException;
import java.io.WriteAbortedException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class MBAReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
public void reduce(Text key,Iterable<IntWritable> values,Context context)
throws IOException,InterruptedException{
int sum=0;
for(IntWritable value:values){
sum+=value.get();
}
context.write(key,new IntWritable(sum));
}
}
完整代碼
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.*;
public class MBADriver {
public static class Combination {
public static <T extends Comparable<? super T>> List<List<T>>
findSortedCombinations(Collection<T> elements, int n){
List<List<T>> result =new ArrayList<List<T>>();
if(n==0){
result.add(new ArrayList<T>());
return result;
}
List<List<T>> combinations=findSortedCombinations(elements,n-1);
for(List<T> combination:combinations){
for(T element:elements){
if(combination.contains(element)){
continue;
}
List<T> list=new ArrayList<T>();
list.addAll(combination);
if(list.contains(element)){
continue;
}
list.add(element);
Collections.sort(list);
if(result.contains(list)){
continue;
}
result.add(list);
}
}
return result;
}
}
public static class MBAMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
int numberOfPairs;
public static final int DEFAULT_NUMBER_OF_PAIRS=2;
protected void setup(Context context) throws IOException,InterruptedException{
this.numberOfPairs=context.getConfiguration().getInt("number.of.pairs",DEFAULT_NUMBER_OF_PAIRS);
}
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
String line=value.toString();
List<String> items=convertItems(line);
if((items==null)||items.isEmpty()){
return ;
}
generateMapperOutput(numberOfPairs,items,context);
}
private static List<String> convertItems(String line){
if((line==null)||line.length()==0){
return null;
}
String[] tokens=line.split(",");
if((tokens==null)||(tokens.length==0)){
return null;
}
List<String> items=new ArrayList<String>();
for(String token:tokens){
if(token!=null){
items.add(token.trim());
}
}
return items;
}
private void generateMapperOutput(int numberOfPairs, List<String> items, Context context)
throws IOException, InterruptedException{
List<List<String>> sortedCombinations = com.deng.MBA.Combination.findSortedCombinations(items,numberOfPairs);
for(List<String> itemsList: sortedCombinations){
context.write(new Text(itemsList.toString()),new IntWritable(1));
}
}
}
public static class MBAReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
public void reduce(Text key,Iterable<IntWritable> values,Context context)
throws IOException,InterruptedException{
int sum=0;
for(IntWritable value:values){
sum+=value.get();
}
context.write(key,new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception{
FileUtil.deleteDirs("output");
Configuration conf=new Configuration();
String[] otherArgs=new String[]{"input/MBA","output","3"};
if(otherArgs.length!=3){
System.out.println("參數錯誤");
}
Job job=new Job(conf,"MBA");
job.getConfiguration().setInt("number.of.pairs",Integer.parseInt(otherArgs[2]));
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
job.setJarByClass(MBADriver.class);
job.setMapperClass(MBAMapper.class);
job.setReducerClass(MBAReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setCombinerClass(MBAReduce.class);
System.exit(job.waitForCompletion(true)?0:1);
}
}
寫在最後
代碼是個很神奇的東西,多看看別人優秀的代碼,才能提升自己的代碼風格。