天天看點

【推薦系統算法實戰】 ALS 矩陣分解算法

##一、算法描述

ALS ( Alternating Least Squares ) ,交叉最小二乘法。

###1.原理

####問題描述

ALS的矩陣分解算法常應用于推薦系統中,将使用者(user)對商品(item)的評分矩陣,分解為使用者對商品隐含特征的偏好矩陣,和商品在隐含特征上的映射矩陣。

與傳統的矩陣分解SVD方法來分解矩陣 R() 不同的是,ALS

( Alternating Least Squares ) 希望找到兩個低維矩陣,以

來逼近矩陣R,

其中 ,,,d 表示降維後的次元,一般 d<<r,r表示矩陣 R 的秩,。

####目标函數

為了找到低維矩陣X,Y最大程度地逼近矩分矩陣R,最小化下面的平方誤差損失函數。

為防止過拟合,給公式 (1) 加上​

​正則項​

​​,公式改下為:

其中,,,是正則項的系數。

####模型求解

  • 1.固定Y,對 求導 ,得到求解的公式
  • 2.同理固定X,可得到求解的公式

    其中,,,I表示一個d * d的機關矩陣。

  • 基于公式(3)、(4),首先随機初始化矩陣X,然後利用公式(3)更新Y,接着用公式(4)更新X,直到計算出的RMSE(均方根誤差)值收斂或疊代次數足夠多而結束疊代為止。

    其中,,

####ALS-WR模型

以上模型适用于使用者對商品的有明确的評分矩陣的場景,然而很多情況下使用者沒有明确的回報對商品的偏好,而是通過一些行為隐式的回報。比如對商品的購買次數、對電視節目收看的次數或者時長,這時我們可以推測次數越多,看得時間越長,使用者的偏好程度越高,但是對于沒有購買或者收看的節目,可能是由于使用者不知道有該商品,或者沒有途徑擷取該商品,我們不能确定的推測使用者不喜歡該商品。ALS-WR通過置信度的權重來解決此問題,對于我們更确信使用者偏好的項賦予較大的權重,對于沒有回報的項,賦予較小的權重。模型如下

  • ALS-WR目标函數

    其中,

,是置信度系數

  • 通過最小二乘法求解

    其中是一維的個對角矩陣,; 其中是一維的個對角矩陣,

####與其他矩陣分解算法的比較

  • 在實際應用中,由于待分解的矩陣常常是非常稀疏的,與SVD相比,ALS能有效的解決過拟合問題。
  • 基于ALS的矩陣分解的協同過濾算法的可擴充性也優于SVD。
  • 與随機梯度下降的求解方式相比,一般情況下随機梯度下降比ALS速度快;但有兩種情況ALS更優于随機梯度下降:1)當系統能夠并行化時,ALS的擴充性優于随機梯度下降法。2)ALS-WR能夠有效的處理使用者對商品的隐式回報的資料。

###2.僞代碼:

import numpy

def mf_als(R, P, Q, K, steps=5000, alpha=0.0002, beta=0.02):
    Q = Q.T
    for step in xrange(steps): 
        for i in xrange(len(P)):                              
            e = 0
        for i in xrange(len(R)):
            for j in xrange(len(R[i])):
                if R[i][j] > 0:
                    e = e + pow(R[i][j] - numpy.dot(P[i,:],Q[:,j]), 2)
                    for k in xrange(K):
                        e = e + (beta/2) * (pow(P[i][k],2) + pow(Q[k][j],2))
        if e < 0.001:
        break
    return P, Q.T

if __name__ == "__main__":
    R = [
     [5,3,0,1],
     [4,0,0,1],
     [1,1,0,5],
     [1,0,0,4],
     [0,1,5,4],
    ]
    R = numpy.array(R)
    N, M, K = len(R), len(R[0]), 2
    P = numpy.random.rand(N,K)
    Q = numpy.random.rand(M,K)

    nP, nQ = mf_als(R, P, Q, K)
    print numpy.dot(nP, nQ.T)
      

###3.并行化方法:

整體思路就是把矩陣拆成行向量,分别來做最小二乘參數估計。

僞代碼中,所有資料都被廣播到了叢集節點。實際代碼中,隻會向各節點分發其運算能用到的部分資料。

# M: item個數, U: user個數, F: 分解矩陣的秩
# 初始化評分矩陣
R = matrix(rand(M, F)) * matrix(rand(U, F).T)
ms = matrix(rand(M ,F))
us = matrix(rand(U, F))

# 将評分矩陣,item矩陣,user矩陣廣播到所有節點
Rb = sc.broadcast(R)
msb = sc.broadcast(ms)
usb = sc.broadcast(us)

# 指定周遊次數ITERATIONS
for i in range(ITERATIONS):
    # 固定user矩陣,分布式求解item矩陣
    # 每個節點計算M/slices個items
    ms = sc.parallelize(range(M), slices) \
           .map(lambda x: update(x, msb.value[x, :], usb.value, Rb.value)) \
           .collect()
    ms = matrix(np.array(ms)[:, :, 0])      # collect() returns a list, so array ends up being
                                            # a 3-d array, we take the first 2 dims for the matrix
    # 廣播更新後的item矩陣
    msb = sc.broadcast(ms)

    # 固定item矩陣,分布式求解user矩陣
    us = sc.parallelize(range(U), slices) \
           .map(lambda x: update(x, usb.value[x, :], msb.value, Rb.value.T)) \
           .collect()
    us = matrix(np.array(us)[:, :, 0])
    usb = sc.broadcast(us)

    # 平方誤差
    error = rmse(R, ms, us)

