天天看點

Spark協同過濾推薦

項目最近需要給使用者推薦潛在的店鋪,當時也在考慮是用協同過濾推薦還是用ALS訓練模型,但是考慮到資料量是以一年為周期每天更新跑的,模型就算訓練出來也沒多大用處。耗時,調參,沒有必要。是以還是決定使用協同過濾推薦。而我采用的是同現相似度矩陣來計算的。

相關的原理介紹我這裡就不再重複了,大家可以搜搜,有很多源碼,隻是對于矩陣不熟悉的人想告訴大家每個步驟計算得到的是什麼内容,看着rdd一步一步往下走但是不知所雲就不好了。而且如果業務有新的需求,需要更改源碼的計算邏輯,你都不知道在哪一步修改。最後再說說我的經驗之談。

首先貼下最基礎的源碼,然後我再說明每個rdd到底是什麼。歐氏距離相似度、餘弦相似度的代碼我就不貼了。

package recommend

import scala.math._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

/**
 * 使用者評分.
 * @param userid 使用者
 * @param itemid 評分物品
 * @param pref 評分
 */
case class ItemPref(
  val userid: String,
  val itemid: String,
  val pref: Double) extends Serializable
/**
 * 使用者推薦.
 * @param userid 使用者
 * @param itemid 推薦物品
 * @param pref 評分
 */
case class UserRecomm(
  val userid: String,
  val itemid: String,
  val pref: Double) extends Serializable
/**
 * 相似度.
 * @param itemid1 物品
 * @param itemid2 物品
 * @param similar 相似度
 */
case class ItemSimi(
  val itemid1: String,
  val itemid2: String,
  val similar: Double) extends Serializable

/**
 * 相似度計算.
 * 支援:同現相似度、歐氏距離相似度、餘弦相似度
 *
 */
class ItemSimilarity extends Serializable {

  /**
   * 相似度計算.
   * @param user_rdd 使用者評分
   * @param stype 計算相似度公式
   * @param RDD[ItemSimi] 傳回物品相似度
   *
   */
  def Similarity(user_rdd: RDD[ItemPref], stype: String): (RDD[ItemSimi]) = {
    val simil_rdd = stype match {
      case "cooccurrence" =>
        ItemSimilarity.CooccurrenceSimilarity(user_rdd)
      case _ =>
        ItemSimilarity.CooccurrenceSimilarity(user_rdd)
    }
    simil_rdd
  }

}

object ItemSimilarity {

  /**
   * 同現相似度矩陣計算.
   * w(i,j) = N(i)∩N(j)/sqrt(N(i)*N(j))
   * @param user_rdd 使用者評分
   * @param RDD[ItemSimi] 傳回物品相似度
   *
   */
  def CooccurrenceSimilarity(user_rdd: RDD[ItemPref]): (RDD[ItemSimi]) = {
    // 0 資料做準備
    val user_rdd1 = user_rdd.map(f => (f.userid, f.itemid, f.pref))
    val user_rdd2 = user_rdd1.map(f => (f._1, f._2))
    // 1 (使用者:物品) 笛卡爾積 (使用者:物品) => 物品:物品組合     
    val user_rdd3 = user_rdd2.join(user_rdd2)
    val user_rdd4 = user_rdd3.map(f => (f._2, 1))
    // 2 物品:物品:頻次 
    val user_rdd5 = user_rdd4.reduceByKey((x, y) => x + y)
    // 3 對角矩陣 
    val user_rdd6 = user_rdd5.filter(f => f._1._1 == f._1._2)
    // 4 非對角矩陣 
    val user_rdd7 = user_rdd5.filter(f => f._1._1 != f._1._2)
    // 5 計算同現相似度(物品1,物品2,同現頻次)
    val user_rdd8 = user_rdd7.map(f => (f._1._1, (f._1._1, f._1._2, f._2))).
      join(user_rdd6.map(f => (f._1._1, f._2)))
    val user_rdd9 = user_rdd8.map(f => (f._2._1._2, (f._2._1._1,
      f._2._1._2, f._2._1._3, f._2._2)))
    val user_rdd10 = user_rdd9.join(user_rdd6.map(f => (f._1._1, f._2)))
    val user_rdd11 = user_rdd10.map(f => (f._2._1._1, f._2._1._2, f._2._1._3, f._2._1._4, f._2._2))
    val user_rdd12 = user_rdd11.map(f => (f._1, f._2, (f._3 / sqrt(f._4 * f._5))))
    // 6 結果傳回
    user_rdd12.map(f => ItemSimi(f._1, f._2, f._3))
  }

}


           
package recommend

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

  /**
   * 使用者推薦計算.
   * 根據物品相似度、使用者評分、指定最大推薦數量進行使用者推薦
   */

