
Recommendation algorithms on Hadoop: the Mahout implementation

A look at recommendation algorithms on Hadoop, focusing on the item-based recommendation algorithm implemented in Mahout.

It breaks down into four steps:

1. Build the user-item (user) matrix

    Input: every user's rating of, or association with, each item.

    Map output: key = user, value = item + preference.

    Reduce output: key = user, value = that user's full list of items + preferences.
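As a minimal sketch of step 1 (my own illustration with simplified Text-based types; Mahout's real implementation lives in PreparePreferenceMatrixJob), assuming input lines of the form userID,itemID,pref:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map: key = user, value = item + preference
class UserVectorMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
  @Override
  protected void map(LongWritable offset, Text line, Context ctx)
      throws IOException, InterruptedException {
    String[] parts = line.toString().split(",");   // userID,itemID,pref
    ctx.write(new LongWritable(Long.parseLong(parts[0])),
              new Text(parts[1] + ":" + parts[2]));
  }
}

// Reduce: key = user, value = all of that user's item:pref pairs
class UserVectorReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
  @Override
  protected void reduce(LongWritable user, Iterable<Text> itemPrefs, Context ctx)
      throws IOException, InterruptedException {
    StringBuilder vector = new StringBuilder();
    for (Text itemPref : itemPrefs) {
      if (vector.length() > 0) {
        vector.append(',');
      }
      vector.append(itemPref);
    }
    ctx.write(user, new Text(vector.toString()));
  }
}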

2. Build the item-item (item) matrix

   Input: the user matrix. For each user's row, take the Cartesian product of the items in it to generate item-item associations.

   Map output: key = item, value = association strength to another item.

   Reduce output: key = item, value = association strengths to multiple items.

(The item-similarity matrix can be generated under various rules; the algorithm is only touched on here.)

Correction:

Computing the item-similarity matrix is the core of item-based collaborative filtering.

Many formulas exist; the essence is the quotient of the intersection over the union of the users associated with items i and j.

The formulas Mahout uses are:

    1. dot(i,j)  = sum over users u of Pi(u) * Pj(u)

    2. norms(i)  = sum over users u of Pi(u)^2

    3. simi(i,j) = 1 / (1 + (norms(i) - 2*dot(i,j) + norms(j))^(1/2))

where Pi(u) denotes user u's preference for item i.
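A stand-alone sketch (my own example, not Mahout code) evaluating the three formulas for two toy item vectors:

// Pi and Pj hold the preferences of the same three users for items i and j.
public class EuclideanSimilaritySketch {
  public static void main(String[] args) {
    double[] pi = {5.0, 3.0, 0.0};
    double[] pj = {4.0, 0.0, 1.0};
    double dot = 0, normI = 0, normJ = 0;
    for (int u = 0; u < pi.length; u++) {
      dot   += pi[u] * pj[u];      // 1. dot(i,j) = sum(Pi(u) * Pj(u))
      normI += pi[u] * pi[u];      // 2. norms(i) = sum(Pi(u)^2)
      normJ += pj[u] * pj[u];      //    norms(j) = sum(Pj(u)^2)
    }
    // 3. simi(i,j) = 1 / (1 + sqrt(norms(i) - 2*dot(i,j) + norms(j)))
    double sim = 1.0 / (1.0 + Math.sqrt(normI - 2 * dot + normJ));
    System.out.println(sim);       // ~0.23 for this toy data
  }
}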

Mahout implements this as follows:

Job 1: from the item-user matrix, compute norms, i.e. each item's sum of squared user preferences; the output is item -> norms.

Job 2, Map: from the user-item matrix, compute Pi(u)*Pj(u), the products of the same user's ratings of pairs of items; the output is item -> the Pi(u)*Pj(u) values toward its partner items.

Job 2, Reduce: from item -> partner-item Pi(u)*Pj(u) values, plus item -> norms, compute the item-similarity matrix (at this point all the dots involving a given item can be aggregated).

Job 3: complete the item-similarity matrix (filling in the symmetric half).

3. Obtain the user-item-similarity matrix

Input: the user-item (user) matrix and the item-item (item) matrix.

Map output: key = item, value = a VectorOrPrefWritable, holding either an item-to-user preference or an item-to-item similarity.

Reduce output: key = item, value = a VectorAndPrefWritable, aggregating for a single item all users' preferences toward it and its similarities to all other items.

4. Compute the user recommendation matrix

Input: the VectorAndPrefWritable records.

Map output: key = user, value = item + score (the map side turns each single item's contribution into a recommendation score: the user's preference for item A times item A's similarity to the other items).

Reduce output: key = user, value = recommended items + scores (the reduce side applies an aggregation formula to sum all single-item contributions into the user's predicted preference for each other item, then takes the top N as that user's recommendations).
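The aggregation formula applied in step 4 (implemented later in AggregateAndRecommendReducer) is pred(u,i) = sum_j sim(i,j) * pref(u,j) / sum_j |sim(i,j)|, summing over the items j that user u has rated. A stand-alone sketch (my own example, not Mahout code):

public class PredictionSketch {
  public static void main(String[] args) {
    double[] prefs = {5.0, 3.0};          // user's ratings of items j1, j2
    double[] sims  = {0.8, 0.4};          // sim(i, j1), sim(i, j2) for candidate item i
    double numerator = 0, denominator = 0;
    for (int j = 0; j < prefs.length; j++) {
      numerator   += sims[j] * prefs[j];  // weighted contribution of each rated item
      denominator += Math.abs(sims[j]);   // normalizer
    }
    System.out.println(numerator / denominator);  // (0.8*5 + 0.4*3) / 1.2 = 4.33...
  }
}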

Here are a few more articles analyzing Mahout's recommendation algorithms:

http://eric-gcm.iteye.com/blog/1817822

http://eric-gcm.iteye.com/blog/1818033

http://eric-gcm.iteye.com/blog/1820060

Below is the Mahout code.

The RecommenderJob class is the main driver of Mahout's Hadoop-based recommendation engine (ItemSimilarityJob is its sibling for computing item similarities only); the analysis starts here.
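For context, a hedged sketch of launching this job programmatically (option names are taken from the run() code below; the similarity-measure name follows Mahout 0.x conventions, so verify it against your version; all paths are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;

public class LaunchRecommender {
  public static void main(String[] args) throws Exception {
    ToolRunner.run(new Configuration(), new RecommenderJob(), new String[] {
        "--input", "/input/ratings.csv",          // userID,itemID,pref lines
        "--output", "/output/recommendations",
        "--similarityClassname", "SIMILARITY_EUCLIDEAN_DISTANCE",
        "--numRecommendations", "10",
        "--tempDir", "/tmp/recommender",
    });
  }
}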

Its run() method is the entry point:

Java code:

public final class RecommenderJob extends AbstractJob {
  public static final String BOOLEAN_DATA = "booleanData";
  private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
  private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
  private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
  @Override
  public int run(String[] args) throws Exception {
    // a large block of option-loading code lived here originally; it can be ignored
    // Step 1: prepare the matrix -- convert the raw data into a matrix,
    // done in PreparePreferenceMatrixJob
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
              "--input", getInputPath().toString(),
              "--output", prepPath.toString(),
              "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
              "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
              "--booleanData", String.valueOf(booleanData),
              "--tempDir", getTempPath().toString()});
      numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
    }
    // Step 2: compute the co-occurrence matrix
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      if (numberOfUsers == -1) {
        numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
                PathType.LIST, null, getConf());
      }
      //calculate the co-occurrence matrix
      ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
              "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
              "--output", similarityMatrixPath.toString(),
              "--numberOfColumns", String.valueOf(numberOfUsers),
              "--similarityClassname", similarityClassname,
              "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
              "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
              "--threshold", String.valueOf(threshold),
              "--tempDir", getTempPath().toString()});
    }
    //start the multiplication of the co-occurrence matrix by the user vectors
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job prePartialMultiply1 = prepareJob(
              similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
              SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
              Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
              SequenceFileOutputFormat.class);
      boolean succeeded = prePartialMultiply1.waitForCompletion(true);
      if (!succeeded)
        return -1;
      //continue the multiplication
      Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
              prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
              VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
              SequenceFileOutputFormat.class);
      if (usersFile != null) {
        prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
      }
      prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
              maxPrefsPerUser);
      succeeded = prePartialMultiply2.waitForCompletion(true);
      if (!succeeded)
        return -1;
      //finish the job
      Job partialMultiply = prepareJob(
              new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
              SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
              ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
              SequenceFileOutputFormat.class);
      setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);
      succeeded = partialMultiply.waitForCompletion(true);
      if (!succeeded)
        return -1;
    }
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      //filter out any users we don't care about
      if (filterFile != null) {
        Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
                ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
                ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
                SequenceFileOutputFormat.class);
        boolean succeeded = itemFiltering.waitForCompletion(true);
        if (!succeeded)
          return -1;
      }
      String aggregateAndRecommendInput = partialMultiplyPath.toString();
      if (filterFile != null) {
        aggregateAndRecommendInput += "," + explicitFilterPath;
      }
      //extract out the recommendations
      Job aggregateAndRecommend = prepareJob(
              new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
              PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
              AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
              TextOutputFormat.class);
      Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
      if (itemsFile != null) {
        aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
      }
      if (filterFile != null) {
        setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);
      }
      setIOSort(aggregateAndRecommend);
      aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
              new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
      aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
      aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
      boolean succeeded = aggregateAndRecommend.waitForCompletion(true);
      if (!succeeded)
        return -1;
    }
    return 0;
  }
}

Step 2, computing the co-occurrence matrix, happens mainly in the RowSimilarityJob class.

Java code:

ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
        "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
        "--output", similarityMatrixPath.toString(),
        "--numberOfColumns", String.valueOf(numberOfUsers),
        "--similarityClassname", similarityClassname,
        "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
        "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
        "--threshold", String.valueOf(threshold),
        "--tempDir", getTempPath().toString()});

Note that this job's input path is the output path of the last reducer of PreparePreferenceMatrixJob, covered in the previous post.

Next, a detailed look at the RowSimilarityJob implementation:

Java code:

public class RowSimilarityJob extends AbstractJob {
  @Override
  public int run(String[] args) throws Exception {
    // a pile of option-loading code, omitted
    // first MapReduce
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
          VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
      normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
      Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
      normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
      normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
      normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
      normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
      normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
      boolean succeeded = normsAndTranspose.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    // second MapReduce
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
          IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
      pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
      Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
      pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
      pairwiseConf.set(NORMS_PATH, normsPath.toString());
      pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
      pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
      pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
      pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
      pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
      boolean succeeded = pairwiseSimilarity.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    // third MapReduce
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
          IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
          VectorWritable.class);
      asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
      asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
      boolean succeeded = asMatrix.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    return 0;
  }
}

RowSimilarityJob thus also consists of three MapReduce passes:

1. Mapper: the VectorNormMapper class; its output is of type ( userid_index, <itemid_index, pref> ).

Java code:

public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
  @Override
  protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
      throws IOException, InterruptedException {
    Vector rowVector = similarity.normalize(vectorWritable.get());
    int numNonZeroEntries = 0;
    double maxValue = Double.MIN_VALUE;
    Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();
    while (nonZeroElements.hasNext()) {
      Vector.Element element = nonZeroElements.next();
      RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
      partialColumnVector.setQuick(row.get(), element.get());
      // emit ( userid_index, <itemid_index, pref> )
      ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));
      numNonZeroEntries++;
      if (maxValue < element.get()) {
        maxValue = element.get();
      }
    }
    if (threshold != NO_THRESHOLD) {
      nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
      maxValues.setQuick(row.get(), maxValue);
    }
    norms.setQuick(row.get(), similarity.norm(rowVector));
    // count the total number of rows (items)
    ctx.getCounter(Counters.ROWS).increment(1);
  }
}

Reducer: the MergeVectorsReducer class. Input is (userid_index, <itemid_index, pref>); entries sharing the same userid_index are merged here; output is ( userid_index, vector<itemid_index, pref> ).

Java code:

public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
  @Override
  protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
      throws IOException, InterruptedException {
    Vector partialVector = Vectors.merge(partialVectors);
    if (row.get() == NORM_VECTOR_MARKER) {
      Vectors.write(partialVector, normsPath, ctx.getConfiguration());
    } else if (row.get() == MAXVALUE_VECTOR_MARKER) {
      Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
    } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
      Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
    } else {
      ctx.write(row, new VectorWritable(partialVector));
    }
  }
}

2. Mapper: the CooccurrencesMapper class. For each userid_index it processes that user's vector<itemid_index, pref>, collects <item1, item2> pairs, and outputs ( itemid_index, vector<itemid_index, value> ).

Java code:

public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
  @Override
  protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)
      throws IOException, InterruptedException {
    Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
    Arrays.sort(occurrences, BY_INDEX);
    int cooccurrences = 0;
    int prunedCooccurrences = 0;
    for (int n = 0; n < occurrences.length; n++) {
      Vector.Element occurrenceA = occurrences[n];
      Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
      for (int m = n; m < occurrences.length; m++) {
        Vector.Element occurrenceB = occurrences[m];
        if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
          dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
          cooccurrences++;
        } else {
          prunedCooccurrences++;
        }
      }
      ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
    }
    ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
    ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
  }
}
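A toy illustration (my own example, not Mahout code) of the pairing loop above: one user's item vector {1:5.0, 3:3.0, 7:2.0} produces a dot contribution for every pair with m >= n; for dot-product-style measures, aggregate(a, b) effectively returns a * b.

public class CooccurrencePairingSketch {
  public static void main(String[] args) {
    int[] itemIds = {1, 3, 7};
    double[] prefs = {5.0, 3.0, 2.0};
    for (int n = 0; n < itemIds.length; n++) {
      for (int m = n; m < itemIds.length; m++) {
        System.out.printf("dot(%d,%d) += %.1f%n",
            itemIds[n], itemIds[m], prefs[n] * prefs[m]);
      }
    }
    // prints contributions for (1,1) (1,3) (1,7) (3,3) (3,7) (7,7);
    // the reducer later sums these contributions across all users
  }
}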

Reducer: the SimilarityReducer class, which produces the co-occurrence (similarity) matrix.

Java code:

public static class SimilarityReducer
      extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
  @Override
  protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
      throws IOException, InterruptedException {
    Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
    // take the first vector as this item's row vector
    Vector dots = partialDotsIterator.next().get();
    while (partialDotsIterator.hasNext()) {
      Vector toAdd = partialDotsIterator.next().get();
      Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();
      while (nonZeroElements.hasNext()) {
        Vector.Element nonZeroElement = nonZeroElements.next();
        // nonZeroElement.index() is an itemid; add in that itemid's value from the other vector
        dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
      }
    }
    // dots is now the row numbered `row` of the matrix: the accumulated dot products
    // of this item with the other items, from which the similarities are computed below
    Vector similarities = dots.like();
    double normA = norms.getQuick(row.get());
    Iterator<Vector.Element> dotsWith = dots.iterateNonZero();
    while (dotsWith.hasNext()) {
      Vector.Element b = dotsWith.next();
      double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
      if (similarityValue >= threshold) {
        similarities.set(b.index(), similarityValue);
      }
    }
    if (excludeSelfSimilarity) {
      similarities.setQuick(row.get(), 0);
    }
    ctx.write(row, new VectorWritable(similarities));
  }
}

3. Mapper: the UnsymmetrifyMapper class, used to build the symmetric matrix. The previous step produced an asymmetric (upper-triangular) matrix; first transpose it, then add the transposed matrix to the original to obtain the symmetric matrix.

Java code:

public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
  private int maxSimilaritiesPerRow;
  @Override
  protected void setup(Mapper.Context ctx) throws IOException, InterruptedException {
    maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
    Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
  }
  @Override
  protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)
      throws IOException, InterruptedException {
    Vector similarities = similaritiesWritable.get();
    // For performance reasons moved transposedPartial creation out of the while loop and reusing the same vector
    Vector transposedPartial = similarities.like();
    TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);
    Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
    // this loop emits the transposed entries; adding the transpose to the
    // asymmetric matrix from the previous step yields the symmetric matrix
    while (nonZeroElements.hasNext()) {
      Vector.Element nonZeroElement = nonZeroElements.next();
      topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
      transposedPartial.setQuick(row.get(), nonZeroElement.get());
      // one element of the transposed matrix
      ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
      transposedPartial.setQuick(row.get(), 0.0);
    }
    Vector topKSimilarities = similarities.like();
    for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
      topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
    }
    // only the maxSimilaritiesPerRow highest-scoring items are collected here, so in
    // the final "symmetric" matrix each row really keeps just maxSimilaritiesPerRow
    // symmetric entries; the remaining positions are simply dropped
    ctx.write(row, new VectorWritable(topKSimilarities));
  }
}

Reducer: the MergeToTopKSimilaritiesReducer class, which collects the transposed elements emitted by the mapper above, i.e. it completes the addition of the transposed matrix and the (top-maxSimilaritiesPerRow-truncated) original matrix, yielding the symmetric matrix.

Java code:

public static class MergeToTopKSimilaritiesReducer
    extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {
  private int maxSimilaritiesPerRow;
  @Override
  protected void setup(Context ctx) throws IOException, InterruptedException {
    maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
    Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
  }
  @Override
  protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
      throws IOException, InterruptedException {
    Vector allSimilarities = Vectors.merge(partials);
    Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
    ctx.write(row, new VectorWritable(topKSimilarities));
  }
}

At this point RowSimilarityJob's work is complete; the end product is a symmetric matrix, i.e. the co-occurrence matrix.
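To make the transpose-and-add step concrete, a toy sketch (my own example, not Mahout code; the top-K truncation and self-similarities are ignored):

import java.util.HashMap;
import java.util.Map;

public class SymmetrizeSketch {
  public static void main(String[] args) {
    // upper-triangular similarities from step 2: sim[i][j] with j > i
    Map<Integer, Map<Integer, Double>> upper = new HashMap<>();
    upper.put(1, Map.of(3, 0.5, 7, 0.2));
    upper.put(3, Map.of(7, 0.9));
    Map<Integer, Map<Integer, Double>> full = new HashMap<>();
    upper.forEach((i, row) -> row.forEach((j, s) -> {
      full.computeIfAbsent(i, k -> new HashMap<>()).put(j, s);  // original entry
      full.computeIfAbsent(j, k -> new HashMap<>()).put(i, s);  // transposed entry
    }));
    System.out.println(full);
    // e.g. {1={3=0.5, 7=0.2}, 3={1=0.5, 7=0.9}, 7={1=0.2, 3=0.9}} (map order may vary)
  }
}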

Java code (back in RecommenderJob):

// multiply the co-occurrence matrix by the user vectors
//start the multiplication of the co-occurrence matrix by the user vectors
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  // first MapReduce
  Job prePartialMultiply1 = prepareJob(
          similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
          SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
          Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
          SequenceFileOutputFormat.class);
  boolean succeeded = prePartialMultiply1.waitForCompletion(true);
  if (!succeeded)
    return -1;
  // second MapReduce
  //continue the multiplication
  Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
          prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
          VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
          SequenceFileOutputFormat.class);
  if (usersFile != null) {
    prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
  }
  prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
          maxPrefsPerUser);
  succeeded = prePartialMultiply2.waitForCompletion(true);
  if (!succeeded)
    return -1;
  //finish the job
  // third MapReduce
  Job partialMultiply = prepareJob(
          new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
          SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
          ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
          SequenceFileOutputFormat.class);
  setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);
  succeeded = partialMultiply.waitForCompletion(true);
  if (!succeeded)
    return -1;
}

Likewise, the details of these three MapReduce passes:

1. Mapper: the SimilarityMatrixRowWrapperMapper class. It takes one row of the co-occurrence matrix and wraps it into a VectorOrPrefWritable, matching the output type of UserVectorSplitterMapper.

Java code:

public final class SimilarityMatrixRowWrapperMapper extends
    Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
  // take one row of the co-occurrence matrix and wrap it into a
  // VectorOrPrefWritable, matching UserVectorSplitterMapper's output type
  @Override
  protected void map(IntWritable key,
                     VectorWritable value,
                     Context context) throws IOException, InterruptedException {
    Vector similarityMatrixRow = value.get();
    similarityMatrixRow.set(key.get(), Double.NaN);
    context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));
  }
}

2. Mapper: the UserVectorSplitterMapper class.

Java code:

// input format:  theUserID: <itemid_index1,pref1>,<itemid_index2,pref2>,...,<itemid_indexN,prefN>
// output format: itemid1: <theUserID,pref1>
//                itemid2: <theUserID,pref2>
//                itemid3: <theUserID,pref3>
//                ...
//                itemidN: <theUserID,prefN>

Java code:

public final class UserVectorSplitterMapper extends
    Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {
  @Override
  protected void map(VarLongWritable key,
                     VectorWritable value,
                     Context context) throws IOException, InterruptedException {
    long userID = key.get();
    if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
      return;
    }
    Vector userVector = maybePruneUserVector(value.get());
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    VarIntWritable itemIndexWritable = new VarIntWritable();
    VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
    while (it.hasNext()) {
      Vector.Element e = it.next();
      itemIndexWritable.set(e.index());
      vectorOrPref.set(userID, (float) e.get());
      context.write(itemIndexWritable, vectorOrPref);
    }
  }
}

3. Reducer: the ToVectorAndPrefReducer class. It collects the co-occurrence matrix row for an itemid, together with the users who rated that item and their ratings; the final output is itemid_index, VectorAndPrefsWritable(vector, List<userid>, List<pref>).

Java code:

public final class ToVectorAndPrefReducer extends
    Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {
  // gather all values keyed by this itemid
  @Override
  protected void reduce(VarIntWritable key,
                        Iterable<VectorOrPrefWritable> values,
                        Context context) throws IOException, InterruptedException {
    List<Long> userIDs = Lists.newArrayList();
    List<Float> prefValues = Lists.newArrayList();
    Vector similarityMatrixColumn = null;
    for (VectorOrPrefWritable value : values) {
      if (value.getVector() == null) {
        // Then this is a user-pref value
        userIDs.add(value.getUserID());
        prefValues.add(value.getValue());
      } else {
        // Then this is the column vector
        // (one row of the co-occurrence matrix -- the row numbered itemid)
        if (similarityMatrixColumn != null) {
          throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
        }
        similarityMatrixColumn = value.getVector();
      }
    }
    if (similarityMatrixColumn == null) {
      return;
    }
    // bundle the co-occurrence row for this itemid together with the users
    // who rated the item and their ratings
    VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);
    context.write(key, vectorAndPrefs);
  }
}

Step 4: multiply the co-occurrence matrix by the user vectors to obtain the recommendations.

Java code:

//extract out the recommendations
Job aggregateAndRecommend = prepareJob(
        new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
        PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
        AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
        TextOutputFormat.class);
Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();

Mapper: the PartialMultiplyMapper class.

Java code:

// input type:  ( itemid_index, <array of userIDs, array of prefs, co-occurrence matrix row numbered itemid_index> )
// output type: ( userid, <this user's pref for itemid_index1, co-occurrence matrix row numbered itemid_index1> )
//              ( userid, <this user's pref for itemid_index2, co-occurrence matrix row numbered itemid_index2> )
//              ...
//              ( userid, <this user's pref for itemid_indexN, co-occurrence matrix row numbered itemid_indexN> )

Java code:

public final class PartialMultiplyMapper extends
    Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> {
  @Override
  protected void map(VarIntWritable key,
                     VectorAndPrefsWritable vectorAndPrefsWritable,
                     Context context) throws IOException, InterruptedException {
    Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();
    List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
    List<Float> prefValues = vectorAndPrefsWritable.getValues();
    VarLongWritable userIDWritable = new VarLongWritable();
    PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();
    for (int i = 0; i < userIDs.size(); i++) {
      long userID = userIDs.get(i);
      float prefValue = prefValues.get(i);
      if (!Float.isNaN(prefValue)) {
        prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);
        userIDWritable.set(userID);
        context.write(userIDWritable, prefAndSimilarityColumn);
      }
    }
  }
}

Reducer: the AggregateAndRecommendReducer class. The partial multiply happens in this reducer, and the items with the largest resulting recommendation scores are kept. For non-boolean data, the ranking score comes from the partial multiply of prefs with the similarity-matrix columns.

For boolean data every pref is 1.0f, so actually carrying out the multiplication is pointless; simply accumulating the similarity values is enough, and sorting on those sums yields the recommendations.
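A toy sketch (my own example, not Mahout code) of the boolean branch described above, as it appears in the code below:

public class BooleanAggregationSketch {
  public static void main(String[] args) {
    double[] simColumnA = {0.0, 0.5, 0.2};  // similarities of items 0..2 to item A
    double[] simColumnB = {0.5, 0.0, 0.9};  // similarities of items 0..2 to item B
    double[] prediction = new double[3];
    for (int i = 0; i < prediction.length; i++) {
      // plus() only -- no times(pref), since every pref is 1.0
      prediction[i] = simColumnA[i] + simColumnB[i];
    }
    System.out.println(java.util.Arrays.toString(prediction));
    // [0.5, 0.5, 1.1] -> item 2 ranks highest
  }
}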

Java code:

public final class AggregateAndRecommendReducer extends
    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> {
  @Override
  protected void reduce(VarLongWritable userID,
                        Iterable<PrefAndSimilarityColumnWritable> values,
                        Context context) throws IOException, InterruptedException {
    if (booleanData) {
      reduceBooleanData(userID, values, context);
    } else {
      reduceNonBooleanData(userID, values, context);
    }
  }
  private void reduceBooleanData(VarLongWritable userID,
                                 Iterable<PrefAndSimilarityColumnWritable> values,
                                 Context context) throws IOException, InterruptedException {
    Vector predictionVector = null;
    for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
      predictionVector = predictionVector == null
          ? prefAndSimilarityColumn.getSimilarityColumn()
          : predictionVector.plus(prefAndSimilarityColumn.getSimilarityColumn());
    }
    writeRecommendedItems(userID, predictionVector, context);
  }
  private void reduceNonBooleanData(VarLongWritable userID,
                        Iterable<PrefAndSimilarityColumnWritable> values,
                        Context context) throws IOException, InterruptedException {
    Vector numerators = null;
    Vector denominators = null;
    Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
      Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
      float prefValue = prefAndSimilarityColumn.getPrefValue();
      Iterator<Vector.Element> usedItemsIterator = simColumn.iterateNonZero();
      while (usedItemsIterator.hasNext()) {
        int itemIDIndex = usedItemsIterator.next().index();
        numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
      }
      // vector.times(x) multiplies every entry of the vector by x
      // vector.plus(vector) adds two vectors element-wise
      // numerators is a vector whose entry for item i is
      //   sum over rated items j of similarity(i, j) * pref(userID, j)
      // where similarity(item1, item2) is the similarity of the two items and
      // pref(userID, item) is the user's rating of the item
      numerators = numerators == null
          ? prefValue == BOOLEAN_PREF_VALUE ? simColumn.clone() : simColumn.times(prefValue)
          : numerators.plus(prefValue == BOOLEAN_PREF_VALUE ? simColumn : simColumn.times(prefValue));
      simColumn.assign(ABSOLUTE_VALUES);
      // denominators is a vector whose entry for item i is
      //   sum over rated items j of |similarity(i, j)|
      denominators = denominators == null ? simColumn : denominators.plus(simColumn);
    }
    if (numerators == null) {
      return;
    }
    Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    Iterator<Vector.Element> iterator = numerators.iterateNonZero();
    while (iterator.hasNext()) {
      Vector.Element element = iterator.next();
      int itemIDIndex = element.index();
      if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {
        // compute the normalized prediction
        double prediction = element.get() / denominators.getQuick(itemIDIndex);
        recommendationVector.setQuick(itemIDIndex, prediction);
      }
    }
    // writeRecommendedItems (not shown in this excerpt) sorts the vector and emits the top N items
    writeRecommendedItems(userID, recommendationVector, context);
  }
}