# 最小二乘更新資料
# 輸入:矩陣行index,要更新的特征向量,固定的特征矩陣,評分矩陣
def update(i, vec, mat, ratings):
    uu = mat.shape[0]
    ff = mat.shape[1]

    # 變成可逆矩陣
    XtX = mat.T * mat
    Xty = mat.T * ratings[i, :].T

    # 正則項
    for j in range(ff):
        XtX[j,j] += LAMBDA * uu

    # XtXZ=XtY,求Z并傳回
    # 傳回類型為二維數組。因為每次update隻計算一個向量,是以實際隻有第一維有值。
    return np.linalg.solve(XtX, Xty)
      

二、具體實作及調用

###1. 模型調用

#####輸入資料結構與說明:

​Rating(UserId:Int, ItemId:Int, Rating:toDouble)​

​ 使用者、商品id必須為整型(字元串需要采用 token_index 預處理),評分為浮點型。

#####模型輸出資料結構及說明:

​RDD[(Id:Int, Array[feature:Double]]]​

​ 可以分别輸出userFeatures和itemFeatures。包含id和隐含特征值。

#####推薦結果輸出資料結構及說明:

​Rating(UserId:Int, ItemId:Int, Rating:toDouble)​

​ 使用者、商品id與預測評分。

#####算法調用語句示例:

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("mllib/data/als/test.data")
val ratings = data.map(_.split(',') match {
    case Array(user, item, rate) =>  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val numIterations = 20
val model = ALS.train(ratings, 1, 20, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map{ case Rating(user, product, rate)  => (user, product)}
val predictions = model.predict(usersProducts).map{
    case Rating(user, product, rate) => ((user, product), rate)
}
val ratesAndPreds = ratings.map{
    case Rating(user, product, rate) => ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map{
    case ((user, product), (r1, r2)) =>  math.pow((r1- r2), 2)
}.reduce(_ + _)/ratesAndPreds.count
println("Mean Squared Error = " + MSE)
      

#####性能參數配置:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator",  classOf[ALSRegistrator].getName)
  /*
  Whether to track references to the same object when serializing data with Kryo, 
  which is necessary if your object graphs have loops 
  and useful for efficiency if they contain multiple copies of the same object. 
  Can be disabled to improve performance if you know this is not the case.
  */
  .set("spark.kryo.referenceTracking", "false")
  .set("spark.kryoserializer.buffer.mb", "8")
  /*
  Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node. 
  You should increase this setting if your tasks are long and see poor locality, but the default usually works well.
  */
  .set("spark.locality.wait", "10000")
      

##三、案例描述

###1. 業務問題描述及分析

####問題描述

在電子商務領域中,當使用者面對大量的商品時,往往無法快速找到自己喜歡的商品,或者不是非常明确的知道自己喜歡商品。和搜尋引擎相比的推薦系統通過研究使用者的興趣偏好,進行個性化計算,由系統發現使用者的興趣點,進而引導使用者發現自己的需求。

####簡要分析

矩陣分解是推薦系統中非常重要的一種算法,它通過将使用者對商品的評分矩陣(或者隐含資料),分解為使用者對商品隐含特征的偏好矩陣,和商品在隐含特征上的映射矩陣。如果使用者所偏好特征,在商品上基本都出現,我們可以認為這個商品是使用者喜歡的,進而可以将該商品推薦給使用者。

我們用曆史的訂單資料作為訓練資料,來預測使用者對未購買過的商品的偏好程度,将偏好程度最高topN的商品推薦給使用者。

###2. 資料的準備

圖書品類下,2014年1月到5月的訂單資料,取在14月和45月兩個區間都有圖書購物記錄的使用者。1-4月為訓練資料,4-5月為測試資料。使用者對商品有購買行為,則隐性回報值為1。

###3. 算法的運作及模型生成

####性能:

  1. ​N = User*Item​

    ​ N的最大值(理論估計+實際驗證) 測試了兩組資料集:

第一組:

訓練: pair:6557620 使用者:781030 商品:726490

測試: pair:3250426 使用者:781030 商品:490257

N = 726490*781030 = 567410484700
稀疏度 =pair/N = 0.0000115571      
  1. ​worker-num,worker-mem,blocks,kryo,kryo-reference,locality-wait​

    ​ 等運作參數與資料量對一輪疊代時間的影響。
  2. 運作時rdd的transform和action的運算時間與shuffle大小。
模型:

#####資料性質:

  1. 稀疏性(行為(評分、購買),品類)

#####參數選擇: lambda,alpha,R,iter

###4. 模型的評估

####矩陣分解的評估

  • 原始矩陣為R,預測的為,用RMSE來評估預測的效果。

其中N為中所有求和的項數

####推薦效果的評估

  • 對推薦預測的效果一般用準确率(precision)和召回率(recall)來衡量。R(u)是根據使用者在訓練集上的行為給使用者推薦的清單,T(u)是使用者在測試集上的行為清單。則有

    召回率

    準确率

##四、與mahout的對比

mahout與spark性能對比
  • 資料量 6991409行,134M
  • 叢集環境:mahout與spark安裝在同一叢集環境
  • 影響運作時間的參數:降維後的秩 30,疊代次數 30,mahout與spark設定相同
  • 運作時間:mahout(10個reduce) 運作180 minutes,spark 運作 40 minutes

參考文獻

  • ​​Large-scale Parallel Collaborative Filtering for the Netfli Prize​​
  • ​​Collaborative Filtering for Implicit Feedback Datasets​​
  • ​​MATRIX FACTORIZATION TECHNIQUES FOR RECOMMENDER SYSTEMS​​
  • ​​https://www.ethanrosenthal.com/2016/01/09/explicit-matrix-factorization-sgd-als/​​

Kotlin 開發者社群

繼續閱讀