天天看點

基于Spark MLlib平台的協同過濾算法---電影推薦系統

又好一陣子沒有寫文章了,阿彌陀佛...最近項目中要做理财推薦,是以,回過頭來回顧一下協同過濾算法在推薦系統中的應用。

    說到推薦系統,大家可能立馬會想到協同過濾算法。本文基于Spark MLlib平台實作一個向使用者推薦電影的簡單應用。其中,主要包括三部分内容:

  • 協同過濾算法概述
  • 基于模型的協同過濾應用---電影推薦
  • 實時推薦架構分析

    一、協同過濾算法概述

        本人對算法的研究,目前還不是很深入,這裡簡單的介紹下其工作原理。

        通常,協同過濾算法按照資料使用,可以分為:

        1)基于使用者(UserCF)

       2)基于商品(ItemCF)

       3)基于模型(ModelCF)

        按照模型,可以分為:

        1)最近鄰模型:基于距離的協同過濾算法

       2)Latent Factor Mode(SVD):基于矩陣分解的模型

       3)Graph:圖模型,社會網絡圖模型

        文中,使用的協同過濾算法是基于矩陣分解的模型。

      1、基于使用者(UserCF)---基于使用者相似性

        基于使用者的協同過濾,通過不同使用者對物品的評分來評測使用者之間的相似性,基于使用者之間的相似性做出推薦。簡單來講,就是給使用者推薦和他興趣相似的其他使用者喜歡的物品。

        舉個例子:

基于Spark MLlib平台的協同過濾算法---電影推薦系統

        如圖,有三個使用者A、B、C,四個物品A、B、C、D,需要向使用者A推薦物品。這裡,由于使用者A和使用者C都買過物品A和物品C,是以,我們認為使用者A和使用者C非常相似,同時,使用者C又買過物品D,那麼就需要給A使用者推薦物品D。

        基于UserCF的基本思想相當簡單,基于使用者對物品的偏好,找到相鄰鄰居使用者,然後将鄰居使用者喜歡的商品推薦給目前使用者。

        計算上,将一個使用者對所有物品的偏好作為一個向量來計算使用者之間的相似度,找到K鄰居後,根據鄰居的相似度權重以及他們對物品的偏好,預測目前使用者沒有偏好的未涉及物品,計算得到一個排序的物品清單作為推薦。

        2、基于商品(ItemCF)---基于商品相似性

      基于商品的協同過濾,通過使用者對不同item的評分來評測item之間的相似性,基于item之間的相似性做出推薦。簡單來将,就是給使用者推薦和他之前喜歡的物品相似的物品。

       例如:

基于Spark MLlib平台的協同過濾算法---電影推薦系統

       如圖,有三個使用者A、B、C和三件物品A、B、C,需要向使用者C推薦物品。這裡,由于使用者A買過物品A和C,使用者B買過物品A、B、C,使用者C買過物品A,從使用者A和B可以看出,這兩個使用者都買過物品A和C,說明物品A和C非常相似,同時,使用者C又買過物品A,是以,将物品C推薦給使用者C。

       基于ItemCF的原理和基于UserCF類似,隻是在計算鄰居時采用物品本身,而不是從使用者的角度,即基于使用者對物品的偏好找到相似的物品,然後根據使用者的曆史偏好,推薦相似的物品給他。

       從計算角度,即将所有使用者對某個物品的偏好作為一個向量來計算物品之間的相似度,得到物品的相似物品後,根據使用者曆史的偏好預測目前使用者還沒有表示偏好的物品,計算得到一個排序的物品清單作為推薦。

        3、基于模型(ModelCF)

        基于模型的協同過濾推薦就是基于樣本的使用者喜好資訊,訓練一個推薦模型,然後根據實時的使用者喜好的資訊進行預測,計算推薦。

                本文使用的基于矩陣分解的模型,算法如圖:

基于Spark MLlib平台的協同過濾算法---電影推薦系統

         Spark MLlib目前支援基于模型的協同過濾,其中使用者和商品通過一小組隐性因子進行表達,并且這些因子也用于預測缺失的元素。MLlib使用交替最小二乘法(ALS)來學習這些隐性因子。

         如果有興趣,可以閱讀Spark的這部分源代碼:

