目录
1. 介绍
2. 开始干货
a. 准备数据
b. 数据标准化
c. 模型训练
d. 模型评估
e. 推荐输出
3. 拓展
1. 介绍
官方文档说明:http://spark.apache.org/docs/latest/ml-collaborative-filtering.html
本文章源代码工程:https://github.com/johncai0/ALSRecommender
基础需要了解线性代数中矩阵的乘法。
交替最小二乘法用户拟合或者梯度下降,其原理是固定一个值求另一个值,再固定求出来的值算另一个值,如此交替拟合,故名交替最小二乘法。
所有user所对应的所有item的集和用矩阵表示为(示例)
item1 | item2 | item3 | |
user1 | 1 | 2 | |
user2 | 3 | 2 | |
user3 | 3 | 1 | 3 |
如果将这个矩阵表示为A,那么A可以拆分成X(userCF)和Y(itemCF)的乘积,且X和Y非常稠密(此处被称为 矩阵分解,此处需要梯度下降,用的算法是交替最小二乘法)
由上图可见,矩阵A非常稀疏,而X和Y是稠密的,如果再将X,Y两个稠密矩阵求积回得到一个近似A的稠密矩阵(此处称之为矩阵补全)
例如:
上图的两个稠密矩阵的乘机得到下边的补全矩阵(的到了积分)
选择分数最高音乐的推荐给用户即可(红色的格子)
如果有描述不正确请留言,希望这篇文字能帮到其他人。
2. 开始干货
a. 准备数据
https://github.com/johncai0/ALSRecommender/tree/master/data
将文件下载后,存储到hdfs://john:9000/user/ALSRecommender_TrainingData/目录下或者其他目录(如果用本文中的路径,接下来的代码复制即可)
spark 读取数据
val ss=SparkSession.builder().getOrCreate()
val base = "hdfs://john:9000/user/ALSRecommender_TrainingData/*"
val fieldSchema = StructType(Array(
StructField("user", StringType, true),
StructField("itemrank", IntegerType, true),
StructField("item", StringType, true)
))
val df=ss.read.schema(fieldSchema).csv(base)
b. 数据标准化
由于读入的数据user列和item列是字符串类型,因此需要重新定义id成整形,详细定义方式参考:
https://blog.csdn.net/cwg_1992/article/details/95813455
1. 为用户生成int类型的id
2. 为item生成int类型的id
3. 分别将userid和itemid列Join到df上,生成alldata的dataFarem,就可以直接给ASL模型用了。
4. 切分训练数据集和验证数据集,分别是90%和10%
val userandid=df.select("user").rdd.map(_.getString(0)).distinct().zipWithIndex().map(l=>(l._1,l._2.toInt)).toDF("user","userid")
val itemandid=df.select("item").rdd.map(_.getString(0)).distinct().zipWithIndex().map(l=>(l._1,l._2.toInt)).toDF("item","itemid")
val alldata=df.join(userandid,"user").join(itemandid,"item")
//分割训练数据集和测试数据集,并缓存到内存
val Array(trainData, cvData) = alldata.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()
c. 模型训练
此处需要设置输入的训练数据的用户列setUserCol为userid
item列为itemid列
评分列为itemrank列
需要一个输出的预测列prediction列
其他几个超参数解释如下:
numBlocks是用户和项目将被分区为多个块的数量,以便并行化计算(默认为10)。
rank是模型中潜在因子的数量(默认为10)。
maxIter是要运行的最大迭代次数(默认为10)。
regParam指定ALS中的正则化参数(默认为1.0)。
implicitPrefs指定是使用显式反馈 ALS变体还是使用适用于隐式反馈数据的变体 (默认为false使用显式反馈)。
alpha是适用于ALS的隐式反馈变量的参数,其控制偏好观察中的 基线置信度(默认为1.0)。
nonnegative指定是否对最小二乘使用非负约束(默认为false)。
超参数详细见http://spark.apache.org/docs/latest/ml-collaborative-filtering.html
val model = new ALS().
setSeed(Random.nextLong()).
setImplicitPrefs(true).
setRank(10).setRegParam(0.01).
setAlpha(1.0).setMaxIter(20).
setUserCol("userid").setItemCol("itemid").
setRatingCol("itemrank").setPredictionCol("prediction").
fit(trainData)
d. 模型评估
由于本数据集中除了用户对应的item及对应的评分,没有别的信息给于参考推荐模型的好于坏。此外,由于千人千面问题,或许只有本人才能知道这个推荐模型的准确性。
但是在大量的数据集下,我们可以认为所有item中,青睐用户更多的item认为是较好的(值得推荐的),那么我们可以为每个用户生成ROC曲线来衡量这个推荐模型好坏,曲线的下面积为这个用户的AUC(Area Under the Curve),如果要评估一个模型的好坏,必须计算所有用户的平均AUC。
以下是计算平均AUC的方法
def areaUnderCurve(positiveData: DataFrame, bAllItemIDs: Broadcast[Array[Int]], predictFunction: (DataFrame => DataFrame)): Double = {
val positivePredictions = predictFunction(positiveData.select("userid", "itemid")).
withColumnRenamed("prediction", "positivePrediction")
val negativeData = positiveData.select("userid", "itemid").as[(Int,Int)].
groupByKey { case (user, _) => user }.
flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
val random = new Random()
val posItemIDSet = userIDAndPosArtistIDs.map { case (_, item) => item }.toSet
val negative = new ArrayBuffer[Int]()
val allItemIDs = bAllItemIDs.value
var i = 0
while (i < allItemIDs.length && negative.size < posItemIDSet.size) {
val itemID = allItemIDs(random.nextInt(allItemIDs.length))
// Only add new distinct IDs
if (!posItemIDSet.contains(itemID)) {
negative += itemID
}
i += 1
}
// Return the set with user ID added back
negative.map(itemID => (userID, itemID))
}.toDF("userid", "itemid")
// Make predictions on the rest:
val negativePredictions = predictFunction(negativeData).
withColumnRenamed("prediction", "negativePrediction")
val joinedPredictions = positivePredictions.join(negativePredictions, "userid").
select("userid", "positivePrediction", "negativePrediction").cache()
// Count the number of pairs per user
val allCounts = joinedPredictions.
groupBy("userid").agg(count(lit("1")).as("total")).
select("userid", "total")
// Count the number of correctly ordered pairs per user
val correctCounts = joinedPredictions.
filter($"positivePrediction" > $"negativePrediction").
groupBy("userid").agg(count("userid").as("correct")).
select("userid", "correct")
// Combine these, compute their ratio, and average over all users
val meanAUC = allCounts.join(correctCounts, Seq("userid"), "left_outer").
select($"userid", (coalesce($"correct", lit(0)) / $"total").as("auc")).
agg(mean("auc")).
as[Double].first()
joinedPredictions.unpersist()
meanAUC
}
def predictMostListened(train: DataFrame)(allData: DataFrame): DataFrame = {
val listenCounts = train.groupBy("itemid").
agg(sum("itemrank").as("prediction")).
select("itemid", "prediction")
allData.
join(listenCounts, Seq("itemid"), "left_outer").
select("userid", "itemid", "prediction")
}
如果我们能够检测模型的好坏,结合多组超参数是否可以动态选择超参数而得到更好的模型?答案是肯定的。
var gauc:Double=0.00 //存放最优的AUC值
var rank:Int=0 //存放最优AUC值对应的rank
var regParam:Double=0.00 //同上
var alpha:Double=0.00 //同上
var goodModel:ALSModel=null //存放最优的ALS模型
val evaluations =
for (rank <- Seq(10, 11);
regParam <- Seq(3.0, 2.0);
alpha <- Seq(8.0, 1.0))
yield {
val model = new ALS().
setSeed(Random.nextLong()).
setImplicitPrefs(true).
setRank(rank).setRegParam(regParam).
setAlpha(alpha).setMaxIter(20).
setUserCol("userid").setItemCol("itemid").
setRatingCol("itemrank").setPredictionCol("prediction").
fit(trainData)
val auc = areaUnderCurve(cvData, bAllItemIDs, model.transform) //每次训练完模型算出这个模型的AUC
if (auc > gauc) { //如果本次的auc大于上次的auc,则将本次的模型保存下来
goodModel=model
} else {
model.userFactors.unpersist()
model.itemFactors.unpersist()
}
(auc, (rank, regParam, alpha))
}
e. 推荐输出
def makeRecommendations(user: String, howMany: Int): DataFrame = {
val userID = this.userandid.where(s"user = '" + user + "'").select("userid").rdd.collect()(0).getInt(0)
val toRecommend = goodModel.itemFactors.
select($"id".as("itemid")).
withColumn("userid", lit(userID))
if (goodModel == null && goodModel ==None) println("goodModel is Null.........")
val reced=goodModel.transform(toRecommend).
select("itemid", "prediction").
orderBy($"prediction".desc).
limit(howMany)
reced.join(itemandid,"itemid").drop("itemid").sort("prediction")
}
官方文档说明:http://spark.apache.org/docs/latest/ml-collaborative-filtering.html
本文章源代码工程:https://github.com/johncai0/ALSRecommender
3. 拓展
推荐和回归其实是一回事,都是在根据已发生的因子来推测接下来的动作或者结果。
PS: 如果有误可以留言沟通,最后在自我积累的同时希望能帮到大家。