目錄
1. 介紹
2. 開始幹貨
a. 準備資料
b. 資料标準化
c. 模型訓練
d. 模型評估
e. 推薦輸出
3. 拓展
1. 介紹
官方文檔說明:http://spark.apache.org/docs/latest/ml-collaborative-filtering.html
本文章源代碼工程:https://github.com/johncai0/ALSRecommender
基礎需要了解線性代數中矩陣的乘法。
交替最小二乘法使用者拟合或者梯度下降,其原理是固定一個值求另一個值,再固定求出來的值算另一個值,如此交替拟合,故名交替最小二乘法。
所有user所對應的所有item的集和用矩陣表示為(示例)
item1 | item2 | item3 | |
user1 | 1 | 2 | |
user2 | 3 | 2 | |
user3 | 3 | 1 | 3 |
如果将這個矩陣表示為A,那麼A可以拆分成X(userCF)和Y(itemCF)的乘積,且X和Y非常稠密(此處被稱為 矩陣分解,此處需要梯度下降,用的算法是交替最小二乘法)
由上圖可見,矩陣A非常稀疏,而X和Y是稠密的,如果再将X,Y兩個稠密矩陣求積回得到一個近似A的稠密矩陣(此處稱之為矩陣補全)
例如:
上圖的兩個稠密矩陣的乘機得到下邊的補全矩陣(的到了積分)
選擇分數最高音樂的推薦給使用者即可(紅色的格子)
如果有描述不正确請留言,希望這篇文字能幫到其他人。
2. 開始幹貨
a. 準備資料
https://github.com/johncai0/ALSRecommender/tree/master/data
将檔案下載下傳後,存儲到hdfs://john:9000/user/ALSRecommender_TrainingData/目錄下或者其他目錄(如果用本文中的路徑,接下來的代碼複制即可)
spark 讀取資料
val ss=SparkSession.builder().getOrCreate()
val base = "hdfs://john:9000/user/ALSRecommender_TrainingData/*"
val fieldSchema = StructType(Array(
StructField("user", StringType, true),
StructField("itemrank", IntegerType, true),
StructField("item", StringType, true)
))
val df=ss.read.schema(fieldSchema).csv(base)
b. 資料标準化
由于讀入的資料user列和item列是字元串類型,是以需要重新定義id成整形,詳細定義方式參考:
https://blog.csdn.net/cwg_1992/article/details/95813455
1. 為使用者生成int類型的id
2. 為item生成int類型的id
3. 分别将userid和itemid列Join到df上,生成alldata的dataFarem,就可以直接給ASL模型用了。
4. 切分訓練資料集和驗證資料集,分别是90%和10%
val userandid=df.select("user").rdd.map(_.getString(0)).distinct().zipWithIndex().map(l=>(l._1,l._2.toInt)).toDF("user","userid")
val itemandid=df.select("item").rdd.map(_.getString(0)).distinct().zipWithIndex().map(l=>(l._1,l._2.toInt)).toDF("item","itemid")
val alldata=df.join(userandid,"user").join(itemandid,"item")
//分割訓練資料集和測試資料集,并緩存到記憶體
val Array(trainData, cvData) = alldata.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()
c. 模型訓練
此處需要設定輸入的訓練資料的使用者列setUserCol為userid
item列為itemid列
評分列為itemrank列
需要一個輸出的預測列prediction列
其他幾個超參數解釋如下:
numBlocks是使用者和項目将被分區為多個塊的數量,以便并行化計算(預設為10)。
rank是模型中潛在因子的數量(預設為10)。
maxIter是要運作的最大疊代次數(預設為10)。
regParam指定ALS中的正則化參數(預設為1.0)。
implicitPrefs指定是使用顯式回報 ALS變體還是使用适用于隐式回報資料的變體 (預設為false使用顯式回報)。
alpha是适用于ALS的隐式回報變量的參數,其控制偏好觀察中的 基線置信度(預設為1.0)。
nonnegative指定是否對最小二乘使用非負限制(預設為false)。
超參數詳細見http://spark.apache.org/docs/latest/ml-collaborative-filtering.html
val model = new ALS().
setSeed(Random.nextLong()).
setImplicitPrefs(true).
setRank(10).setRegParam(0.01).
setAlpha(1.0).setMaxIter(20).
setUserCol("userid").setItemCol("itemid").
setRatingCol("itemrank").setPredictionCol("prediction").
fit(trainData)
d. 模型評估
由于本資料集中除了使用者對應的item及對應的評分,沒有别的資訊給于參考推薦模型的好于壞。此外,由于千人千面問題,或許隻有本人才能知道這個推薦模型的準确性。
但是在大量的資料集下,我們可以認為所有item中,青睐使用者更多的item認為是較好的(值得推薦的),那麼我們可以為每個使用者生成ROC曲線來衡量這個推薦模型好壞,曲線的下面積為這個使用者的AUC(Area Under the Curve),如果要評估一個模型的好壞,必須計算所有使用者的平均AUC。
以下是計算平均AUC的方法
def areaUnderCurve(positiveData: DataFrame, bAllItemIDs: Broadcast[Array[Int]], predictFunction: (DataFrame => DataFrame)): Double = {
val positivePredictions = predictFunction(positiveData.select("userid", "itemid")).
withColumnRenamed("prediction", "positivePrediction")
val negativeData = positiveData.select("userid", "itemid").as[(Int,Int)].
groupByKey { case (user, _) => user }.
flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
val random = new Random()
val posItemIDSet = userIDAndPosArtistIDs.map { case (_, item) => item }.toSet
val negative = new ArrayBuffer[Int]()
val allItemIDs = bAllItemIDs.value
var i = 0
while (i < allItemIDs.length && negative.size < posItemIDSet.size) {
val itemID = allItemIDs(random.nextInt(allItemIDs.length))
// Only add new distinct IDs
if (!posItemIDSet.contains(itemID)) {
negative += itemID
}
i += 1
}
// Return the set with user ID added back
negative.map(itemID => (userID, itemID))
}.toDF("userid", "itemid")
// Make predictions on the rest:
val negativePredictions = predictFunction(negativeData).
withColumnRenamed("prediction", "negativePrediction")
val joinedPredictions = positivePredictions.join(negativePredictions, "userid").
select("userid", "positivePrediction", "negativePrediction").cache()
// Count the number of pairs per user
val allCounts = joinedPredictions.
groupBy("userid").agg(count(lit("1")).as("total")).
select("userid", "total")
// Count the number of correctly ordered pairs per user
val correctCounts = joinedPredictions.
filter($"positivePrediction" > $"negativePrediction").
groupBy("userid").agg(count("userid").as("correct")).
select("userid", "correct")
// Combine these, compute their ratio, and average over all users
val meanAUC = allCounts.join(correctCounts, Seq("userid"), "left_outer").
select($"userid", (coalesce($"correct", lit(0)) / $"total").as("auc")).
agg(mean("auc")).
as[Double].first()
joinedPredictions.unpersist()
meanAUC
}
def predictMostListened(train: DataFrame)(allData: DataFrame): DataFrame = {
val listenCounts = train.groupBy("itemid").
agg(sum("itemrank").as("prediction")).
select("itemid", "prediction")
allData.
join(listenCounts, Seq("itemid"), "left_outer").
select("userid", "itemid", "prediction")
}
如果我們能夠檢測模型的好壞,結合多組超參數是否可以動态選擇超參數而得到更好的模型?答案是肯定的。
var gauc:Double=0.00 //存放最優的AUC值
var rank:Int=0 //存放最優AUC值對應的rank
var regParam:Double=0.00 //同上
var alpha:Double=0.00 //同上
var goodModel:ALSModel=null //存放最優的ALS模型
val evaluations =
for (rank <- Seq(10, 11);
regParam <- Seq(3.0, 2.0);
alpha <- Seq(8.0, 1.0))
yield {
val model = new ALS().
setSeed(Random.nextLong()).
setImplicitPrefs(true).
setRank(rank).setRegParam(regParam).
setAlpha(alpha).setMaxIter(20).
setUserCol("userid").setItemCol("itemid").
setRatingCol("itemrank").setPredictionCol("prediction").
fit(trainData)
val auc = areaUnderCurve(cvData, bAllItemIDs, model.transform) //每次訓練完模型算出這個模型的AUC
if (auc > gauc) { //如果本次的auc大于上次的auc,則将本次的模型儲存下來
goodModel=model
} else {
model.userFactors.unpersist()
model.itemFactors.unpersist()
}
(auc, (rank, regParam, alpha))
}
e. 推薦輸出
def makeRecommendations(user: String, howMany: Int): DataFrame = {
val userID = this.userandid.where(s"user = '" + user + "'").select("userid").rdd.collect()(0).getInt(0)
val toRecommend = goodModel.itemFactors.
select($"id".as("itemid")).
withColumn("userid", lit(userID))
if (goodModel == null && goodModel ==None) println("goodModel is Null.........")
val reced=goodModel.transform(toRecommend).
select("itemid", "prediction").
orderBy($"prediction".desc).
limit(howMany)
reced.join(itemandid,"itemid").drop("itemid").sort("prediction")
}
官方文檔說明:http://spark.apache.org/docs/latest/ml-collaborative-filtering.html
本文章源代碼工程:https://github.com/johncai0/ALSRecommender
3. 拓展
推薦和回歸其實是一回事,都是在根據已發生的因子來推測接下來的動作或者結果。
PS: 如果有誤可以留言溝通,最後在自我積累的同時希望能幫到大家。