天天看點

mllib調參 spark_Spark MLlib協同過濾算法

算法說明

協同過濾(Collaborative Filtering,簡稱CF,WIKI上的定義是:簡單來說是利用某個興趣相投、擁有共同經驗之群體的喜好來推薦感興趣的資訊給使用者,個人透過合作的機制給予資訊相當程度的回應(如評分)并記錄下來以達到過濾的目的,進而幫助别人篩選資訊,回應不一定局限于特别感興趣的,特别不感興趣資訊的紀錄也相當重要。

協同過濾常被應用于推薦系統。這些技術旨在補充使用者—商品關聯矩陣中所缺失的部分。

MLlib 目前支援基于模型的協同過濾,其中使用者和商品通過一小組隐性因子進行表達,并且這些因子也用于預測缺失的元素。MLLib 使用交替最小二乘法(ALS) 來學習這些隐性因子。

使用者對物品或者資訊的偏好,根據應用本身的不同,可能包括使用者對物品的評分、使用者檢視物品的記錄、使用者的購買記錄等。其實這些使用者的偏好資訊可以分為兩類:

顯式的使用者回報:這類是使用者在網站上自然浏覽或者使用網站以外,顯式地提供回報資訊,例如使用者對物品的評分或者對物品的評論。

隐式的使用者回報:這類是使用者在使用網站是産生的資料,隐式地反映了使用者對物品的喜好,例如使用者購買了某物品,使用者檢視了某物品的資訊,等等。

顯式的使用者回報能準确地反映使用者對物品的真實喜好,但需要使用者付出額外的代價;而隐式的使用者行為,通過一些分析和處理,也能反映使用者的喜好,隻是資料不是很精确,有些行為的分析存在較大的噪音。但隻要選擇正确的行為特征,隐式的使用者回報也能得到很好的效果,隻是行為特征的選擇可能在不同的應用中有很大的不同,例如在電子商務的網站上,購買行為其實就是一個能很好表現使用者喜好的隐式回報。

推薦引擎根據不同的推薦機制可能用到資料源中的一部分,然後根據這些資料,分析出一定的規則或者直接對使用者對其他物品的喜好進行預測計算。這樣推薦引擎可以在使用者進入時給他推薦他可能感興趣的物品。

MLlib目前支援基于協同過濾的模型,在這個模型裡,使用者和産品被一組可以用來預測缺失項目的潛在因子來描述。特别是我們實作交替最小二乘(ALS)算法來學習這些潛在的因子,在 MLlib 中的實作有如下參數:

numBlocks是用于并行化計算的分塊個數(設定為-1時 為自動配置);

rank是模型中隐性因子的個數;

iterations是疊代的次數;

lambda是ALS 的正則化參數;

implicitPrefs決定了是用顯性回報ALS 的版本還是用隐性回報資料集的版本;

alpha是一個針對于隐性回報 ALS 版本的參數,這個參數決定了偏好行為強度的基準。

mllib調參 spark_Spark MLlib協同過濾算法

執行個體介紹

在本執行個體中将使用協同過濾算法對GroupLens Research(http://grouplens.org/datasets/movielens/)提供的資料進行分析,該資料為一組從20世紀90年末到21世紀初由MovieLens使用者提供的電影評分資料,這些資料中包括電影評分、電影中繼資料(風格類型和年代)以及關于使用者的人口統計學資料(年齡、郵編、性别和職業等)。根據不同需求該組織提供了不同大小的樣本資料,不同樣本資訊中包含三種資料:評分、使用者資訊和電影資訊。

對這些資料分析進行如下步驟:

1. 裝載如下兩種資料:

a)裝載樣本評分資料,其中最後一列時間戳除10的餘數作為key,Rating為值;

b)裝載電影目錄對照表(電影ID->電影标題)

2.将樣本評分表以key值切分成3個部分,分别用于訓練 (60%,并加入使用者評分), 校驗 (20%), and 測試 (20%)

3.訓練不同參數下的模型,并再校驗集中驗證,擷取最佳參數下的模型

4.用最佳模型預測測試集的評分,計算和實際評分之間的均方根誤差

5.根據使用者評分的資料,推薦前十部最感興趣的電影(注意要剔除使用者已經評分的電影)

測試資料說明

在MovieLens提供的電影評分資料分為三個表:評分、使用者資訊和電影資訊,在該系列提供的附屬資料提供大概6000位讀者和100萬個評分資料,具體位置為/data/class8/movielens/data目錄下,對三個表資料說明可以參考該目錄下README文檔。

1.評分資料說明(ratings.data)

該評分資料總共四個字段,格式為UserID::MovieID::Rating::Timestamp,分為為使用者編号::電影編号::評分::評分時間戳,其中各個字段說明如下:

使用者編号範圍1~6040

電影編号1~3952

電影評分為五星評分,範圍0~5

評分時間戳機關秒