class RecommendedItem {
  /**
   * 使用者推薦計算.
   * @param items_similar 物品相似度
   * @param user_prefer 使用者評分
   * @param r_number 推薦數量
   * @param RDD[UserRecomm] 傳回使用者推薦物品
   *
   */
  def Recommend(items_similar: RDD[ItemSimi],
    user_prefer: RDD[ItemPref],
    r_number: Int): (RDD[UserRecomm]) = {
    //   0 資料準備  
    val rdd_app1_R1 = items_similar.map(f => (f.itemid1, f.itemid2, f.similar))
    val user_prefer1 = user_prefer.map(f => (f.userid, f.itemid, f.pref))
    //   1 矩陣計算——i行與j列join
    val rdd_app1_R2 = rdd_app1_R1.map(f => (f._1, (f._2, f._3))).
      join(user_prefer1.map(f => (f._2, (f._1, f._3))))
    //   2 矩陣計算——i行與j列元素相乘
    val rdd_app1_R3 = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
    //   3 矩陣計算——使用者:元素累加求和
    val rdd_app1_R4 = rdd_app1_R3.reduceByKey((x, y) => x + y)
    //   4 矩陣計算——使用者:對結果過濾已有I2
    val rdd_app1_R5 = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))).
      filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1)))
    //   5 矩陣計算——使用者:使用者對結果排序,過濾
    val rdd_app1_R6 = rdd_app1_R5.groupByKey()
    val rdd_app1_R7 = rdd_app1_R6.map(f => {
      val i2 = f._2.toBuffer
      val i2_2 = i2.sortBy(_._2)
      if (i2_2.length > r_number) i2_2.remove(0, (i2_2.length - r_number))
      (f._1, i2_2.toIterable)
    })
    val rdd_app1_R8 = rdd_app1_R7.flatMap(f => {
      val id2 = f._2
      for (w <- id2) yield (f._1, w._1, w._2)
    })
    rdd_app1_R8.map(f => UserRecomm(f._1, f._2, f._3))
  }

  /**
   * 使用者推薦計算.
   * @param items_similar 物品相似度
   * @param user_prefer 使用者評分
   * @param RDD[UserRecomm] 傳回使用者推薦物品
   *
   */
  def Recommend(items_similar: RDD[ItemSimi],
    user_prefer: RDD[ItemPref]): (RDD[UserRecomm]) = {
    //   0 資料準備  
    val rdd_app1_R1 = items_similar.map(f => (f.itemid1, f.itemid2, f.similar))
    val user_prefer1 = user_prefer.map(f => (f.userid, f.itemid, f.pref))
    //   1 矩陣計算——i行與j列join
    val rdd_app1_R2 = rdd_app1_R1.map(f => (f._1, (f._2, f._3))).
      join(user_prefer1.map(f => (f._2, (f._1, f._3))))
    //   2 矩陣計算——i行與j列元素相乘
    val rdd_app1_R3 = rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))
    //   3 矩陣計算——使用者:元素累加求和
    val rdd_app1_R4 = rdd_app1_R3.reduceByKey((x, y) => x + y)
    //   4 矩陣計算——使用者:對結果過濾已有I2
    val rdd_app1_R5 = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))).
      filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1)))
    //   5 矩陣計算——使用者:使用者對結果排序,過濾
    val rdd_app1_R6 = rdd_app1_R5.map(f => (f._1, f._2._1, f._2._2)).
      sortBy(f => (f._1, f._3))
    rdd_app1_R6.map(f => UserRecomm(f._1, f._2, f._3))
  }

}
           