基于Spark MLlib平台的協同過濾算法---電影推薦系統

    二、基于模型的協同過濾應用---電影推薦

         本文實作對使用者推薦電影的簡單應用。

        1、測試資料描述

           本次測試資料主要包括四個資料檔案:(詳細的資料描述參見README檔案)

           1)使用者資料檔案

              使用者ID::性别::年齡::職業編号::郵編

基于Spark MLlib平台的協同過濾算法---電影推薦系統

          2)電影資料檔案

             電影ID::電影名稱::電影種類

基于Spark MLlib平台的協同過濾算法---電影推薦系統

         3)評分資料檔案

            使用者ID::電影ID::評分::時間

基于Spark MLlib平台的協同過濾算法---電影推薦系統

        4)測試資料

           使用者ID::電影ID::評分::時間

基于Spark MLlib平台的協同過濾算法---電影推薦系統

        這裡,前三個資料檔案用于模型訓練,第四個資料檔案用于測試模型。

        2、實作代碼:

import org.apache.log4j.{Level, Logger}

import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

import org.apache.spark.rdd._

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.SparkContext._

import scala.io.Source

object MovieLensALS {

  def main(args:Array[String]) {

    //屏蔽不必要的日志顯示在終端上

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

    //設定運作環境

    val sparkConf = new SparkConf().setAppName("MovieLensALS").setMaster("local[5]")

    val sc = new SparkContext(sparkConf)

    //裝載使用者評分,該評分由評分器生成(即生成檔案personalRatings.txt)

    val myRatings = loadRatings(args(1))

    val myRatingsRDD = sc.parallelize(myRatings, 1)

    //樣本資料目錄

    val movielensHomeDir = args(0)

    //裝載樣本評分資料,其中最後一列Timestamp取除10的餘數作為key,Rating為值,即(Int,Rating)

    val ratings = sc.textFile(movielensHomeDir + "/ratings.dat").map {

      line =>

        val fields = line.split("::")

        // format: (timestamp % 10, Rating(userId, movieId, rating))

        (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))

    }

    //裝載電影目錄對照表(電影ID->電影标題)

    val movies = sc.textFile(movielensHomeDir + "/movies.dat").map {

      line =>

        val fields = line.split("::")

        // format: (movieId, movieName)

        (fields(0).toInt, fields(1))

    }.collect().toMap

    //統計有使用者數量和電影數量以及使用者對電影的評分數目

    val numRatings = ratings.count()

    val numUsers = ratings.map(_._2.user).distinct().count()

    val numMovies = ratings.map(_._2.product).distinct().count()

    println("Got " + numRatings + " ratings from " + numUsers + " users " + numMovies + " movies")

    //将樣本評分表以key值切分成3個部分,分别用于訓練 (60%,并加入使用者評分), 校驗 (20%), and 測試 (20%)

    //該資料在計算過程中要多次應用到,是以cache到記憶體

    val numPartitions = 4

    val training = ratings.filter(x => x._1 < 6).values.union(myRatingsRDD).repartition(numPartitions).persist()

    val validation = ratings.filter(x => x._1 >= 6 && x._1 < 8).values.repartition(numPartitions).persist()

    val test = ratings.filter(x => x._1 >= 8).values.persist()

    val numTraining = training.count()

    val numValidation = validation.count()

    val numTest = test.count()

    println("Training: " + numTraining + " validation: " + numValidation + " test: " + numTest)

    //訓練不同參數下的模型,并在校驗集中驗證,擷取最佳參數下的模型

    val ranks = List(8, 12)

    val lambdas = List(0.1, 10.0)

    val numIters = List(10, 20)

    var bestModel: Option[MatrixFactorizationModel] = None

    var bestValidationRmse = Double.MaxValue

    var bestRank = 0

    var bestLambda = -1.0

    var bestNumIter = -1

    for (rank <- ranks; lambda <- lambdas; numIter <- numIters) {

      val model = ALS.train(training, rank, numIter, lambda)

      val validationRmse = computeRmse(model, validation, numValidation)

      println("RMSE(validation) = " + validationRmse + " for the model trained with rank = "

        + rank + ",lambda = " + lambda + ",and numIter = " + numIter + ".")

      if (validationRmse < bestValidationRmse) {

        bestModel = Some(model)

        bestValidationRmse = validationRmse

        bestRank = rank

        bestLambda = lambda

        bestNumIter = numIter

      }

    }

