天天看点

基于Spark MlLib的协同过滤推荐模型1. 介绍2. 开始干货3. 拓展

目录

1. 介绍

2. 开始干货

a. 准备数据

b. 数据标准化

c. 模型训练

d. 模型评估

e. 推荐输出

3. 拓展

1. 介绍

官方文档说明:http://spark.apache.org/docs/latest/ml-collaborative-filtering.html

本文章源代码工程:https://github.com/johncai0/ALSRecommender

基础需要了解线性代数中矩阵的乘法。

交替最小二乘法用户拟合或者梯度下降,其原理是固定一个值求另一个值,再固定求出来的值算另一个值,如此交替拟合,故名交替最小二乘法。

所有user所对应的所有item的集和用矩阵表示为(示例)

item1 item2 item3
user1 1 2
user2 3 2
user3 3 1 3

如果将这个矩阵表示为A,那么A可以拆分成X(userCF)和Y(itemCF)的乘积,且X和Y非常稠密(此处被称为 矩阵分解,此处需要梯度下降,用的算法是交替最小二乘法)

基于Spark MlLib的协同过滤推荐模型1. 介绍2. 开始干货3. 拓展

由上图可见,矩阵A非常稀疏,而X和Y是稠密的,如果再将X,Y两个稠密矩阵求积回得到一个近似A的稠密矩阵(此处称之为矩阵补全)

例如:

基于Spark MlLib的协同过滤推荐模型1. 介绍2. 开始干货3. 拓展

上图的两个稠密矩阵的乘机得到下边的补全矩阵(的到了积分)

基于Spark MlLib的协同过滤推荐模型1. 介绍2. 开始干货3. 拓展

选择分数最高音乐的推荐给用户即可(红色的格子)

如果有描述不正确请留言,希望这篇文字能帮到其他人。

2. 开始干货

a. 准备数据

https://github.com/johncai0/ALSRecommender/tree/master/data

将文件下载后,存储到hdfs://john:9000/user/ALSRecommender_TrainingData/目录下或者其他目录(如果用本文中的路径,接下来的代码复制即可)

spark 读取数据

val ss=SparkSession.builder().getOrCreate()
    val base = "hdfs://john:9000/user/ALSRecommender_TrainingData/*"

    val fieldSchema = StructType(Array(
      StructField("user", StringType, true),
      StructField("itemrank", IntegerType, true),
      StructField("item", StringType, true)
    ))
    val df=ss.read.schema(fieldSchema).csv(base)
           

b. 数据标准化

由于读入的数据user列和item列是字符串类型,因此需要重新定义id成整形,详细定义方式参考:

https://blog.csdn.net/cwg_1992/article/details/95813455

1. 为用户生成int类型的id

2. 为item生成int类型的id

3. 分别将userid和itemid列Join到df上,生成alldata的dataFarem,就可以直接给ASL模型用了。 

4. 切分训练数据集和验证数据集,分别是90%和10%

val userandid=df.select("user").rdd.map(_.getString(0)).distinct().zipWithIndex().map(l=>(l._1,l._2.toInt)).toDF("user","userid")
  val itemandid=df.select("item").rdd.map(_.getString(0)).distinct().zipWithIndex().map(l=>(l._1,l._2.toInt)).toDF("item","itemid")
  val alldata=df.join(userandid,"user").join(itemandid,"item")
//分割训练数据集和测试数据集,并缓存到内存
val Array(trainData, cvData) = alldata.randomSplit(Array(0.9, 0.1))
    trainData.cache()
    cvData.cache()
           

c. 模型训练

此处需要设置输入的训练数据的用户列setUserCol为userid

item列为itemid列

评分列为itemrank列

需要一个输出的预测列prediction列

其他几个超参数解释如下:

numBlocks是用户和项目将被分区为多个块的数量,以便并行化计算(默认为10)。
rank是模型中潜在因子的数量(默认为10)。
maxIter是要运行的最大迭代次数(默认为10)。
regParam指定ALS中的正则化参数(默认为1.0)。
implicitPrefs指定是使用显式反馈 ALS变体还是使用适用于隐式反馈数据的变体 (默认为false使用显式反馈)。
alpha是适用于ALS的隐式反馈变量的参数,其控制偏好观察中的 基线置信度(默认为1.0)。
nonnegative指定是否对最小二乘使用非负约束(默认为false)。
           

 超参数详细见http://spark.apache.org/docs/latest/ml-collaborative-filtering.html

val model = new ALS().
            setSeed(Random.nextLong()).
            setImplicitPrefs(true).
            setRank(10).setRegParam(0.01).
            setAlpha(1.0).setMaxIter(20).
            setUserCol("userid").setItemCol("itemid").
            setRatingCol("itemrank").setPredictionCol("prediction").
            fit(trainData)
           

d. 模型评估

由于本数据集中除了用户对应的item及对应的评分,没有别的信息给于参考推荐模型的好于坏。此外,由于千人千面问题,或许只有本人才能知道这个推荐模型的准确性。

