天天看點

基于Spark MlLib的協同過濾推薦模型1. 介紹2. 開始幹貨3. 拓展

目錄

1. 介紹

2. 開始幹貨

a. 準備資料

b. 資料标準化

c. 模型訓練

d. 模型評估

e. 推薦輸出

3. 拓展

1. 介紹

官方文檔說明:http://spark.apache.org/docs/latest/ml-collaborative-filtering.html

本文章源代碼工程:https://github.com/johncai0/ALSRecommender

基礎需要了解線性代數中矩陣的乘法。

交替最小二乘法使用者拟合或者梯度下降,其原理是固定一個值求另一個值,再固定求出來的值算另一個值,如此交替拟合,故名交替最小二乘法。

所有user所對應的所有item的集和用矩陣表示為(示例)

item1 item2 item3
user1 1 2
user2 3 2
user3 3 1 3

如果将這個矩陣表示為A,那麼A可以拆分成X(userCF)和Y(itemCF)的乘積,且X和Y非常稠密(此處被稱為 矩陣分解,此處需要梯度下降,用的算法是交替最小二乘法)

基于Spark MlLib的協同過濾推薦模型1. 介紹2. 開始幹貨3. 拓展

由上圖可見,矩陣A非常稀疏,而X和Y是稠密的,如果再将X,Y兩個稠密矩陣求積回得到一個近似A的稠密矩陣(此處稱之為矩陣補全)

例如:

基于Spark MlLib的協同過濾推薦模型1. 介紹2. 開始幹貨3. 拓展

上圖的兩個稠密矩陣的乘機得到下邊的補全矩陣(的到了積分)

基于Spark MlLib的協同過濾推薦模型1. 介紹2. 開始幹貨3. 拓展

選擇分數最高音樂的推薦給使用者即可(紅色的格子)

如果有描述不正确請留言,希望這篇文字能幫到其他人。

2. 開始幹貨

a. 準備資料

https://github.com/johncai0/ALSRecommender/tree/master/data

将檔案下載下傳後,存儲到hdfs://john:9000/user/ALSRecommender_TrainingData/目錄下或者其他目錄(如果用本文中的路徑,接下來的代碼複制即可)

spark 讀取資料

val ss=SparkSession.builder().getOrCreate()
    val base = "hdfs://john:9000/user/ALSRecommender_TrainingData/*"

    val fieldSchema = StructType(Array(
      StructField("user", StringType, true),
      StructField("itemrank", IntegerType, true),
      StructField("item", StringType, true)
    ))
    val df=ss.read.schema(fieldSchema).csv(base)
           

b. 資料标準化

由于讀入的資料user列和item列是字元串類型,是以需要重新定義id成整形,詳細定義方式參考:

https://blog.csdn.net/cwg_1992/article/details/95813455

1. 為使用者生成int類型的id

2. 為item生成int類型的id

3. 分别将userid和itemid列Join到df上,生成alldata的dataFarem,就可以直接給ASL模型用了。 

4. 切分訓練資料集和驗證資料集,分别是90%和10%

val userandid=df.select("user").rdd.map(_.getString(0)).distinct().zipWithIndex().map(l=>(l._1,l._2.toInt)).toDF("user","userid")
  val itemandid=df.select("item").rdd.map(_.getString(0)).distinct().zipWithIndex().map(l=>(l._1,l._2.toInt)).toDF("item","itemid")
  val alldata=df.join(userandid,"user").join(itemandid,"item")
//分割訓練資料集和測試資料集,并緩存到記憶體
val Array(trainData, cvData) = alldata.randomSplit(Array(0.9, 0.1))
    trainData.cache()
    cvData.cache()
           

c. 模型訓練

此處需要設定輸入的訓練資料的使用者列setUserCol為userid

item列為itemid列

評分列為itemrank列

需要一個輸出的預測列prediction列

其他幾個超參數解釋如下:

numBlocks是使用者和項目将被分區為多個塊的數量,以便并行化計算(預設為10)。
rank是模型中潛在因子的數量(預設為10)。
maxIter是要運作的最大疊代次數(預設為10)。
regParam指定ALS中的正則化參數(預設為1.0)。
implicitPrefs指定是使用顯式回報 ALS變體還是使用适用于隐式回報資料的變體 (預設為false使用顯式回報)。
alpha是适用于ALS的隐式回報變量的參數,其控制偏好觀察中的 基線置信度(預設為1.0)。
nonnegative指定是否對最小二乘使用非負限制(預設為false)。
           

 超參數詳細見http://spark.apache.org/docs/latest/ml-collaborative-filtering.html

val model = new ALS().
            setSeed(Random.nextLong()).
            setImplicitPrefs(true).
            setRank(10).setRegParam(0.01).
            setAlpha(1.0).setMaxIter(20).
            setUserCol("userid").setItemCol("itemid").
            setRatingCol("itemrank").setPredictionCol("prediction").
            fit(trainData)
           

d. 模型評估

由于本資料集中除了使用者對應的item及對應的評分,沒有别的資訊給于參考推薦模型的好于壞。此外,由于千人千面問題,或許隻有本人才能知道這個推薦模型的準确性。