package recommend

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD

object ItemCF {
  def main(args: Array[String]) {

    //0 建構Spark對象
    val conf = new SparkConf().setAppName("ItemCF")
    val sc = new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)

    //1 讀取樣本資料
    val data_path = "data.txt"
    val data = sc.textFile(data_path)
    val userdata = data.map(_.split(",")).map(f => (ItemPref(f(0), f(1), f(2).toDouble))).cache()

    //2 建立模型
    val mysimil = new ItemSimilarity()
    val simil_rdd1 = mysimil.Similarity(userdata, "cooccurrence")
    val recommd = new RecommendedItem
    val recommd_rdd1 = recommd.Recommend(simil_rdd1, userdata, 30)

    //3 列印結果
    println(s"物品相似度矩陣: ${simil_rdd1.count()}")
    simil_rdd1.collect().foreach { ItemSimi =>
      println(ItemSimi.itemid1 + ", " + ItemSimi.itemid2 + ", " + ItemSimi.similar)
    }
    println(s"用戶推薦清單: ${recommd_rdd1.count()}")
    recommd_rdd1.collect().foreach { UserRecomm =>
      println(UserRecomm.userid + ", " + UserRecomm.itemid + ", " + UserRecomm.pref)
    }    

  }
}

           

相似度計算的邏輯大緻是先計算出物品與物品之間關聯的矩陣,然後用物品關聯的矩陣X使用者購買物品矩陣,得出使用者可能購買的矩陣。

原始資料說明data.txt

第一列是使用者,第二列是商品,第三列可以了解為評分,比如我購買了一瓶啤酒,覺得味道不錯,評價0.8分(滿分1分),但是使用者的評價資料一般是很難收集到的,如果收集到肯定要放入矩陣計算。但是大部分情況下收集不到,是以我們直接用1和0來代替,1是購買,0是未購買。未購買的直接提前就過濾掉,不放入模型計算。

1,1,1

1,2,1

2,1,1

2,3,1

3,3,1

3,4,1

4,2,1

4,4,1

5,1,1

5,2,1

5,3,1

6,4,1

現在就開始說明每個rdd得到的到底是什麼?

類ItemSimilarity中

原始資料得到user_rdd1:

使用者,物品,評分

1,1,1

1,2,1

2,1,1

2,3,1

3,3,1

3,4,1

4,2,1

4,4,1

5,1,1

5,2,1

5,3,1

6,4,1

user_rdd2:

使用者,物品

(1,1)

(1,2)

(2,1)

(2,3)

(3,3)

(3,4)

(4,2)

(4,4)

(5,1)

(5,2)

(5,3)

(6,4)

user_rdd3,笛卡爾join操作

使用者,物品    使用者,物品

1,1            1,1    

1,2            1,2

2,1            2,1        

2,3            2,3

3,3            3,3

3,4            3,4

4,2            4,2

4,4            4,4

5,1            5,1

5,2            5,2

5,3            5,3

6,4            6,4

user_rdd3:

使用者,(物品,物品)

(4,(2,2))

(4,(2,4))

(4,(4,2))

(4,(4,4))

(5,(1,1))

(5,(1,2))

(5,(1,3))

(5,(2,1))

(5,(2,2))

(5,(2,3))

(5,(3,1))

(5,(3,2))

(5,(3,3))

(6,(4,4))

(2,(1,1))

(2,(1,3))

(2,(3,1))

(2,(3,3))

(3,(3,3))

(3,(3,4))

(3,(4,3))

(3,(4,4))

(1,(1,1))

(1,(1,2))

(1,(2,1))

(1,(2,2))

user_rdd4:

(物品,物品),1

((2,2),1)

((2,4),1)