但是在大量的数据集下,我们可以认为所有item中,青睐用户更多的item认为是较好的(值得推荐的),那么我们可以为每个用户生成ROC曲线来衡量这个推荐模型好坏,曲线的下面积为这个用户的AUC(Area Under the Curve),如果要评估一个模型的好坏,必须计算所有用户的平均AUC。

以下是计算平均AUC的方法

def areaUnderCurve(positiveData: DataFrame, bAllItemIDs: Broadcast[Array[Int]], predictFunction: (DataFrame => DataFrame)): Double = {
    val positivePredictions = predictFunction(positiveData.select("userid", "itemid")).
      withColumnRenamed("prediction", "positivePrediction")

    val negativeData = positiveData.select("userid", "itemid").as[(Int,Int)].
      groupByKey { case (user, _) => user }.
      flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
        val random = new Random()
        val posItemIDSet = userIDAndPosArtistIDs.map { case (_, item) => item }.toSet
        val negative = new ArrayBuffer[Int]()
        val allItemIDs = bAllItemIDs.value
        var i = 0

        while (i < allItemIDs.length && negative.size < posItemIDSet.size) {
          val itemID = allItemIDs(random.nextInt(allItemIDs.length))
          // Only add new distinct IDs
          if (!posItemIDSet.contains(itemID)) {
            negative += itemID
          }
          i += 1
        }
        // Return the set with user ID added back
        negative.map(itemID => (userID, itemID))
      }.toDF("userid", "itemid")

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeData).
      withColumnRenamed("prediction", "negativePrediction")

    val joinedPredictions = positivePredictions.join(negativePredictions, "userid").
      select("userid", "positivePrediction", "negativePrediction").cache()

    // Count the number of pairs per user
    val allCounts = joinedPredictions.
      groupBy("userid").agg(count(lit("1")).as("total")).
      select("userid", "total")
    // Count the number of correctly ordered pairs per user
    val correctCounts = joinedPredictions.
      filter($"positivePrediction" > $"negativePrediction").
      groupBy("userid").agg(count("userid").as("correct")).
      select("userid", "correct")

    // Combine these, compute their ratio, and average over all users
    val meanAUC = allCounts.join(correctCounts, Seq("userid"), "left_outer").
      select($"userid", (coalesce($"correct", lit(0)) / $"total").as("auc")).
      agg(mean("auc")).
      as[Double].first()

    joinedPredictions.unpersist()

    meanAUC
  }

  def predictMostListened(train: DataFrame)(allData: DataFrame): DataFrame = {
    val listenCounts = train.groupBy("itemid").
      agg(sum("itemrank").as("prediction")).
      select("itemid", "prediction")
    allData.
      join(listenCounts, Seq("itemid"), "left_outer").
      select("userid", "itemid", "prediction")
  }
           

如果我们能够检测模型的好坏,结合多组超参数是否可以动态选择超参数而得到更好的模型?答案是肯定的。

var gauc:Double=0.00 //存放最优的AUC值
var rank:Int=0    //存放最优AUC值对应的rank
var regParam:Double=0.00 //同上
var alpha:Double=0.00 //同上
var goodModel:ALSModel=null //存放最优的ALS模型
val evaluations =
      for (rank     <- Seq(10,  11);
           regParam <- Seq(3.0, 2.0);
           alpha    <- Seq(8.0, 1.0))
        yield {
          val model = new ALS().
            setSeed(Random.nextLong()).
            setImplicitPrefs(true).
            setRank(rank).setRegParam(regParam).
            setAlpha(alpha).setMaxIter(20).
            setUserCol("userid").setItemCol("itemid").
            setRatingCol("itemrank").setPredictionCol("prediction").
            fit(trainData)

          val auc = areaUnderCurve(cvData, bAllItemIDs, model.transform) //每次训练完模型算出这个模型的AUC

          if (auc > gauc) { //如果本次的auc大于上次的auc,则将本次的模型保存下来
            goodModel=model
          } else {
            model.userFactors.unpersist()
            model.itemFactors.unpersist()
          }
          (auc, (rank, regParam, alpha))
        }
           

e. 推荐输出

def makeRecommendations(user: String, howMany: Int): DataFrame = {
    val userID = this.userandid.where(s"user = '" + user + "'").select("userid").rdd.collect()(0).getInt(0) 
    val toRecommend = goodModel.itemFactors.
      select($"id".as("itemid")).
      withColumn("userid", lit(userID)) 
    if (goodModel == null && goodModel ==None) println("goodModel is Null.........")
    val reced=goodModel.transform(toRecommend).
      select("itemid", "prediction").
      orderBy($"prediction".desc).
      limit(howMany)
    reced.join(itemandid,"itemid").drop("itemid").sort("prediction")
  }
           

官方文档说明:http://spark.apache.org/docs/latest/ml-collaborative-filtering.html

本文章源代码工程:https://github.com/johncai0/ALSRecommender

3. 拓展

推荐和回归其实是一回事,都是在根据已发生的因子来推测接下来的动作或者结果。

PS: 如果有误可以留言沟通,最后在自我积累的同时希望能帮到大家。

继续阅读