    //用最佳模型預測測試集的評分,并計算和實際評分之間的均方根誤差(RMSE)

    val testRmse = computeRmse(bestModel.get, test, numTest)

    println("The best model was trained with rank = " + bestRank + " and lambda = " + bestLambda

      + ", and numIter = " + bestNumIter + ", and its RMSE on the test set is " + testRmse + ".")

    //create a naive baseline and compare it with the best model

    val meanRating = training.union(validation).map(_.rating).mean

    val baselineRmse = math.sqrt(test.map(x => (meanRating - x.rating) * (meanRating - x.rating)).reduce(_ + _) / numTest)

    val improvement = (baselineRmse - testRmse) / baselineRmse * 100

    println("The best model improves the baseline by " + "%1.2f".format(improvement) + "%.")

    //推薦前十部最感興趣的電影,注意要剔除使用者已經評分的電影

    val myRatedMovieIds = myRatings.map(_.product).toSet

    val candidates = sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)

    val recommendations = bestModel.get

      .predict(candidates.map((0, _)))

      .collect

      .sortBy(-_.rating)

      .take(10)

    var i = 1

    println("Movies recommended for you:")

    recommendations.foreach { r =>

      println("%2d".format(i) + ": " + movies(r.product))

      i += 1

    }

    sc.stop()

  }

  def computeRmse(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double = {

    val predictions:RDD[Rating] = model.predict((data.map(x => (x.user,x.product))))

    val predictionsAndRatings = predictions.map{ x =>((x.user,x.product),x.rating)}

                          .join(data.map(x => ((x.user,x.product),x.rating))).values

    math.sqrt(predictionsAndRatings.map( x => (x._1 - x._2) * (x._1 - x._2)).reduce(_+_)/n)

  }

  def loadRatings(path:String):Seq[Rating] = {

    val lines = Source.fromFile(path).getLines()

    val ratings = lines.map{

      line =>

        val fields = line.split("::")

        Rating(fields(0).toInt,fields(1).toInt,fields(2).toDouble)

    }.filter(_.rating > 0.0)

    if(ratings.isEmpty){

      sys.error("No ratings provided.")

    }else{

      ratings.toSeq

    }

  }

}

        3、運作程式

        1)設定參數,運作程式(兩個參數:第一個資料檔案目錄,第二個測試資料)

基于Spark MLlib平台的協同過濾算法---電影推薦系統

         2)程式運作效果---模型訓練過程

基于Spark MLlib平台的協同過濾算法---電影推薦系統

        3)程式運作效果---電影推薦結果

基于Spark MLlib平台的協同過濾算法---電影推薦系統

        4、總結

          這樣,一個簡單的基于模型的電影推薦應用就算OK了。

    三、實時推薦架構分析

        上面,實作了簡單的推薦系統應用,但是,僅僅實作使用者的定向推薦,在實際應用中價值不是非常大,如果展現價值,最好能夠實作實時或者準實時推薦。

        下面,簡單介紹下實時推薦的一個架構:

基于Spark MLlib平台的協同過濾算法---電影推薦系統

        該架構圖取自淘寶Spark On Yarn的實時架構,這裡,給出一些個人的觀點:

        架構圖分為三層:離線、近線和線上。

            離線部分:主要實作模型的建立。原始資料通過ETL加工清洗,得到目标資料,目标業務資料結合合适的算法,學習訓練模型,得到最佳的模型。

            近線部分:主要使用HBase存儲使用者行為資訊,模型混合系統綜合顯性回報和隐性回報的模型處理結果,将最終的結果推薦給使用者。

            線上部分:這裡,主要有兩種回報,顯性和隐性,個人了解,顯性回報了解為使用者将商品加入購物車,使用者購買商品這些使用者行為;隐性回報了解為使用者在某個商品上停留的時間,使用者點選哪些商品這些使用者行為。這裡,為了實作實時/準實時操作,使用到了Spark Streaming對資料進行實時處理。(有可能是Flume+Kafka+Spark Streaming架構)

        這裡是個人的一些了解,不足之處,望各位指點。

本文出自 “一步.一步” 部落格,請務必保留此出處http://snglw.blog.51cto.com/5832405/1662153

繼續閱讀