((4,2),1)

((4,4),1)

((1,1),1)

((1,2),1)

((1,3),1)

((2,1),1)

((2,2),1)

((2,3),1)

((3,1),1)

((3,2),1)

((3,3),1)

((4,4),1)

((1,1),1)

((1,3),1)

((3,1),1)

((3,3),1)

((3,3),1)

((3,4),1)

((4,3),1)

((4,4),1)

((1,1),1)

((1,2),1)

((2,1),1)

((2,2),1)

user_rdd5:

(物品,物品),頻次

((1,1),3)

((2,1),2)

((1,3),2)

((4,3),1)

((4,2),1)

((3,4),1)

((3,3),3)

((2,2),3)

((4,4),3)

((2,4),1)

((1,2),2)

((2,3),1)

((3,1),2)

((3,2),1)

user_rdd6:

物品==物品(對角矩陣)

((1,1),3)

((3,3),3)

((2,2),3)

((4,4),3)

user_rdd7:

物品!=物品(非對角矩陣)

((2,1),2)

((1,3),2)

((4,3),1)

((4,2),1)

((3,4),1)

((2,4),1)

((1,2),2)

((2,3),1)

((3,1),2)

((3,2),1)

user_rdd8:

物品1,((物品1,物品2,同現頻次),物品1購買次數)

(4,((4,3,1),3))

(4,((4,2,1),3))

(2,((2,1,2),3))

(2,((2,4,1),3))

(2,((2,3,1),3))

(3,((3,4,1),3))

(3,((3,1,2),3))

(3,((3,2,1),3))

(1,((1,3,2),3))

(1,((1,2,2),3))

user_rdd9:

物品2,(物品1,物品2,同現頻次, 物品1購買次數)

(3,(4,3,1,3))

(2,(4,2,1,3))

(1,(2,1,2,3))

(4,(2,4,1,3))

(3,(2,3,1,3))

(4,(3,4,1,3))

(1,(3,1,2,3))

(2,(3,2,1,3))

(3,(1,3,2,3))

(2,(1,2,2,3))

user_rdd10:

物品2,((物品1,物品2,同現頻次, 物品1購買次數), 物品2購買次數)

(4,((2,4,1,3),3))

(4,((3,4,1,3),3))

(2,((4,2,1,3),3))

(2,((3,2,1,3),3))

(2,((1,2,2,3),3))

(3,((4,3,1,3),3))

(3,((2,3,1,3),3))

(3,((1,3,2,3),3))

(1,((2,1,2,3),3))

(1,((3,1,2,3),3))

user_rdd11:

物品1,物品2,同現頻次, 物品1購買次數,物品2購買次數

(2,4,1,3,3)

(3,4,1,3,3)

(4,2,1,3,3)

(3,2,1,3,3)

(1,2,2,3,3)

(4,3,1,3,3)

(2,3,1,3,3)

(1,3,2,3,3)

(2,1,2,3,3)

(3,1,2,3,3)

user_rdd12:

相似度定義:同時喜歡物品1又喜歡物品2的個數/sqrt(喜歡物品1的個數*喜歡物品2的個數),這裡是同現相似度最核心的代碼

物品1,物品2,相似度

(2,4,0.3333333333333333)

(3,4,0.3333333333333333)

(4,2,0.3333333333333333)

(3,2,0.3333333333333333)

(1,2,0.6666666666666666)

(4,3,0.3333333333333333)

(2,3,0.3333333333333333)

(1,3,0.6666666666666666)

(2,1,0.6666666666666666)

(3,1,0.6666666666666666)

使用者推薦,類RecommendedItem:

rdd_app1_R1:

物品1,物品2,相似度

(2,4,0.3333333333333333)

(3,4,0.3333333333333333)

(4,2,0.3333333333333333)

(3,2,0.3333333333333333)

(1,2,0.6666666666666666)

(4,3,0.3333333333333333)

(2,3,0.3333333333333333)

(1,3,0.6666666666666666)

(2,1,0.6666666666666666)