每個使用者至少有20個電影評分

使用的ratings.dat的資料樣本如下所示:

1::1193::5::978300760

1::661::3::978302109

1::914::3::978301968

1::3408::4::978300275

1::2355::5::978824291

1::1197::3::978302268

1::1287::5::978302039

1::2804::5::978300719

2.使用者資訊(users.dat)

使用者資訊五個字段,格式為UserID::Gender::Age::Occupation::Zip-code,分為為使用者編号::性别::年齡::職業::郵編,其中各個字段說明如下:

使用者編号範圍1~6040

性别,其中M為男性,F為女性

不同的數字代表不同的年齡範圍,如:25代表25~34歲範圍

職業資訊,在測試資料中提供了21中職業分類

地區郵編

使用的users.dat的資料樣本如下所示:

1::F::1::10::48067

2::M::56::16::70072

3::M::25::15::55117

4::M::45::7::02460

5::M::25::20::55455

6::F::50::9::55117

7::M::35::1::06810

8::M::25::12::11413

3.電影資訊(movies.dat)

電影資料分為三個字段,格式為MovieID::Title::Genres,分為為電影編号::電影名::電影類别,其中各個字段說明如下:

電影編号1~3952

由IMDB提供電影名稱,其中包括電影上映年份

電影分類,這裡使用實際分類名非編号,如:Action、Crime等

使用的movies.dat的資料樣本如下所示:

1::Toy Story (1995)::Animation|Children's|Comedy

2::Jumanji (1995)::Adventure|Children's|Fantasy

3::Grumpier Old Men (1995)::Comedy|Romance4::Waiting to Exhale (1995)::Comedy|Drama5::Father of the Bride Part II (1995)::Comedy6::Heat (1995)::Action|Crime|Thriller7::Sabrina (1995)::Comedy|Romance8::Tom and Huck (1995)::Adventure|Children's

程式代碼

import java.io.File

import scala.io.Source

import org.apache.log4j.{Level, Logger}

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.rdd._

import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}objectMovieLensALS {

def main(args: Array[String]) {//屏蔽不必要的日志顯示在終端上Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)if (args.length != 2) {

println("Usage: /path/to/spark/bin/spark-submit --driver-memory 2g --class week7.MovieLensALS" +

"week7.jar movieLensHomeDir personalRatingsFile")

sys.exit(1)

}//設定運作環境val conf= new SparkConf().setAppName("MovieLensALS").setMaster("local[4]")

val sc= newSparkContext(conf)//裝載使用者評分,該評分由評分器生成val myRatings= loadRatings(args(1))

val myRatingsRDD= sc.parallelize(myRatings, 1)//樣本資料目錄val movieLensHomeDir= args(0)//裝載樣本評分資料,其中最後一列Timestamp取除10的餘數作為key,Rating為值,即(Int,Rating)val ratings= sc.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line =>val fields= line.split("::")

(fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble))

}//裝載電影目錄對照表(電影ID->電影标題)val movies= sc.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line =>val fields= line.split("::")

(fields(0).toInt, fields(1))

}.collect().toMap

val numRatings=ratings.count()

val numUsers=ratings.map(_._2.user).distinct().count()

val numMovies=ratings.map(_._2.product).distinct().count()

println("Got" + numRatings + "ratings from" + numUsers + "users on" + numMovies + "movies.")//将樣本評分表以key值切分成3個部分,分别用于訓練 (60%,并加入使用者評分), 校驗 (20%), and 測試 (20%)//該資料在計算過程中要多次應用到,是以cache到記憶體val numPartitions= 4val training= ratings.filter(x => x._1 < 6)

.values

.union(myRatingsRDD)//注意ratings是(Int,Rating),取value即可.repartition(numPartitions)

.cache()

val validation= ratings.filter(x => x._1 >= 6 && x._1 < 8)

.values

.repartition(numPartitions)

.cache()

val test= ratings.filter(x => x._1 >= 8).values.cache()

val numTraining=training.count()

val numValidation=validation.count()

val numTest=test.count()

println("Training:" + numTraining + ", validation:" + numValidation + ", test:" +numTest)//訓練不同參數下的模型,并在校驗集中驗證,擷取最佳參數下的模型val ranks= List(8, 12)

val lambdas= List(0.1, 10.0)

val numIters= List(10, 20)var bestModel: Option[MatrixFactorizationModel] =Nonevar bestValidationRmse =Double.MaxValuevar bestRank = 0

var bestLambda = -1.0

var bestNumIter = -1

for (rank

val model=ALS.train(training, rank, numIter, lambda)

val validationRmse=computeRmse(model, validation, numValidation)

println("RMSE (validation) =" + validationRmse + "for the model trained with rank ="

+ rank + ", lambda =" + lambda + ", and numIter =" + numIter + ".")if (validationRmse

bestModel=Some(model)

bestValidationRmse=validationRmse

bestRank=rank

bestLambda=lambda

bestNumIter=numIter

}

}//用最佳模型預測測試集的評分,并計算和實際評分之間的均方根誤差val testRmse= computeRmse(bestModel.get, test, numTest)