但是在大量的資料集下,我們可以認為所有item中,青睐使用者更多的item認為是較好的(值得推薦的),那麼我們可以為每個使用者生成ROC曲線來衡量這個推薦模型好壞,曲線的下面積為這個使用者的AUC(Area Under the Curve),如果要評估一個模型的好壞,必須計算所有使用者的平均AUC。

以下是計算平均AUC的方法

def areaUnderCurve(positiveData: DataFrame, bAllItemIDs: Broadcast[Array[Int]], predictFunction: (DataFrame => DataFrame)): Double = {
    val positivePredictions = predictFunction(positiveData.select("userid", "itemid")).
      withColumnRenamed("prediction", "positivePrediction")

    val negativeData = positiveData.select("userid", "itemid").as[(Int,Int)].
      groupByKey { case (user, _) => user }.
      flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
        val random = new Random()
        val posItemIDSet = userIDAndPosArtistIDs.map { case (_, item) => item }.toSet
        val negative = new ArrayBuffer[Int]()
        val allItemIDs = bAllItemIDs.value
        var i = 0

        while (i < allItemIDs.length && negative.size < posItemIDSet.size) {
          val itemID = allItemIDs(random.nextInt(allItemIDs.length))
          // Only add new distinct IDs
          if (!posItemIDSet.contains(itemID)) {
            negative += itemID
          }
          i += 1
        }
        // Return the set with user ID added back
        negative.map(itemID => (userID, itemID))
      }.toDF("userid", "itemid")

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeData).
      withColumnRenamed("prediction", "negativePrediction")

    val joinedPredictions = positivePredictions.join(negativePredictions, "userid").
      select("userid", "positivePrediction", "negativePrediction").cache()

    // Count the number of pairs per user
    val allCounts = joinedPredictions.
      groupBy("userid").agg(count(lit("1")).as("total")).
      select("userid", "total")
    // Count the number of correctly ordered pairs per user
    val correctCounts = joinedPredictions.
      filter($"positivePrediction" > $"negativePrediction").
      groupBy("userid").agg(count("userid").as("correct")).
      select("userid", "correct")

    // Combine these, compute their ratio, and average over all users
    val meanAUC = allCounts.join(correctCounts, Seq("userid"), "left_outer").
      select($"userid", (coalesce($"correct", lit(0)) / $"total").as("auc")).
      agg(mean("auc")).
      as[Double].first()

    joinedPredictions.unpersist()

    meanAUC
  }

  def predictMostListened(train: DataFrame)(allData: DataFrame): DataFrame = {
    val listenCounts = train.groupBy("itemid").
      agg(sum("itemrank").as("prediction")).
      select("itemid", "prediction")
    allData.
      join(listenCounts, Seq("itemid"), "left_outer").
      select("userid", "itemid", "prediction")
  }
           

如果我們能夠檢測模型的好壞,結合多組超參數是否可以動态選擇超參數而得到更好的模型?答案是肯定的。

var gauc:Double=0.00 //存放最優的AUC值
var rank:Int=0    //存放最優AUC值對應的rank
var regParam:Double=0.00 //同上
var alpha:Double=0.00 //同上
var goodModel:ALSModel=null //存放最優的ALS模型
val evaluations =
      for (rank     <- Seq(10,  11);
           regParam <- Seq(3.0, 2.0);
           alpha    <- Seq(8.0, 1.0))
        yield {
          val model = new ALS().
            setSeed(Random.nextLong()).
            setImplicitPrefs(true).
            setRank(rank).setRegParam(regParam).
            setAlpha(alpha).setMaxIter(20).
            setUserCol("userid").setItemCol("itemid").
            setRatingCol("itemrank").setPredictionCol("prediction").
            fit(trainData)

          val auc = areaUnderCurve(cvData, bAllItemIDs, model.transform) //每次訓練完模型算出這個模型的AUC

          if (auc > gauc) { //如果本次的auc大于上次的auc,則将本次的模型儲存下來
            goodModel=model
          } else {
            model.userFactors.unpersist()
            model.itemFactors.unpersist()
          }
          (auc, (rank, regParam, alpha))
        }
           

e. 推薦輸出

def makeRecommendations(user: String, howMany: Int): DataFrame = {
    val userID = this.userandid.where(s"user = '" + user + "'").select("userid").rdd.collect()(0).getInt(0) 
    val toRecommend = goodModel.itemFactors.
      select($"id".as("itemid")).
      withColumn("userid", lit(userID)) 
    if (goodModel == null && goodModel ==None) println("goodModel is Null.........")
    val reced=goodModel.transform(toRecommend).
      select("itemid", "prediction").
      orderBy($"prediction".desc).
      limit(howMany)
    reced.join(itemandid,"itemid").drop("itemid").sort("prediction")
  }
           

官方文檔說明:http://spark.apache.org/docs/latest/ml-collaborative-filtering.html

本文章源代碼工程:https://github.com/johncai0/ALSRecommender

3. 拓展

推薦和回歸其實是一回事,都是在根據已發生的因子來推測接下來的動作或者結果。

PS: 如果有誤可以留言溝通,最後在自我積累的同時希望能幫到大家。

繼續閱讀