(3,1,0.6666666666666666)

rdd_app1_R2:

物品1, ((物品2, 相似度),(使用者,評分))

(4,((2,0.3333333333333333),(3,1.0)))

(4,((2,0.3333333333333333),(4,1.0)))

(4,((2,0.3333333333333333),(6,1.0)))

(4,((3,0.3333333333333333),(3,1.0)))

(4,((3,0.3333333333333333),(4,1.0)))

(4,((3,0.3333333333333333),(6,1.0)))

(2,((4,0.3333333333333333),(1,1.0)))

(2,((4,0.3333333333333333),(4,1.0)))

(2,((4,0.3333333333333333),(5,1.0)))

(2,((3,0.3333333333333333),(1,1.0)))

(2,((3,0.3333333333333333),(4,1.0)))

(2,((3,0.3333333333333333),(5,1.0)))

(2,((1,0.6666666666666666),(1,1.0)))

(2,((1,0.6666666666666666),(4,1.0)))

(2,((1,0.6666666666666666),(5,1.0)))

(3,((4,0.3333333333333333),(2,1.0)))

(3,((4,0.3333333333333333),(3,1.0)))

(3,((4,0.3333333333333333),(5,1.0)))

(3,((2,0.3333333333333333),(2,1.0)))

(3,((2,0.3333333333333333),(3,1.0)))

(3,((2,0.3333333333333333),(5,1.0)))

(3,((1,0.6666666666666666),(2,1.0)))

(3,((1,0.6666666666666666),(3,1.0)))

(3,((1,0.6666666666666666),(5,1.0)))

(1,((2,0.6666666666666666),(1,1.0)))

(1,((2,0.6666666666666666),(2,1.0)))

(1,((2,0.6666666666666666),(5,1.0)))

(1,((3,0.6666666666666666),(1,1.0)))

(1,((3,0.6666666666666666),(2,1.0)))

(1,((3,0.6666666666666666),(5,1.0)))

rdd_app1_R3:

(使用者,物品2),評分*相似度

((3,2),0.3333333333333333)

((4,2),0.3333333333333333)

((6,2),0.3333333333333333)

((3,3),0.3333333333333333)

((4,3),0.3333333333333333)

((6,3),0.3333333333333333)

((1,4),0.3333333333333333)

((4,4),0.3333333333333333)

((5,4),0.3333333333333333)

((1,3),0.3333333333333333)

((4,3),0.3333333333333333)

((5,3),0.3333333333333333)

((1,1),0.6666666666666666)

((4,1),0.6666666666666666)

((5,1),0.6666666666666666)

((2,4),0.3333333333333333)

((3,4),0.3333333333333333)

((5,4),0.3333333333333333)

((2,2),0.3333333333333333)

((3,2),0.3333333333333333)

((5,2),0.3333333333333333)

((2,1),0.6666666666666666)

((3,1),0.6666666666666666)

((5,1),0.6666666666666666)

((1,2),0.6666666666666666)

((2,2),0.6666666666666666)

((5,2),0.6666666666666666)

((1,3),0.6666666666666666)

((2,3),0.6666666666666666)

((5,3),0.6666666666666666)

rdd_app1_R4:

(使用者,物品2),評分*相似度

((1,1),0.6666666666666666)

((1,4),0.3333333333333333)

((2,1),0.6666666666666666)

((6,2),0.3333333333333333)

((1,3),1.0)

((4,3),0.6666666666666666)

((4,2),0.3333333333333333)

((5,3),1.0)

((3,4),0.3333333333333333)

((3,3),0.3333333333333333)

((2,2),1.0)

((4,4),0.3333333333333333)

((2,4),0.3333333333333333)

((1,2),0.6666666666666666)

((6,3),0.3333333333333333)

((2,3),0.6666666666666666)

((3,1),0.6666666666666666)

((3,2),0.6666666666666666)

((4,1),0.6666666666666666)

((5,4),0.6666666666666666)

((5,1),1.3333333333333333)

((5,2),1.0)