println("The best model was trained with rank =" + bestRank + "and lambda =" + bestLambda + ", and numIter =" + bestNumIter + ", and its RMSE on the test set is" + testRmse + ".")//create a naive baseline and compare it with the best modelval meanRating=training.union(validation).map(_.rating).mean

val baselineRmse=math.sqrt(test.map(x=> (meanRating - x.rating) * (meanRating -x.rating)).mean)

val improvement= (baselineRmse - testRmse) / baselineRmse * 100println("The best model improves the baseline by" + "%1.2f".format(improvement) + "%.")//推薦前十部最感興趣的電影,注意要剔除使用者已經評分的電影val myRatedMovieIds=myRatings.map(_.product).toSet

val candidates= sc.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq)

val recommendations= bestModel.get.predict(candidates.map((0, _)))

.collect()

.sortBy(-_.rating)

.take(10)var i = 1println("Movies recommended for you:")

recommendations.foreach { r =>println("%2d".format(i) + ":" +movies(r.product))

i+= 1}

sc.stop()

}def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double={

val predictions: RDD[Rating]= model.predict(data.map(x =>(x.user, x.product)))

val predictionsAndRatings= predictions.map(x =>((x.user, x.product), x.rating))

.join(data.map(x=>((x.user, x.product), x.rating)))

.values

math.sqrt(predictionsAndRatings.map(x=> (x._1 - x._2) * (x._1 - x._2)).reduce(_ + _) /n)

}def loadRatings(path: String): Seq[Rating]={

val lines=Source.fromFile(path).getLines()

val ratings= lines.map { line =>val fields= line.split("::")

Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)

}.filter(_.rating> 0.0)if(ratings.isEmpty) {

sys.error("No ratings provided.")

}else{

ratings.toSeq

}

}

}

mllib調參 spark_Spark MLlib協同過濾算法

IDEA執行情況

第一步   使用如下指令啟動Spark叢集

$cd /app/hadoop/spark-1.1.0$sbin/start-all.sh

第二步   進行使用者評分,生成使用者樣本資料

由于該程式中最終推薦給使用者十部電影,這需要使用者提供對樣本電影資料的評分,然後根據生成的最佳模型擷取目前使用者推薦電影。使用者可以使用/home/hadoop/upload/class8/movielens/bin/rateMovies程式進行評分,最終生成personalRatings.txt檔案:

mllib調參 spark_Spark MLlib協同過濾算法

第三步   在IDEA中設定運作環境

在IDEA運作配置中設定MovieLensALS運作配置,需要設定輸入資料所在檔案夾和使用者的評分檔案路徑:

輸入資料所在目錄:輸入資料檔案目錄,在該目錄中包含了評分資訊、使用者資訊和電影資訊,這裡設定為/home/hadoop/upload/class8/movielens/data/

使用者的評分檔案路徑:前一步驟中使用者對十部電影評分結果檔案路徑,在這裡設定為/home/hadoop/upload/class8/movielens/personalRatings.txt

第四步   執行并觀察輸出

輸出Got 1000209 ratings from 6040 users on 3706 movies,表示本算法中計算資料包括大概100萬評分資料、6000多使用者和3706部電影;

輸出Training: 602252, validation: 198919, test: 199049,表示對評分資料進行拆分為訓練資料、校驗資料和測試資料,大緻占比為6:2:2;

在計算過程中選擇8種不同模型對資料進行訓練,然後從中選擇最佳模型,其中最佳模型比基準模型提供22.30%

RMSE (validation) = 0.8680885498009973 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.

RMSE (validation) = 0.868882967482595 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.

RMSE (validation) = 3.7558695311242833 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.

RMSE (validation) = 3.7558695311242833 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.

RMSE (validation) = 0.8663942501841964 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.

RMSE (validation) = 0.8674684744165418 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.

RMSE (validation) = 3.7558695311242833 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.

RMSE (validation) = 3.7558695311242833 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.

The best model was trained with rank = 12 and lambda = 0.1, and numIter = 10, and its RMSE on the test set is 0.8652326018300565.

The best model improves the baseline by 22.30%.

利用前面擷取的最佳模型,結合使用者提供的樣本資料,最終推薦給使用者如下影片:

Movies recommended for you:

1: Bewegte Mann, Der (1994)

2: Chushingura (1962)

3: Love Serenade (1996)

4: For All Mankind (1989)

5: Vie est belle, La (Life is Rosey) (1987)

6: Bandits (1997)

7: King of Masks, The (Bian Lian) (1996)

8: I'm the One That I Want (2000)

9: Big Trees, The (1952)

10: First Love, Last Rites (1997)

mllib調參 spark_Spark MLlib協同過濾算法