天天看點

sparkMlib實作協同過濾算法

協同過濾常被應用于推薦系統。這些技術旨在補充使用者-商品關聯矩陣中所缺失的部分。MLlib目前支援基于模型的協同過濾,其中使用者和商品通過一小組隐語義因子進行表達,并且這些因子也用于預測缺失的元素。為此,我們實作了交替最小二乘法(ALS) 來學習這些隐性語義因子。在 MLlib 中的實作有如下的參數:

numBlocks 是用于并行化計算的分塊個數 (設定為-1為自動配置)。

rank 是模型中隐語義因子的個數。

iterations 是疊代的次數。

lambda 是ALS的正則化參數。

implicitPrefs 決定了是用顯性回報ALS的版本還是用适用隐性回報資料集的版本。

alpha 是一個針對于隐性回報 ALS 版本的參數,這個參數決定了偏好行為強度的基準;

Mlib 中的explicit 和implicit

在實際的應用場景中,我們一般能擷取的資料很少是客戶顯性偏好(客戶對商品的評分),在通常的場景中我們可能可以擷取的資料用隐形客戶偏好(比如點選,浏覽,購買數,分享);實際上推薦通常使用的就是這些代表使用者傾向的隐性特征;關于此知識點可以參考http://blog.csdn.net/lingerlanlan/article/details/46917601

為了更好的了解協同過濾系統中使用的算法原理,我們先了解一下ASL;

sparkMlib實作協同過濾算法

ALS是alternating least squares的縮寫 , 意為交替最小二乘法;而ALS-WR是alternating-least-squares with weighted-λ -regularization的縮寫,意為權重正則化交替最小二乘法。該方法常用于基于矩陣分解的推薦系統中;比如在使用者對商品的評分矩陣,可以分解為一個使用者對隐含特征偏好的矩陣,一個是商品所包含的隐含特征的矩陣;對于R(m×n)的矩陣,ALS旨在找到兩個低維矩陣X(m×k)和矩陣Y(n×k),來近似逼近R(m×n),在這過程中把使用者評分缺失項填上,并根據這個分數給使用者推薦;即公式如下

sparkMlib實作協同過濾算法

把一個高維矩陣寫成兩個低位矩陣的相識乘積,比如上圖使用者對商品的打分矩陣,矩陣Y可以了解為把電影映射到低維的特征上,比如科幻、愛情、武俠、恐怖;

為了盡可以找到逼近的X,Y矩陣,下面就是優化平方誤差公式:

sparkMlib實作協同過濾算法

其中rui表示使用者u對商品i的評分,xu(1×k)表示示使用者u的偏好的隐含特征向量,yi(1×k)表示商品i包含的隐含特征向量, 向量x和yi的内積xuTyi是使用者u對商品i評分的近似。

損失函數一般需要加入正則化項來避免過拟合等問題,我們使用L2正則化,是以上面的公式改造為:

sparkMlib實作協同過濾算法

這樣,協同過濾就轉化為優化問題了,上面式子因為X和Y耦合在一起很難解。這就要引入交替二乘法,其主旨就是先固定X,求Y,疊代至收斂;然後固定Y求X;

sparkMlib實作協同過濾算法

ALS-WR

上文提到的模型适用于解決有明确評分矩陣的應用場景,然而很多情況下,使用者沒有明确回報對商品的偏好,也就是沒有直接打分,我們隻能通過使用者的某些行為來推斷他對商品的偏好。比如,在電視節目推薦的問題中,對電視節目收看的次數或者時長,這時我們可以推測次數越多,看得時間越長,使用者的偏好程度越高,但是對于沒有收看的節目,可能是由于使用者不知道有該節目,或者沒有途徑擷取該節目,我們不能确定的推測使用者不喜歡該節目。ALS-WR通過置信度權重來解決這些問題:對于更确信使用者偏好的項賦以較大的權重,對于沒有回報的項,賦以較小的權重。ALS-WR模型的形式化說明如下:

  • ALS-WR的目标函數:
sparkMlib實作協同過濾算法

其中α是置信度系數。

  • 求解方式還是最小二乘法:
sparkMlib實作協同過濾算法

其中Cu是n×n的對角矩陣,Ci是m×m的對角矩陣;Cuii  = cui,  Ciii  = cii。