rdd_app1_R5:過濾出來了使用者未買物品的推薦度

(使用者,(物品2,評分*相似度))

(1,(4,0.3333333333333333))

(6,(2,0.3333333333333333))

(1,(3,1.0))

(4,(3,0.6666666666666666))

(2,(2,1.0))

(2,(4,0.3333333333333333))

(6,(3,0.3333333333333333))

(3,(1,0.6666666666666666))

(3,(2,0.6666666666666666))

(4,(1,0.6666666666666666))

(5,(4,0.6666666666666666))

rdd_app1_R6:

使用者, [(未購買物品1,推薦度),(未購買物品2,推薦度)]

(4,CompactBuffer((3,0.6666666666666666), (1,0.6666666666666666)))

(5,CompactBuffer((4,0.6666666666666666)))

(6,CompactBuffer((2,0.3333333333333333), (3,0.3333333333333333)))

(2,CompactBuffer((2,1.0), (4,0.3333333333333333)))

(3,CompactBuffer((1,0.6666666666666666), (2,0.6666666666666666)))

(1,CompactBuffer((4,0.3333333333333333), (3,1.0)))

rdd_app1_R7:

這一步中會排序和篩選(篩選會改變長度,是以需要toBuffer)

使用者, [(未購買物品1,推薦度),(未購買物品2,推薦度)]

(4,ArrayBuffer((3,0.6666666666666666), (1,0.6666666666666666)))

(5,ArrayBuffer((4,0.6666666666666666)))

(6,ArrayBuffer((2,0.3333333333333333), (3,0.3333333333333333)))

(2,ArrayBuffer((4,0.3333333333333333), (2,1.0)))

(3,ArrayBuffer((1,0.6666666666666666), (2,0.6666666666666666)))

(1,ArrayBuffer((4,0.3333333333333333), (3,1.0)))

rdd_app1_R8:

使用者,未購買物品,推薦度

(4,3,0.6666666666666666)

(4,1,0.6666666666666666)

(5,4,0.6666666666666666)

(6,2,0.3333333333333333)

(6,3,0.3333333333333333)

(2,4,0.3333333333333333)

(2,2,1.0)

(3,1,0.6666666666666666)

(3,2,0.6666666666666666)

(1,4,0.3333333333333333)

(1,3,1.0)

經驗之談:

1.在類ItemSimilarity中有兩個Recommend方法,這裡我們推薦使用第一個Recommend方法,因為給使用者推薦的數量太過龐大了,比如最後給某個人推薦了幾百種東西,你覺得他都有可能會買嗎?大家最關心的是他最有可能購買的是哪些物品,是以給定一個r_number限制一下,比如20.這樣系統隻會儲存每個使用者top20的商品。後面你再取資料排序的時候,資料量就大大縮小了,隻需要排序top20的資料,而不是對每個使用者幾百個資料排序,所有會員排完序,那将是噩夢。

2.在這段代碼最後加上過濾filter(f => f._2._2 > 0)

val rdd_app1_R5 = rdd_app1_R4.leftOuterJoin(user_prefer1.map(f => ((f._1, f._2), 1))). filter(f => f._2._2.isEmpty).map(f => (f._1._1, (f._1._2, f._2._1))).filter(f => f._2._2 > 0)

因為最後計算出來的評分可能出現NaN,也就是為空的情況,會影響排序,提前過濾掉。如果資料量覺得還是太大,可以把0慢慢調大,過濾一些評分非常低的資料。

3.參數傳入隻能傳入兩個參數,使用者和商品,最後得出來的矩陣是使用者,商品,評分。這顯然是不夠的,難道我們還需要去關聯使用者表,商品表來擷取相關資料嗎?這個join也是非常恐怖的,是以提前傳參的時候就把相關資料拼接起來,計算完畢後再拆分。比如輸入資料是“使用者id#使用者姓名#使用者性别#使用者年齡”,“商品id#商品名稱#商品價格#商品類别”。最後的輸出資料再以#拆分。

繼續閱讀