An item-based recommendation algorithm on Hadoop: a walkthrough of Mahout's implementation.
It breaks down into four steps:
1. Build the user-item (user) matrix
Input: every user's rating of, or association with, each item.
Map output: key = user, value = item + preference.
Reduce output: key = user, value = a list of item + preference pairs.
2. Build the item-item matrix
Input: the user matrix from step 1. For each user's row, take the Cartesian product of that user's items to generate item-item associations.
Map output: key = item, value = association strength.
Reduce output: key = item, value = association strengths to multiple items.
(The item similarity matrix can be built under various rules; the algorithm is only sketched here.)
Correction:
Computing the item similarity matrix is the core of item-based collaborative filtering.
Many formulas exist; at their heart they compare the user sets of items i and j (intersection over union).
The formulas Mahout uses are:
1. dot(i,j) = sum(Pi(u) * Pj(u))
2. norms(i) = sum(Pi(u)^2)
3. sim(i,j) = 1 / (1 + (norms(i) - 2*dot(i,j) + norms(j))^(1/2))
Mahout implements this in three jobs:
Job 1: from the item-user matrix, compute norms, i.e. each item's sum of squared ratings; the output is item-norms pairs.
Job 2: Map: from the user-item matrix, compute Pi(u)*Pj(u), i.e. the products of ratings that share a user; the output is, per item, the Pi(u)*Pj(u) terms against each paired item.
Reduce: combine those per-item partial products with the item-norms to obtain the item similarity matrix (at this point all dot contributions involving an item can be aggregated).
Job 3: complete (symmetrize) the item similarity matrix.
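The three formulas above can be checked on toy data. A minimal sketch (hypothetical ratings, not Mahout's API); note sim(i,j) is just 1/(1 + Euclidean distance) between the two item vectors:

```java
// Check of the three formulas on toy data (hypothetical ratings, not Mahout API).
class SimilaritySketch {

    // dot(i,j) = sum over users u of Pi(u) * Pj(u)
    static double dot(double[] a, double[] b) {
        double s = 0;
        for (int u = 0; u < a.length; u++) {
            s += a[u] * b[u];
        }
        return s;
    }

    // norms(i) = sum over users u of Pi(u)^2
    static double norms(double[] a) {
        return dot(a, a);
    }

    // sim(i,j) = 1 / (1 + sqrt(norms(i) - 2*dot(i,j) + norms(j)))
    // i.e. 1 / (1 + Euclidean distance between the two item vectors)
    static double sim(double[] a, double[] b) {
        return 1.0 / (1.0 + Math.sqrt(norms(a) - 2 * dot(a, b) + norms(b)));
    }

    public static void main(String[] args) {
        double[] itemI = {4, 0, 5};   // ratings of item i by users u1..u3
        double[] itemJ = {4, 0, 5};   // identical ratings -> distance 0 -> sim 1.0
        double[] itemK = {1, 5, 0};
        System.out.println(sim(itemI, itemJ));   // 1.0
        System.out.println(sim(itemI, itemK));
    }
}
```
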
3. Build the user-item-similarity data
Input: the user-item matrix and the item-item matrix.
Map output: key = item, value = a VectorOrPrefWritable, holding either an item-user preference or an item-item similarity.
Reduce output: key = item, value = a VectorAndPrefsWritable, which gathers, for a single item, its preferences from all users and its similarities to all items.
4. Build the user recommendation matrix
Input: the VectorAndPrefsWritable records.
Map output: key = user, value = item + partial score (the map side produces, per contributing item, the user's preference for item A times item A's similarity to the other items).
Reduce output: key = user, value = recommended item + score (the reduce side applies a chosen formula to aggregate all the per-item partial scores into the user's predicted preference for each candidate item, and keeps the top-N as that user's recommendations).
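The step-4 scoring described above can be mimicked in memory for a single user. A hypothetical sketch (toy data and plain maps, not Mahout's API) of "preference times similarity, normalized by the sum of absolute similarities":

```java
import java.util.HashMap;
import java.util.Map;

// In-memory sketch of step 4 for one user (toy data, not Mahout's API):
// score(u, B) = sum_A pref(u, A) * sim(A, B) / sum_A |sim(A, B)|
class RecommendSketch {

    static Map<String, Double> score(Map<String, Double> prefs,
                                     Map<String, Map<String, Double>> sim) {
        Map<String, Double> numerators = new HashMap<>();
        Map<String, Double> denominators = new HashMap<>();
        for (Map.Entry<String, Double> pref : prefs.entrySet()) {
            // similarities of the rated item A to candidate items B
            for (Map.Entry<String, Double> s : sim.getOrDefault(pref.getKey(), Map.of()).entrySet()) {
                if (prefs.containsKey(s.getKey())) {
                    continue;  // skip items the user has already rated
                }
                numerators.merge(s.getKey(), pref.getValue() * s.getValue(), Double::sum);
                denominators.merge(s.getKey(), Math.abs(s.getValue()), Double::sum);
            }
        }
        Map<String, Double> scores = new HashMap<>();
        numerators.forEach((item, num) -> scores.put(item, num / denominators.get(item)));
        return scores;
    }

    public static void main(String[] args) {
        Map<String, Double> prefs = Map.of("A", 5.0, "B", 3.0);
        Map<String, Map<String, Double>> sim = Map.of(
            "A", Map.of("B", 0.5, "C", 0.8),
            "B", Map.of("A", 0.5, "C", 0.4));
        // C scores (5*0.8 + 3*0.4) / (0.8 + 0.4) = 13/3
        System.out.println(score(prefs, sim));
    }
}
```

The top-N cut that the real reducer applies afterwards is just a sort on these scores.
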
A few more articles analyzing Mahout's recommendation algorithms:
http://eric-gcm.iteye.com/blog/1817822
http://eric-gcm.iteye.com/blog/1818033
http://eric-gcm.iteye.com/blog/1820060
The Mahout code follows.
RecommenderJob is the main class Mahout uses to run the recommendation engine on Hadoop; let's walk through it.
run() is the entry point:
Java code:
public final class RecommenderJob extends AbstractJob {

  public static final String BOOLEAN_DATA = "booleanData";

  private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
  private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
  private static final int DEFAULT_MIN_PREFS_PER_USER = 1;

  @Override
  public int run(String[] args) throws Exception {
    // a large chunk of option-parsing code originally lived here; it can be ignored
    // Step 1: prepare the matrix -- convert the raw data into a matrix, done in PreparePreferenceMatrixJob
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
          "--input", getInputPath().toString(),
          "--output", prepPath.toString(),
          "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
          "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
          "--booleanData", String.valueOf(booleanData),
          "--tempDir", getTempPath().toString()});
      numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
    }
    // Step 2: compute the co-occurrence matrix
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      if (numberOfUsers == -1) {
        numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
            PathType.LIST, null, getConf());
      }
      //calculate the co-occurrence matrix
      ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
          "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
          "--output", similarityMatrixPath.toString(),
          "--numberOfColumns", String.valueOf(numberOfUsers),
          "--similarityClassname", similarityClassname,
          "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
          "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
          "--threshold", String.valueOf(threshold),
          "--tempDir", getTempPath().toString()});
    }
    //start the multiplication of the co-occurrence matrix by the user vectors
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job prePartialMultiply1 = prepareJob(
          similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
          SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
          Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
          SequenceFileOutputFormat.class);
      boolean succeeded = prePartialMultiply1.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
      //continue the multiplication
      Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
          prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
          VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
          SequenceFileOutputFormat.class);
      if (usersFile != null) {
        prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
      }
      prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
          maxPrefsPerUser);
      succeeded = prePartialMultiply2.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
      //finish the job
      Job partialMultiply = prepareJob(
          new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
          SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
          ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
          SequenceFileOutputFormat.class);
      setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);
      succeeded = partialMultiply.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      //filter out any users we don't care about
      if (filterFile != null) {
        Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
            ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
            ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
            SequenceFileOutputFormat.class);
        boolean succeeded = itemFiltering.waitForCompletion(true);
        if (!succeeded) {
          return -1;
        }
      }
      String aggregateAndRecommendInput = partialMultiplyPath.toString();
      if (filterFile != null) {
        aggregateAndRecommendInput += "," + explicitFilterPath;
      }
      //extract out the recommendations
      Job aggregateAndRecommend = prepareJob(
          new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
          PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
          AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
          TextOutputFormat.class);
      Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
      if (itemsFile != null) {
        aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
      }
      if (filterFile != null) {
        setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);
      }
      setIOSort(aggregateAndRecommend);
      aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
          new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
      aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
      aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
      boolean succeeded = aggregateAndRecommend.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    return 0;
  }
}
Step 2, computing the co-occurrence matrix, is done mainly in the RowSimilarityJob class:
Java code:
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
    "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
    "--output", similarityMatrixPath.toString(),
    "--numberOfColumns", String.valueOf(numberOfUsers),
    "--similarityClassname", similarityClassname,
    "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
    "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
    "--threshold", String.valueOf(threshold),
    "--tempDir", getTempPath().toString()});
Note that this job's input path is exactly the output path of the final reducer of PreparePreferenceMatrixJob, covered in the previous post.
Now a detailed look at how RowSimilarityJob is implemented:
Java code:
public class RowSimilarityJob extends AbstractJob {

  @Override
  public int run(String[] args) throws Exception {
    // a pile of argument-loading code, omitted
    // first MapReduce
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
          VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
      normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
      Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
      normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
      normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
      normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
      normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
      normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
      boolean succeeded = normsAndTranspose.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    // second MapReduce
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
          IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
      pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
      Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
      pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
      pairwiseConf.set(NORMS_PATH, normsPath.toString());
      pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
      pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
      pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
      pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
      pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
      boolean succeeded = pairwiseSimilarity.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    // third MapReduce
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
          IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
          VectorWritable.class);
      asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
      asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
      boolean succeeded = asMatrix.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    return 0;
  }
}
As you can see, RowSimilarityJob also consists of three MapReduce passes:
1. Mapper: the VectorNormMapper class, which emits (userid_index, <itemid_index, pref>) pairs
Java code:
public static class VectorNormMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {

  @Override
  protected void map(IntWritable row, VectorWritable vectorWritable, Context ctx)
      throws IOException, InterruptedException {
    Vector rowVector = similarity.normalize(vectorWritable.get());
    int numNonZeroEntries = 0;
    double maxValue = Double.MIN_VALUE;
    Iterator<Vector.Element> nonZeroElements = rowVector.iterateNonZero();
    while (nonZeroElements.hasNext()) {
      Vector.Element element = nonZeroElements.next();
      RandomAccessSparseVector partialColumnVector = new RandomAccessSparseVector(Integer.MAX_VALUE);
      partialColumnVector.setQuick(row.get(), element.get());
      // emit (userid_index, <itemid_index, pref>)
      ctx.write(new IntWritable(element.index()), new VectorWritable(partialColumnVector));
      numNonZeroEntries++;
      if (maxValue < element.get()) {
        maxValue = element.get();
      }
    }
    if (threshold != NO_THRESHOLD) {
      nonZeroEntries.setQuick(row.get(), numNonZeroEntries);
      maxValues.setQuick(row.get(), maxValue);
    }
    norms.setQuick(row.get(), similarity.norm(rowVector));
    // count the total number of items
    ctx.getCounter(Counters.ROWS).increment(1);
  }
}
Reducer: the MergeVectorsReducer class. Its input is (userid_index, <itemid_index, pref>); entries sharing a userid_index are merged here, and it outputs (userid_index, vector<itemid_index, pref>).
Java code:
public static class MergeVectorsReducer extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {

  @Override
  protected void reduce(IntWritable row, Iterable<VectorWritable> partialVectors, Context ctx)
      throws IOException, InterruptedException {
    Vector partialVector = Vectors.merge(partialVectors);
    if (row.get() == NORM_VECTOR_MARKER) {
      Vectors.write(partialVector, normsPath, ctx.getConfiguration());
    } else if (row.get() == MAXVALUE_VECTOR_MARKER) {
      Vectors.write(partialVector, maxValuesPath, ctx.getConfiguration());
    } else if (row.get() == NUM_NON_ZERO_ENTRIES_VECTOR_MARKER) {
      Vectors.write(partialVector, numNonZeroEntriesPath, ctx.getConfiguration(), true);
    } else {
      ctx.write(row, new VectorWritable(partialVector));
    }
  }
}
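What this first pass does can be mimicked in memory. A sketch under assumed shapes (plain maps instead of Mahout's Vector types): the map side transposes item-by-user rows into user-keyed columns while recording each row's norm as a side output:

```java
import java.util.Map;
import java.util.TreeMap;

// In-memory sketch of the first pass (assumed shapes, not Mahout's Vector types).
class NormsAndTransposeSketch {

    // map + merge: transpose item -> (user -> pref) rows into user -> (item -> pref) columns
    static Map<Integer, Map<Integer, Double>> transpose(Map<Integer, Map<Integer, Double>> itemRows) {
        Map<Integer, Map<Integer, Double>> userColumns = new TreeMap<>();
        itemRows.forEach((item, users) -> users.forEach((user, pref) ->
            userColumns.computeIfAbsent(user, k -> new TreeMap<>()).put(item, pref)));
        return userColumns;
    }

    // side output written to normsPath: norms(i) = sum over users of Pi(u)^2
    static double norm(Map<Integer, Double> itemRow) {
        return itemRow.values().stream().mapToDouble(p -> p * p).sum();
    }

    public static void main(String[] args) {
        Map<Integer, Map<Integer, Double>> itemRows = Map.of(
            0, Map.of(10, 4.0, 11, 5.0),   // item 0 rated by users 10 and 11
            1, Map.of(10, 2.0));           // item 1 rated by user 10
        System.out.println(transpose(itemRows));   // {10={0=4.0, 1=2.0}, 11={0=5.0}}
        System.out.println(norm(itemRows.get(0))); // 41.0
    }
}
```
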
2. Mapper: the CooccurrencesMapper class. For each userid_index it walks the vector<itemid_index, pref>, collects the <item1, item2> pairs, and outputs (itemid_index, vector<itemid_index, value>).
Java code:
public static class CooccurrencesMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {

  @Override
  protected void map(IntWritable column, VectorWritable occurrenceVector, Context ctx)
      throws IOException, InterruptedException {
    Vector.Element[] occurrences = Vectors.toArray(occurrenceVector);
    Arrays.sort(occurrences, BY_INDEX);
    int cooccurrences = 0;
    int prunedCooccurrences = 0;
    for (int n = 0; n < occurrences.length; n++) {
      Vector.Element occurrenceA = occurrences[n];
      Vector dots = new RandomAccessSparseVector(Integer.MAX_VALUE);
      for (int m = n; m < occurrences.length; m++) {
        Vector.Element occurrenceB = occurrences[m];
        if (threshold == NO_THRESHOLD || consider(occurrenceA, occurrenceB)) {
          dots.setQuick(occurrenceB.index(), similarity.aggregate(occurrenceA.get(), occurrenceB.get()));
          cooccurrences++;
        } else {
          prunedCooccurrences++;
        }
      }
      ctx.write(new IntWritable(occurrenceA.index()), new VectorWritable(dots));
    }
    ctx.getCounter(Counters.COOCCURRENCES).increment(cooccurrences);
    ctx.getCounter(Counters.PRUNED_COOCCURRENCES).increment(prunedCooccurrences);
  }
}
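What this mapper emits per user column, and what the combiner/reducer then sum up, can be sketched in memory (toy data, plain maps instead of Mahout vectors):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the pairwise pass (toy data): per user column, emit Pi(u)*Pj(u)
// for every item pair i <= j; summing over users yields dot(i,j).
class CooccurrenceSketch {

    static Map<List<Integer>, Double> accumulateDots(List<Map<Integer, Double>> userColumns) {
        Map<List<Integer>, Double> dots = new HashMap<>();
        for (Map<Integer, Double> column : userColumns) {
            List<Integer> items = new ArrayList<>(column.keySet());
            Collections.sort(items);   // mirrors Arrays.sort(occurrences, BY_INDEX)
            for (int n = 0; n < items.size(); n++) {
                for (int m = n; m < items.size(); m++) {
                    dots.merge(List.of(items.get(n), items.get(m)),
                               column.get(items.get(n)) * column.get(items.get(m)),
                               Double::sum);
                }
            }
        }
        return dots;
    }

    public static void main(String[] args) {
        List<Map<Integer, Double>> userColumns = List.of(
            Map.of(0, 4.0, 1, 2.0),   // user 10: item -> pref
            Map.of(0, 5.0));          // user 11
        Map<List<Integer>, Double> dots = accumulateDots(userColumns);
        System.out.println(dots.get(List.of(0, 0)));   // 41.0 (this is norms(0))
        System.out.println(dots.get(List.of(0, 1)));   // 8.0
    }
}
```
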
Reducer: the SimilarityReducer class, which produces the co-occurrence matrix.
Java code:
public static class SimilarityReducer
    extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {

  @Override
  protected void reduce(IntWritable row, Iterable<VectorWritable> partialDots, Context ctx)
      throws IOException, InterruptedException {
    Iterator<VectorWritable> partialDotsIterator = partialDots.iterator();
    // take one vector as this item's row vector
    Vector dots = partialDotsIterator.next().get();
    while (partialDotsIterator.hasNext()) {
      Vector toAdd = partialDotsIterator.next().get();
      Iterator<Vector.Element> nonZeroElements = toAdd.iterateNonZero();
      while (nonZeroElements.hasNext()) {
        Vector.Element nonZeroElement = nonZeroElements.next();
        // nonZeroElement.index() is an itemid; add in that itemid's value from the other vector
        dots.setQuick(nonZeroElement.index(), dots.getQuick(nonZeroElement.index()) + nonZeroElement.get());
      }
    }
    // dots is now row `row` of the co-occurrence matrix; its entries measure this item against the other items
    Vector similarities = dots.like();
    double normA = norms.getQuick(row.get());
    Iterator<Vector.Element> dotsWith = dots.iterateNonZero();
    while (dotsWith.hasNext()) {
      Vector.Element b = dotsWith.next();
      double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
      if (similarityValue >= threshold) {
        similarities.set(b.index(), similarityValue);
      }
    }
    if (excludeSelfSimilarity) {
      similarities.setQuick(row.get(), 0);
    }
    ctx.write(row, new VectorWritable(similarities));
  }
}
3. Mapper: the UnsymmetrifyMapper class, which builds the symmetric matrix. The previous step produced an asymmetric matrix; transposing it and adding the transposed matrix to the original yields the symmetric one.
Java code:
public static class UnsymmetrifyMapper extends Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {

  private int maxSimilaritiesPerRow;

  @Override
  protected void setup(Mapper.Context ctx) throws IOException, InterruptedException {
    maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
    Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
  }

  @Override
  protected void map(IntWritable row, VectorWritable similaritiesWritable, Context ctx)
      throws IOException, InterruptedException {
    Vector similarities = similaritiesWritable.get();
    // For performance reasons moved transposedPartial creation out of the while loop and reusing the same vector
    Vector transposedPartial = similarities.like();
    TopK<Vector.Element> topKQueue = new TopK<Vector.Element>(maxSimilaritiesPerRow, Vectors.BY_VALUE);
    Iterator<Vector.Element> nonZeroElements = similarities.iterateNonZero();
    // this loop emits the transposed entries; adding the transposed matrix to the asymmetric original yields the symmetric matrix
    while (nonZeroElements.hasNext()) {
      Vector.Element nonZeroElement = nonZeroElements.next();
      topKQueue.offer(new Vectors.TemporaryElement(nonZeroElement));
      transposedPartial.setQuick(row.get(), nonZeroElement.get());
      // one element of the transposed matrix
      ctx.write(new IntWritable(nonZeroElement.index()), new VectorWritable(transposedPartial));
      transposedPartial.setQuick(row.get(), 0.0);
    }
    Vector topKSimilarities = similarities.like();
    for (Vector.Element topKSimilarity : topKQueue.retrieve()) {
      topKSimilarities.setQuick(topKSimilarity.index(), topKSimilarity.get());
    }
    // only the maxSimilaritiesPerRow highest-scoring items are kept here, so in the final "symmetric"
    // matrix only maxSimilaritiesPerRow entries per row are actually symmetric; the rest are dropped
    ctx.write(row, new VectorWritable(topKSimilarities));
  }
}
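The symmetrization itself amounts to re-emitting each upper-triangle entry under both row keys. A toy sketch (plain maps; the top-K truncation the real mapper also performs is left out here):

```java
import java.util.Map;
import java.util.TreeMap;

// Toy sketch of symmetrization: every upper-triangle entry (row, col, sim) is kept
// as (row -> col) and also emitted, transposed, as (col -> row); merging both sets
// of entries per row yields the symmetric matrix.
class SymmetrifySketch {

    static Map<Integer, Map<Integer, Double>> symmetrize(Map<Integer, Map<Integer, Double>> upper) {
        Map<Integer, Map<Integer, Double>> full = new TreeMap<>();
        upper.forEach((row, cols) -> cols.forEach((col, sim) -> {
            full.computeIfAbsent(row, k -> new TreeMap<>()).put(col, sim);   // original entry
            full.computeIfAbsent(col, k -> new TreeMap<>()).put(row, sim);   // transposed entry
        }));
        return full;
    }

    public static void main(String[] args) {
        Map<Integer, Map<Integer, Double>> upper = Map.of(
            0, Map.of(1, 0.9, 2, 0.3),
            1, Map.of(2, 0.7));
        System.out.println(symmetrize(upper).get(2));   // {0=0.3, 1=0.7}
    }
}
```
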
Reducer: the MergeToTopKSimilaritiesReducer class. It gathers the transposed elements emitted by the mapper above, completing the addition of the transposed matrix and the (top-maxSimilaritiesPerRow truncated) original matrix, which yields the symmetric matrix.
Java code:
public static class MergeToTopKSimilaritiesReducer
    extends Reducer<IntWritable,VectorWritable,IntWritable,VectorWritable> {

  private int maxSimilaritiesPerRow;

  @Override
  protected void setup(Context ctx) throws IOException, InterruptedException {
    maxSimilaritiesPerRow = ctx.getConfiguration().getInt(MAX_SIMILARITIES_PER_ROW, 0);
    Preconditions.checkArgument(maxSimilaritiesPerRow > 0, "Incorrect maximum number of similarities per row!");
  }

  @Override
  protected void reduce(IntWritable row, Iterable<VectorWritable> partials, Context ctx)
      throws IOException, InterruptedException {
    Vector allSimilarities = Vectors.merge(partials);
    Vector topKSimilarities = Vectors.topKElements(maxSimilaritiesPerRow, allSimilarities);
    ctx.write(row, new VectorWritable(topKSimilarities));
  }
}
At this point RowSimilarityJob's work is done; the end product is a symmetric matrix, i.e. the co-occurrence matrix. Back in RecommenderJob, the next phase multiplies it by the user vectors:
Java code:
// multiply the co-occurrence matrix by the user vectors
//start the multiplication of the co-occurrence matrix by the user vectors
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
  // first MapReduce
  Job prePartialMultiply1 = prepareJob(
      similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
      SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
      Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
      SequenceFileOutputFormat.class);
  boolean succeeded = prePartialMultiply1.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
  // second MapReduce
  //continue the multiplication
  Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
      prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
      VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
      SequenceFileOutputFormat.class);
  if (usersFile != null) {
    prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
  }
  prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
      maxPrefsPerUser);
  succeeded = prePartialMultiply2.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
  //finish the job
  // third MapReduce
  Job partialMultiply = prepareJob(
      new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
      SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
      ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
      SequenceFileOutputFormat.class);
  setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);
  succeeded = partialMultiply.waitForCompletion(true);
  if (!succeeded) {
    return -1;
  }
}
The three MapReduce passes here break down the same way:
1. Mapper: the SimilarityMatrixRowWrapperMapper class. It takes one row of the co-occurrence matrix and wraps it into a VectorOrPrefWritable, matching the output type of UserVectorSplitterMapper on the other input.
Java code:
public final class SimilarityMatrixRowWrapperMapper extends
    Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {

  // take one row of the co-occurrence matrix and wrap it into a VectorOrPrefWritable,
  // matching the output type of UserVectorSplitterMapper
  @Override
  protected void map(IntWritable key,
                     VectorWritable value,
                     Context context) throws IOException, InterruptedException {
    Vector similarityMatrixRow = value.get();
    similarityMatrixRow.set(key.get(), Double.NaN);
    context.write(new VarIntWritable(key.get()), new VectorOrPrefWritable(similarityMatrixRow));
  }
}
2. Mapper: the UserVectorSplitterMapper class
Java code:
// input:  theUserID:<itemid_index1,pref1>,<itemid_index2,pref2>........<itemid_indexN,prefN>
// output: itemid1:<theUserID,pref1>
//         itemid2:<theUserID,pref2>
//         itemid3:<theUserID,pref3>
//         ......
//         itemidN:<theUserID,prefN>
public final class UserVectorSplitterMapper extends
    Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> {

  @Override
  protected void map(VarLongWritable key,
                     VectorWritable value,
                     Context context) throws IOException, InterruptedException {
    long userID = key.get();
    if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
      return;
    }
    Vector userVector = maybePruneUserVector(value.get());
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    VarIntWritable itemIndexWritable = new VarIntWritable();
    VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
    while (it.hasNext()) {
      Vector.Element e = it.next();
      itemIndexWritable.set(e.index());
      vectorOrPref.set(userID, (float) e.get());
      context.write(itemIndexWritable, vectorOrPref);
    }
  }
}
3. Reducer: the ToVectorAndPrefReducer class. For each itemid it collects that item's co-occurrence-matrix row together with the users who rated the item and their ratings; the final output is (itemid_index, VectorAndPrefsWritable(vector, List<userid>, List<pref>)).
Java code:
public final class ToVectorAndPrefReducer extends
    Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {

  // collect everything keyed by this itemid
  @Override
  protected void reduce(VarIntWritable key,
                        Iterable<VectorOrPrefWritable> values,
                        Context context) throws IOException, InterruptedException {
    List<Long> userIDs = Lists.newArrayList();
    List<Float> prefValues = Lists.newArrayList();
    Vector similarityMatrixColumn = null;
    for (VectorOrPrefWritable value : values) {
      if (value.getVector() == null) {
        // Then this is a user-pref value
        userIDs.add(value.getUserID());
        prefValues.add(value.getValue());
      } else {
        // Then this is the column vector
        // one row of the co-occurrence matrix (the row whose number is this itemid)
        if (similarityMatrixColumn != null) {
          throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
        }
        similarityMatrixColumn = value.getVector();
      }
    }
    if (similarityMatrixColumn == null) {
      return;
    }
    // gather this itemid's co-occurrence row along with the users who rated the item and their ratings
    VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(similarityMatrixColumn, userIDs, prefValues);
    context.write(key, vectorAndPrefs);
  }
}
Step 4: multiply the co-occurrence matrix by the user vectors to produce the recommendations.
Java code:
//extract out the recommendations
Job aggregateAndRecommend = prepareJob(
    new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
    PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
    AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
    TextOutputFormat.class);
Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
Mapper: the PartialMultiplyMapper class
Java code:
// input:  ( itemid_index, <array of userids, array of prefs, row itemid_index of the co-occurrence matrix> )
// output: ( userid, <this user's pref for itemid_index1, row itemid_index1 of the co-occurrence matrix> )
//         ( userid, <this user's pref for itemid_index2, row itemid_index2 of the co-occurrence matrix> )
//         .....
//         ( userid, <this user's pref for itemid_indexN, row itemid_indexN of the co-occurrence matrix> )
public final class PartialMultiplyMapper extends
    Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> {

  @Override
  protected void map(VarIntWritable key,
                     VectorAndPrefsWritable vectorAndPrefsWritable,
                     Context context) throws IOException, InterruptedException {
    Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();
    List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
    List<Float> prefValues = vectorAndPrefsWritable.getValues();
    VarLongWritable userIDWritable = new VarLongWritable();
    PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();
    for (int i = 0; i < userIDs.size(); i++) {
      long userID = userIDs.get(i);
      float prefValue = prefValues.get(i);
      if (!Float.isNaN(prefValue)) {
        prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);
        userIDWritable.set(userID);
        context.write(userIDWritable, prefAndSimilarityColumn);
      }
    }
  }
}
Reducer: the AggregateAndRecommendReducer class. The reducer carries out the partial multiply and keeps the items with the highest resulting recommendation scores. For non-boolean data, the ranking score is the partial multiply of the prefs with the similarity-matrix rows. For boolean data every pref is 1.0f, so computing the matrix product is pointless; the similarity values are simply accumulated, and sorting by that sum gives the recommendations.
Java code:
public final class AggregateAndRecommendReducer extends
    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> {

  @Override
  protected void reduce(VarLongWritable userID,
                        Iterable<PrefAndSimilarityColumnWritable> values,
                        Context context) throws IOException, InterruptedException {
    if (booleanData) {
      reduceBooleanData(userID, values, context);
    } else {
      reduceNonBooleanData(userID, values, context);
    }
  }

  private void reduceBooleanData(VarLongWritable userID,
                                 Iterable<PrefAndSimilarityColumnWritable> values,
                                 Context context) throws IOException, InterruptedException {
    Vector predictionVector = null;
    for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
      predictionVector = predictionVector == null
          ? prefAndSimilarityColumn.getSimilarityColumn()
          : predictionVector.plus(prefAndSimilarityColumn.getSimilarityColumn());
    }
    writeRecommendedItems(userID, predictionVector, context);
  }

  private void reduceNonBooleanData(VarLongWritable userID,
                                    Iterable<PrefAndSimilarityColumnWritable> values,
                                    Context context) throws IOException, InterruptedException {
    Vector numerators = null;
    Vector denominators = null;
    Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
      Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
      float prefValue = prefAndSimilarityColumn.getPrefValue();
      Iterator<Vector.Element> usedItemsIterator = simColumn.iterateNonZero();
      while (usedItemsIterator.hasNext()) {
        int itemIDIndex = usedItemsIterator.next().index();
        numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
      }
      // vector.times(double) multiplies every element of the vector by the scalar
      // vector.plus(vector) adds two vectors element by element
      // numerators is a vector; its element for each candidate item accumulates
      //   similarity(item, candidate) * pref(userID, item)
      // (similarity(item1, item2) is the similarity of item1 and item2; pref(userid, item) is userid's rating of item)
      numerators = numerators == null
          ? prefValue == BOOLEAN_PREF_VALUE ? simColumn.clone() : simColumn.times(prefValue)
          : numerators.plus(prefValue == BOOLEAN_PREF_VALUE ? simColumn : simColumn.times(prefValue));
      simColumn.assign(ABSOLUTE_VALUES);
      // denominators is a vector; its element for each candidate item accumulates |similarity(item, candidate)|
      denominators = denominators == null ? simColumn : denominators.plus(simColumn);
    }
    if (numerators == null) {
      return;
    }
    Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    Iterator<Vector.Element> iterator = numerators.iterateNonZero();
    while (iterator.hasNext()) {
      Vector.Element element = iterator.next();
      int itemIDIndex = element.index();
      if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {
        // compute the normalized prediction
        double prediction = element.get() / denominators.getQuick(itemIDIndex);
        recommendationVector.setQuick(itemIDIndex, prediction);
      }
    }
    writeRecommendedItems(userID, recommendationVector, context);
  }
}
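For boolean data the branch above degenerates to summing similarity columns. A toy sketch (plain maps instead of Mahout vectors):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy sketch of reduceBooleanData: with every pref equal to 1.0, the prediction
// for a candidate item is just the sum of its similarities to the items the
// user already has -- no numerator/denominator division is needed.
class BooleanAggregateSketch {

    static Map<Integer, Double> predict(List<Map<Integer, Double>> similarityColumns) {
        Map<Integer, Double> prediction = new TreeMap<>();
        for (Map<Integer, Double> column : similarityColumns) {
            column.forEach((item, sim) -> prediction.merge(item, sim, Double::sum));
        }
        return prediction;
    }

    public static void main(String[] args) {
        // similarity columns of the two items the user interacted with
        List<Map<Integer, Double>> columns = List.of(
            Map.of(2, 0.8, 3, 0.1),
            Map.of(2, 0.4));
        System.out.println(predict(columns));   // item 2 scores 0.8 + 0.4
    }
}
```
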