然後按ALS求解步驟求解就可以了;

下面就使用sparkmlib實作一個推薦系統,直接上代碼:

訓練集資料樣本樣式:

第一個字段為使用者ID,第二個字段為電影ID,第三個地段為評分,第四個字段為

0::2::3::1424380312
0::3::1::1424380312
0::5::2::1424380312
0::9::4::1424380312
0::11::1::1424380312
0::12::2::1424380312
0::15::1::1424380312
0::17::1::1424380312
1::2::2::1424380312
1::3::1::1424380312
1::4::2::1424380312
1::6::1::1424380312
1::9::3::1424380312
1::12::1::1424380312
1::13::1::1424380312
           

定義一個電影bean:

public class MovieBean implements Serializable {
	private static final long serialVersionUID = 1L;
	private int userId;
	 private int movieId;
	 private float rating;
	 private long timestamp;
	public int getUserId() {
		return userId;
	}
	public void setUserId(int userId) {
		this.userId = userId;
	}
	public int getMovieId() {
		return movieId;
	}
	public void setMovieId(int movieId) {
		this.movieId = movieId;
	}
	public float getRating() {
		return rating;
	}
	public void setRating(float rating) {
		this.rating = rating;
	}
	public long getTimestamp() {
		return timestamp;
	}
	public void setTimestamp(long timestamp) {
		this.timestamp = timestamp;
	}
	@Override
	public String toString() {
		return "MovieBean [userId=" + userId + ", movieId=" + movieId + ", rating=" + rating + ", timestamp="
				+ timestamp + "]";
	}
	public MovieBean(int userId, int movieId, float rating, long timestamp) {
		super();
		this.userId = userId;
		this.movieId = movieId;
		this.rating = rating;
		this.timestamp = timestamp;
	}
	public MovieBean(int userId, int movieId) {
		super();
		this.userId = userId;
		this.movieId = movieId;
	} 

}
           

模型實作如下:

public static void main(String[] args) {
		SparkSession sparkSession = SparkSession
			      .builder()
			      .appName("als").master("local[1]")
			      .getOrCreate();
		
		JavaRDD<MovieBean> movieData = sparkSession.read()
				.textFile("E:/sparkMlib/sparkMlib/src/mllib/als/sample_movielens_ratings.txt")
				.javaRDD()
				.map(new Function<String,MovieBean>(){
						public MovieBean call(String line) throws Exception {
								String[]fields = line.split("::");
								if(fields.length !=4){
								throw new Exception();
						}
						int userId = Integer.parseInt(fields[0]);
						int movieId = Integer.parseInt(fields[1]);
						float rating = Float.parseFloat(fields[2]);
						long timestamp = Long.parseLong(fields[3]);
						return new MovieBean(userId,movieId,rating,timestamp);
		}});
		Dataset<Row> ratingData = sparkSession.createDataFrame(movieData, MovieBean.class);
		//把資料集話分為訓練集和測試集
		Dataset<Row>[] splits = ratingData.randomSplit(new double[]{0.8, 0.2});
		Dataset<Row> training = splits[0];
		Dataset<Row> test = splits[1];
		// Build the recommendation model using ALS on the training data
		ALS als =new ALS()
				.setMaxIter(2)//設定疊代次數
				.setRank(10)//設定隐形特征個
		        .setUserCol("userId")
		        .setItemCol("movieId")
		        .setRatingCol("rating");
		 ALSModel model = als.fit(training);//訓練模型
		 
		 // Evaluate the model by computing the RMSE on the test data
		 Dataset<Row> predictions = model.transform(test);
		 System.out.println(predictions.schema());
		 for(Row r:predictions.select("userId", "movieId", "rating", "prediction").sort("prediction").collectAsList()){
				System.out.println(r.get(0)+":"+r.get(1)+":"+r.get(2)+":"+r.get(3));
			}
		 
		 RegressionEvaluator evaluator = new RegressionEvaluator()
				  .setMetricName("rmse")
				  .setLabelCol("rating")
				  .setPredictionCol("prediction");
		Double rmse = evaluator.evaluate(predictions);
		System.out.println("Root-mean-square error = " + rmse);
	}	
           
sparkMlib實作協同過濾算法

借鑒 https://github.com/ceys/jdml/wiki/ALS

繼續閱讀