天天看點

Spark MLlib 1.6 -- 分類和回歸篇

·  Linear models

· classification (SVMs, logistic regression)

· linear regression (least squares, Lasso, ridge)

·  Decision trees

·  Ensembles of decision trees

· random forests

· gradient-boosted trees

·  Naive Bayes

·  Isotonic regression

Spark.mllib 實作以下ML問題: 兩個标簽類的分類, 多個标簽類的分類,和回歸分析。

下表列出每類問題的支援算法:

Problem Type Supported Methods
Binary Classification

linear SVMs, logistic regression, decision trees, random forests,

 gradient-boosted trees, naive Bayes

線性支援向量機,邏輯回歸,決策樹,随機森林,梯度提升決策樹,

樸素貝葉斯決策

Multiclass Classification

logistic regression, decision trees, random forests, naive Bayes

邏輯回歸,決策樹,随機森林,樸素貝葉斯決策

Regression

linear least squares, Lasso, ridge regression, decision trees, 

random forests, gradient-boosted trees, isotonic regression

線性最小二乘法,最小化的絕對收縮和選擇算子,嶺回歸,

決策樹,随機森林,梯度提升決策樹,保序回歸

3.1 線性模型 –spark.mllib

· Mathematical formulation

o Loss functions

o Regularizers

o Optimization

· Classification

o Linear Support Vector Machines (SVMs)

o Logistic regression

· Regression

o Linear least squares, Lasso, and ridge regression

o Streaming linear regression

· Implementation (developer)

3.1.1 數學公式

 許多标準機器學習問題可以轉化為凸優化問題,如, 凸函數f 的最小值是依賴于d維向量w(稱為權重向量),可以把問題轉化為:

求    \min_{w \in R^d }{ f(x) } 問題。此處f 函數形如:

F(w) = \Lamda * R(w) + frac{1,n} * \Sum|_{i=1}|^{n} {L(w;x_i,y_i)|

此處向量   x_i \in R^d是訓練測試資料,  1 <= I <= n , y_i \in R是相應的類标簽,類标簽在分類問題是需要預測的。如果方法是線性的,如果

L(w;x,y) 可以表示成w^{T} x 和 y 的函數, 下面會講解不是凸優化問題的情況。

目标函數f 有兩個點:正規化決定模型的複雜程度,損失決定模型的誤差,損失函數L(w; . , . ) 是w 的凸函數,正規化參數 \Lamda >= 0 (名為regParam ) 來權衡兩個目标:錯誤最小 和模型複雜度最低 (為了防止過拟合)。

3.1.1.1 損失函數 

下表總結損失函數,集損失函數的梯度函數

Spark MLlib 1.6 -- 分類和回歸篇

3.1.1.2 正規化

正則化可以使模型處理相對簡單,并且可以避免模型過拟合。支援以下正則化 spark.mllib

Spark MLlib 1.6 -- 分類和回歸篇

此處 sign(w) 是符号向量,每個元素是w 向量相應位置的符号函數 sign(x_i)

L2-正規化相對L1-正規化處理簡單,是因為L2的正規函數是連續光滑函數,而L1的正規函數則不是。L1正規化可以使權向量中稀少的值變得不那麼重要,使模型在特征選擇上處理更容易了解。彈性網絡(elastic net)是L1和L2正規化的組合。不建議訓練模型時不适用正則化,特别是訓練向本數很少時。

3.1.1.3 最優化

線性方法使用凸最優化方法優化目标函數。Spark.mllib使用兩種方法SGD 和L-BFGS(見最優化章節)。目前,大多數算法APIs 支援随機梯度下降(SGD)和大部分支援L-BFGS。 

3.1.2 分類

分類算法的目标是把資料分門别類。最簡單分類是兩分類問題,即分成兩類(正類和負類)。如果多餘兩類,一般稱為多類别分類問題。Spark.mllib 支援兩種線性分類:線性支援向量機(SVM)和邏輯回歸。 線性SVN 隻支援兩分類,而邏輯回歸支援兩分類和多分類。這兩種算法都支援L1和L2正規化。訓練集為RDD[LabeledPoint] 在MLlib , 而類标簽為 0, 1,2,… 。 注意,數學公式中,兩分類的類标簽表示為: +1 (正類) 和 -1 (負類)。

3.1.2.1 線性支援向量機(SVM)

線性SVN是處理大多數分類問題的首選,線性方法描述見上面表達式(1),其中損失函數形如:

   L(w;x,y) :=  max{ 0,  1 – y w^t x }

預設,線性SVN訓練集需要使用L2正規化。同時支援L1正規化,在此情況下, 變成線性算法。

線性SVN算法輸出SVN模型。給定新資料點,表示為x , 模型基于w^T x 的 預測。 預設, 如果 w^T x >= 0 , 則歸為正類,否則歸為負類。

例子:

下例中展示如何加載測試資料,執行算法訓練,并預測結果與訓練集的錯誤。

Scala SVMWithSGD API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.SVMWithSGD

Scala SVMModel API : 

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.SVMModel

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).

val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)

val training = splits(0).cache()

val test = splits(1)

// Run training algorithm to build the model

val numIterations = 100

val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold.

model.clearThreshold()

// Compute raw scores on the test set.

val scoreAndLabels = test.map { point =>

  val score = model.predict(point.features)

  (score, point.label)

}

// Get evaluation metrics.

val metrics = new BinaryClassificationMetrics(scoreAndLabels)

val auROC = metrics.areaUnderROC()

println("Area under ROC = " + auROC)

// Save and load model

model.save(sc, "myModelPath")

val sameModel = SVMModel.load(sc, "myModelPath")

SVMWitSGD.train() 方法預設使用L2正規化,且使用正規參數1.0 。如果想修改此預設,需要建立新的SVMWithSGD 執行個體,并用setter方法重新配置。其他spark.mllib算法也支援setter方法重新配置,例如,下例給出L1正規化 且SVM正規化參數位0.1 ,訓練樣本疊代200次。

import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()

svmAlg.optimizer.

  setNumIterations(200).

  setRegParam(0.1).

  setUpdater(new L1Updater)

val modelL1 = svmAlg.run(training)

3.1.2.2 邏輯回歸

邏輯回歸廣泛用于預測兩類别分類問題。它也符合等式(1) , 并且損失函數形如:

L(w;x,y)  := log( 1 + exp(-y w^T x) )

對于兩類别分類問題,算法輸出兩類别邏輯回歸模型,給定新的測試點,記為x , 模型通過邏輯函數

F(z) = 1 / { 1 + e^(-z)} 

 此處 z = w^T x , 如果 f(w^T x) > 0.5 , 認為是正類, 否則認為是負類, 可以看到此分類方法分類和SVN不太一樣,多了一個随機函數f( ) 

兩類别分類邏輯回歸可以推廣到多類别邏輯回歸,用來處理多類别分類問題。例如,假設有K可能的輸出結果,選取其中一個作為對比值,剩下K-1個輸出值分别去和對比值做兩類别回歸。在spark.mllib , 這個對比值就是類别0 ,詳見 統計學習基礎:http://statweb.stanford.edu/~tibs/ElemStatLearn/

對于多類别分類問題,算法會輸出K-1個邏輯回歸模型,給定一個新測試點,帶入K-1個模型算出最大機率值得類别,記為預測結果。

我們實作兩個算法解決邏輯回歸:小批梯隊下降法(mini-batch gradient descent)和L-BFGS , 我們建議優先選L-BFGS,它的收斂性更快一些。

例子:

下面例子将如何加載多類别資料集,将資料集分為訓練和測試,使用LogisticRegressionWithLBFGS 做邏輯回歸。模型再用測試資料集去評估優劣。

Scala LogisticRegressionWithLBFGS API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

Scala LogisticRegressionModel API : 

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionModel

import org.apache.spark.SparkContext

import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}

import org.apache.spark.mllib.evaluation.MulticlassMetrics

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).

val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)

val training = splits(0).cache()

val test = splits(1)

// Run training algorithm to build the model

val model = new LogisticRegressionWithLBFGS()

  .setNumClasses(10)

  .run(training)

// Compute raw scores on the test set.

val predictionAndLabels = test.map { case LabeledPoint(label, features) =>

  val prediction = model.predict(features)

  (prediction, label)

}

// Get evaluation metrics.

val metrics = new MulticlassMetrics(predictionAndLabels)

val precision = metrics.precision

println("Precision = " + precision)

// Save and load model

model.save(sc, "myModelPath")

val sameModel = LogisticRegressionModel.load(sc, "myModelPath")

3.1.3 回歸

3.1.3.1 線性最小二乘,Lasso , 嶺回歸

最小二乘在回歸問題中經常使用。同樣是線性算法符合公式(1) , 損失函數形為:

Spark MLlib 1.6 -- 分類和回歸篇

使用不同的正規化方法得到不同最小二乘法: 正交最小二乘法或線性最小二乘法(不适用正規化);嶺回歸使用L2正規化;Lasso使用L1正規化。對所有這些模型,平均損失(訓練集錯誤率) 1/n  \SUM|_(i=1) ^n| ( w^T x_i – y_i )^2 , 稱為均方誤差。

例子:

下例展示如何加載訓練資料,轉化成标簽點的RDD,例子使用LinearRegressionWithSGD 建構線性模型來預測類标簽。最後計算均方差錯誤來評估拟合優度。

Scala LinearRegressionWithSGD API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.regression.LinearRegressionWithSGD

Scala LinearRegressionModel API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.regression.LinearRegressionModel

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.regression.LinearRegressionModel

import org.apache.spark.mllib.regression.LinearRegressionWithSGD

import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data

val data = sc.textFile("data/mllib/ridge-data/lpsa.data")

val parsedData = data.map { line =>

  val parts = line.split(',')

  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))

}.cache()

// Building the model

val numIterations = 100

val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error

val valuesAndPreds = parsedData.map { point =>

  val prediction = model.predict(point.features)

  (point.label, prediction)

}

val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()

println("training Mean Squared Error = " + MSE)

// Save and load model

model.save(sc, "myModelPath")

val sameModel = LinearRegressionModel.load(sc, "myModelPath")

RidgeRegressionWithSGD 和 LassoWithSGD 使用同LinearRegressionWithSGD 

為了運作上例代碼,需要檢視spark 快速指南中 Self-Contained Applications 章節(https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications)

3.1.3.2 流線性回歸

當資料是以流的形式進入模型,最好選取線上回歸模型,更新資料每批生成的周期。Spark.mllib  流線性回歸暫支援正交最小二乘。除了拟合度是計算每批次資料的到,拟合度計算方法和離線是一樣。

例子

下例展示如何從檔案生成訓練資料流和測試資料流。把流資料解釋為标簽點,拟合線上線性回歸模型,預測下一個流的類标簽。

首先,引入必須的輸入資料和模型類

import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

然後生成訓練資料流和測試資料流。假設StreamingContext ssc 已經生成,詳見Spark Streaming Programming Guide (https://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing)

下例中我們使用标簽點來代表訓練和測試資料,實際中建議測試資料使用無标簽向量。

val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse).cache()

val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)

初始化模型權重為0

val numFeatures = 3

val model = new StreamingLinearRegressionWithSGD()

    .setInitialWeights(Vectors.zeros(numFeatures))

注冊訓練資料流和測試資料流,将預測結果列印出來。

model.trainOn(trainingData)

model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()

ssc.awaitTermination()

現在可以把訓練和測試資料流儲存在不同的檔案夾下。每行記錄的資料點格式( y , [ x1,x2,x3]) ,此處y 是類标簽, x1,x2,x3是特征向量。訓練資料檔案隻要儲存在/training/data/dir, 模型就會随時更新,測試資料檔案儲存在/testing/data/dir 下就會計算類标簽預測。注意,訓練資料越多,預測結果越好。

3.1.4 實作(開發者)

Spark.mllib實作了簡單分布式版本的SGD(stochastic gradient descent),這個SGD是基于(underlying) 梯度下降法。所有提供的算法接受正規化參數作為輸入(regParam) , 同時還有其他SGD的各種參數(stepSize , numIterations , miniBatchFraction ) 。 罪域每個參數,我們提供三種可能的正規化(none , L1 , L2 )

邏輯回歸,L-BFGS版本的實作基于LogisticRegressionWithLBFGS類,這個實作支援兩類别邏輯回歸和多類别邏輯回歸,而SGD隻支援兩類别邏輯回歸。盡管,L-BFGS不支援L1正規化, 單SGD隻支援L1正規化。當L1正規化不是必選是, 強烈推薦L-BFGS算法, 因為它收斂更快,比SGD算法更精确的逼近逆 Hessian 矩陣,這個Hessian 矩陣通過拟牛頓法(quasi-Newton methond)

算法的Scala 實作:

· SVMWithSGD

· LogisticRegressionWithLBFGS

· LogisticRegressionWithSGD

· LinearRegressionWithSGD

· RidgeRegressionWithSGD

· LassoWithSGD

Python 調用scala 實作: PythonMLLibAPI.

3.2 樸素貝葉斯

假設特征向量的兩兩次元獨立,則使用樸素貝葉斯可以計算多分類,并且它的訓練效率很高。計算一遍所有訓練樣本,可以算每個類的類條件機率分布函數,然後給定一個測試樣本,可以分别計算給定測試樣本(觀測值)的條件下,給定類的條件機率,将測試樣本劃分到類條件機率做的類别。

Spark.mllib 支援多類别樸素貝葉斯和伯努利樸素貝葉斯(Bernoulli naive Bayes)。這些算法多用于文檔分類。在文檔上下文中,觀測值是每個文檔特定單詞的出現頻率(多類别樸素貝葉斯),或者在伯努利樸素貝葉斯中,觀測值是每個文檔特定單詞的0(文檔中未出現此單詞)和1(文檔中出現此單詞)值,這樣才能確定每個特征值非負值。模型算法可以使用“multinomial” or “bernoulli” , 預設是“multinomial”。額外平滑參數

λ(default to 1.0)。文檔分類中,輸入特征向量一般是稀松的,稀松向量可以節省記憶體和網絡IO,且訓練樣本隻用計算一次,是以可以不用緩存。

3.2.1 例子

NaiveBayes 實作了多分類樸素貝葉斯,輸入訓練資料RDD[LabeledPoint]和lambda 平滑因子。配置模型可選參數,計算出NaiveBayesModel 的執行個體可以用來預測和分類。

Scala NaiveBayes API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.NaiveBayes

Scala NaiveBayesModel API : 

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.NaiveBayesModel

import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}

import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.regression.LabeledPoint

val data = sc.textFile("data/mllib/sample_naive_bayes_data.txt")

val parsedData = data.map { line =>

  val parts = line.split(',')

  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))

}

// Split data into training (60%) and test (40%).

val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)

val training = splits(0)

val test = splits(1)

val model = NaiveBayes.train(training, lambda = 1.0, modelType = "multinomial")

val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))

val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()

// Save and load model

model.save(sc, "target/tmp/myNaiveBayesModel")

val sameModel = NaiveBayesModel.load(sc, "target/tmp/myNaiveBayesModel")

3.3 決策樹

·  Basic algorithm

· Node impurity and information gain

· Split candidates

· Stopping rule

·  Usage tips

· Problem specification parameters

· Stopping criteria

· Tunable parameters

· Caching and checkpointing

·  Scaling

·  Examples

· Classification

· Regression

決策樹算法常用于機器學習中分類和回歸問題,由于以下優點,決策樹得到廣泛使用:

1 處理特征分類時,對分類結果容易直覺解釋

2 容易擴充到多類情況

3 不去要對特征向量進行規整()

4 可以處理非線性問題

5 可以直覺觀察特征的比對互動過程

決策樹算法族,諸如随機森林和随機深林的提升算法在處理分類和回歸問題是效率最高。

Spark.mllib的決策樹使用連續特征和歸類特征,應用于兩類别分類和多類别分類,以及回歸問題。決策樹實作按行分片處理,最多允許分布式訓練百萬行資料。

随機森林和梯度提升樹詳見 Ensembles guide (https://spark.apache.org/docs/latest/mllib-ensembles.html)

3.3.1 基本算法

決策樹是貪婪算法,它會按二分去周遊整個特征向量空間。算法預測相同标簽類的葉節點集。每一種分片的結果都是在決策節點上,所有可能的劃分方法中選取最優的方法,選取最優的依據是資訊增益(information gain) 最大化。 換句話說,在每個決策節點,選取使 argmax(s)  IG(D,s) , 資訊增益最大化的參數s  ,此處 , IG(D,s) 是在資料集D 上應用劃分方法s 所得到的資訊增益。

3.3.1.1 節點混雜度和資訊增益

節點混雜度是測量節點上标簽集均一性。目前實作兩種分類混雜度(Gini 混雜度和熵), 一種回歸混雜度。

Spark MLlib 1.6 -- 分類和回歸篇

資訊增益不同于父節點的混雜度,以及子節點的混雜度帶權重之和。假設劃分s 将資料集D 劃分為D1  和D2 ,其中D1有N1個元素,D2有N2個元素。

資訊增益

IG(D,s) = Impurity(D) – N1/N Impurity(D1) – N2/N Impurity(D2)

3.3.1.2 拆分可選集

3.3.1.2.1 連續特征

在單機上小資料集上,給定特征向量,不管怎麼劃分它的特征向量值是唯一的。有些實作先對特征向量進行排序,然後使用排序後的特征向量快速計算。

但對大量特征向量排序是不可取的,有些實作相對樣本集進行抽樣,對抽樣樣本集計算分位數(如四分位數),然後在對全部特征向量按分為點近似劃分備選集。這種方式把特征向量空間劃分為幾個區域,max’Bins 參數可以設定最多允許多少個這樣的區域。

需要注意次數區域數不能大于樣本的分類樹N(預設maxBins 為32,雖然大多數情況這個數不适用) 。 當區域數大于類别數時,決策樹算法會自動将這個區域數調低。

3.3.1.2.2 特征歸類

假設可以講特征向量歸為M個類,共有2^(M-1) – 1種可能的分法,對于兩類别分類和回歸問題,通過按分位點将類别标簽從新排序,我們可以把這個可能的分法降為M-1, 如,兩類别分類問題有一個類别特征,三個歸類A,B和C,其中類别标簽1占比分别是0.2 , 0.6 和0.4 , 則從新排序後是A,C,B . 有兩種劃分法A| C,B  或 A,C | B 。

在多類别分類中,共有 2^(M-1) – 1 中可能劃分。但是當 2^(M-1) – 1 比maxBins  參數大時,使用類似兩類别分類和回歸相似的啟發式方法。M 個類特征按混雜度排序,共有M – 1 中分類劃分。

3.3.1.3 停止規則

決策樹遞歸結束條件:當一下任一個滿足後結束:

1 樹的深度等于maxDepth 訓練參數

2 沒有一種劃分可以使得資訊增益大于minInfoGain

3 沒有一種劃分可以生成新的子節點,這個子節點要滿足minInstancesPerNode 訓練參數。

3.3.2 使用提示

在本節中會讨論決策樹的各種參數,下面分别說明。 對于初學者需要掌握”Problem specification parameters” 的參數和maxDepth 參數。

3.3.2.1 問題關鍵參數

下面參數是關鍵參數,不需要優化。

1) algo(算法) : Classification or Regression 

2) numClasses: 類别數(隻在分類中有用)

3) categoricalFeaturesInfo: 确定哪些特征值需要分類,以及這些特征值可以劃分為幾類。這個是用map 表示, 每個元素的key 是特征值索引(從0 開始) , 每個元素的value 是特征值可以劃分的類别數(标簽值從0開始)。所有不在此map中的特征就認為是連續特征(即不需要劃分類别的特征)

i>  例如: Map( 0 ->2 , 4 -> 10 ) , 特征值索引為0的可以劃分為兩類别(列别标簽0,1) , 特征值縮影為4的可以劃分為10個類别(0,1,…9) . 需要注意特征值索引和特征值對應類别标簽都是從0 開始。

ii> 需要注意,即使不配置categoricalFeaturesInfo這個參數,算法任然可以運作,但處于對性能的考慮,還是建議仔細配置這個參數。

3.3.2.2 停止原則

此參數決定何時算法停止。配置此參數是需要仔細斟酌,防止決策樹過分拟合。

maxDepth: 決策樹最大深度,深度越大,訓練所要花費的時間越多,更容易過分拟合。

minInstancesPerNode: 為了保證每個節點能進一步劃分,每個子節點至少要包含的訓練樣本數。這個在随機森林裡用的多,因為它的深度比一棵樹要多。

minInfoGain: 每個節點進一步劃分是,必須保證這個劃分至少可以獲得的資訊增益數。

3.3.2.3 可優化參數

一下參數可以用于優化。

maxBins : 當離散連續特征是,允許的最大bin數

i> 增大maxBins允許算法更多地分類劃分,可能會獲得更好的決策規則,但是卻增加計算的複雜度。

ii> 這個值至少要大于任一個特征劃分的最大類别數。

maxMemoryInMB: 用于計算足夠多統計量所需要的最大記憶體。

i>預設記憶體使用256M ,基本能保證決策樹在大多數場景工作。

通過減少訓練過程中資料的傳遞,是以提高這個值可以加速訓練的過程。然而,因為每次疊代是和資料互動的次數正比于maxMemoryInMB , 是以提升這個值可以降低收益(資訊增益)。

實作細節:決策樹算法計算待劃分節點集的統計資訊。每個歸類的節點的總數是有記憶體決定的。maxMemoryInMB 确定每個worker 可以使用統計量所占用的MB。

subsamplingRate: 訓練集資料用于決策樹訓練的采樣比例。這個參數決定訓練樣本的采樣比例,是以直接影響訓練樹算法體系(使用随機森林和梯度提升樹)。對于單個決策樹沒有太大作用,主要是訓練樣本數目并不是主要限制決策樹算法準确性。

Impurity: 選擇特征劃分所需滿足的混雜度,這個參數和algo參數要對應。

3.3.2.4 緩存和檢查點

MLlib1.2中添加特性處理“放大”的深度樹和決策樹系統。當maxDepth 很大時,算法最好開啟樹節點(Node ID) 的緩存和定期檢查點。當numTrees設定很大時,這些參數同樣可用于随機樹(RandomForest)。

useNodeIdCache: 當設定為true時,算法在每次疊代時不會把模型(決策樹或決策森林)傳遞給executors.

對于深度很大的樹(或樹系統),設定這個參數可以加速在workers上的計算,對于随機森林算法,可以降低每次疊代時模型和executor的互動。

實作細節:預設,算法疊代期間模型和executor互動,保證在每個樹節點可以比對相應的訓練資料,當把此參數設定為開啟,算法會在樹節點上緩存訓練資料,這樣可以減少模型和executor的互動。

樹節點(Node ID) 每次緩存生成一個RDDs。這樣在反複疊代時,這樣很冗長的線性依賴會降低系統運算性能,另一方面,定期檢查點可以緩解RDD 線性依賴,它會把RDD線性依賴之前的RDD緩存到檔案系統,需要事先設定useNodeIdCache 為true

checkpointDir : 樹節點(Node ID)把RDD儲存的HDFS檔案路徑

checkpointInterval: 樹節點(Node ID) 緩存RDD 的周期。 把此參數設定過短會導緻頻繁寫HDFS, 設定過長,一旦executor 失敗時如果沒有把RDD線性依賴都儲存在檔案系統上,則需要全部從新計算。

3.3.3 放大

計算的時間消耗基本線性正比于訓練樣本數目,特征數,和maxBins 參數。但互動時間消耗近似線性正比于特征數和maxBins.

算法可以讀取稀松向量資料和緊緻向量資料。然而, 對于稀松向量資料并沒有做任何優化。

3.3.4 例子

3.3.4.1 分類

下例展現如何加載LIBSVM 資料檔案, 将資料解析成RDD[LabeledPoint] , 然後運用決策樹算法進行分類,算法使用Gini混雜度和樹深度最大5 。 測試資料的錯誤率用于評估算法準确性。

Scala DecisionTree API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.DecisionTree

Scala DecisionTreeModel API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.model.DecisionTreeModel

import org.apache.spark.mllib.tree.DecisionTree

import org.apache.spark.mllib.tree.model.DecisionTreeModel

import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split the data into training and test sets (30% held out for testing)

val splits = data.randomSplit(Array(0.7, 0.3))

val (trainingData, testData) = (splits(0), splits(1))

// Train a DecisionTree model.

//  Empty categoricalFeaturesInfo indicates all features are continuous.

val numClasses = 2

val categoricalFeaturesInfo = Map[Int, Int]()

val impurity = "gini"

val maxDepth = 5

val maxBins = 32

val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,

  impurity, maxDepth, maxBins)

// Evaluate model on test instances and compute test error

val labelAndPreds = testData.map { point =>

  val prediction = model.predict(point.features)

  (point.label, prediction)

}

val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()

println("Test Error = " + testErr)

println("Learned classification tree model:\n" + model.toDebugString)

// Save and load model

model.save(sc, "target/tmp/myDecisionTreeClassificationModel")

val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")

完整例子見"examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeClassificationExample.scala"

3.3.4.2 回歸

下例展現如何加載LIBSVM 資料檔案, 将資料解析成RDD[LabeledPoint] , 然後運用決策樹算法進行回歸,算法使用Gini混雜度和樹深度最大5 。均方差錯誤(MSE)用于計算算法的拟合程度。

Scala DecisionTree API : 

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.DecisionTree

Scala DecisionTreeModel API : 

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.model.DecisionTreeModel

import org.apache.spark.mllib.tree.DecisionTree

import org.apache.spark.mllib.tree.model.DecisionTreeModel

import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split the data into training and test sets (30% held out for testing)

val splits = data.randomSplit(Array(0.7, 0.3))

val (trainingData, testData) = (splits(0), splits(1))

// Train a DecisionTree model.

//  Empty categoricalFeaturesInfo indicates all features are continuous.

val categoricalFeaturesInfo = Map[Int, Int]()

val impurity = "variance"

val maxDepth = 5

val maxBins = 32

val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity,

  maxDepth, maxBins)

// Evaluate model on test instances and compute test error

val labelsAndPredictions = testData.map { point =>

  val prediction = model.predict(point.features)

  (point.label, prediction)

}

val testMSE = labelsAndPredictions.map{ case (v, p) => math.pow(v - p, 2) }.mean()

println("Test Mean Squared Error = " + testMSE)

println("Learned regression tree model:\n" + model.toDebugString)

// Save and load model

model.save(sc, "target/tmp/myDecisionTreeRegressionModel")

val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")

完整例子:"examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRegressionExample.scala"

3.4 內建學習

·  Gradient-Boosted Trees vs. Random Forests

·  Random Forests

· Basic algorithm

o Training

o Prediction

· Usage tips

· Examples

o Classification

o Regression

·  Gradient-Boosted Trees (GBTs)

· Basic algorithm

o Losses

· Usage tips

o Validation while training

· Examples

o Classification

o Regression

內建學習就是把多種模型整合在一起的學習算法,spark.mllib支援兩種主要的內建學習:梯度提升樹(GradientBoostedTrees )和随機森林。

3.4.1 梯度提升樹 vs 随機森林

梯度提升樹和随機森林都屬于內建樹算法,但訓練的過程截然不同:

1 GBT每次訓練一顆樹模型,而随機森林可以同時訓練多個樹模型,是以,訓練多個樹模型時,GBT明顯比随機森林耗費更多的時間。

雖然GBT每次訓練樹模型時更耗時,但是可以使用選擇每次訓練一顆簡單樹,這樣可能反而會比訓練随機森林更省時。

2 随機森林更不益于過度拟合,同時訓練多個樹模型的随機森林可以降低拟合過程中模型間的相似性,但使用GBT訓練多個樹模型很容易得出過度拟合的結果(統計上看來,随機樹訓練時減少多個樹模型的變量數,但GBT訓練多個樹模型,為了降低無偏性會增加變量樹)

簡言之,兩種算法都是高效的,差別隻在于針于特定問題時需要取舍。

3.4.2 随機森林

随機森林是決策算法中內建樹算法。并且随機森林是最成功的機器學習算法之一,适用于分類和回歸問題。随機森林綜合多個決策樹來降低訓練過度的機率。和決策樹類似,随機森林可以用于多類别的特征歸類,它在優點就是可以不用對特征進行縮放,可以擴充到非線性特征以及特征互動(feature interaction).

Spark.mllib 支援兩類别和多類别随機森林,以及連續特征和特征歸類的回歸。随機森林是通過決策樹的實作,是以建議詳細閱讀決策樹章節。

3.4.2.1 算法基礎

随機森林算法同時訓練多顆決策樹,是以算法可并行執行。算法在每顆樹的訓練過程會引入随機特性,以降低多個決策樹訓練結果的相關性。将多顆決策樹聯合在一起用于測試集預測可以減少預測的可變性,同時還可以提高測試集預測的效率。

3.4.2.1.1 訓練 

訓練過程中引入的随機特性包括:

1) 每次疊代訓練過程對原始資料進行子采集,以獲得不同的訓練集(a.k.a bootstrapping) 

2) 在每個樹子節點上随機地使用特征子集劃分方法

除了以上随機特性,随機森林的決策樹訓練算法和單個決策樹的訓練相同。

3.4.2.1.2 預測

為了使随機森林能更好的預測測試集,需要考慮随機森林預測結果是一個集合。針對分類和回歸兩個不同的問題,需要使用不同的政策把預測結果集合轉譯成最終的結果。

分類問題:少數服從多數原則,随機森林中每顆決策樹會輸出一個類标簽,将測試樣本歸到所有這些類标簽中出現次數的類别上。

回歸問題:平均值原則,随機森林中每顆決策樹會輸出一個實數,最終的結果是這些預測實數的均值。

3.4.2.2 使用提示

下面給出随機森林算法中各種參數的配置說明,以下省略決策樹章節中出現的參數說明。

1>  numTrees : 随機森林中決策樹顆數

i) 提高此參數可以降低預測中不确定性,提高測試集的準确性

ii) 訓練時間線性正比于決策樹顆數

2>  maxDepth 随機森林中決策樹的最大深度

i) 增加決策樹的深度可以提升随機森林的預測能力,但會耗費更多的訓練時間且很容易訓練過度。

ii) 一般,随機森林算法的樹深度可以大于單顆決策樹的深度,因為單顆決策樹訓練很容易出現訓練過度。

下面兩個參數可以加速算法訓練過程,一般不建議優化

3> subsamplingRate : 此參數設定算法疊代中,使用的訓練資料在原始資料集中占比,建議使用預設1.0,但使用更少的資料訓練可以極大的提升訓練速度。

4> featureSubsetStrategy: 決策樹每個子節點的特征集用于進一步劃分備選特征數。這個參數是一個分數或總特征數的一個函數。降低此參數可以加速訓練,但如果太低會嚴重影響算法的準确性。

3.4.2.3 例子

3.4.2.3.1 分類

下例給出加載LIBSVM資料檔案,轉化為LabeledPoint 的RDD,并使用随機森林進行分類,使用測試資料集的誤差來評測算法的準确性。

RandomForest Scala Docs API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.RandomForest

RandomForestModel Scala Docs API:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.model.RandomForestModel

import org.apache.spark.mllib.tree.RandomForest

import org.apache.spark.mllib.tree.model.RandomForestModel

import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split the data into training and test sets (30% held out for testing)

val splits = data.randomSplit(Array(0.7, 0.3))

val (trainingData, testData) = (splits(0), splits(1))

// Train a RandomForest model.

// Empty categoricalFeaturesInfo indicates all features are continuous.

val numClasses = 2

val categoricalFeaturesInfo = Map[Int, Int]()

val numTrees = 3 // Use more in practice.

val featureSubsetStrategy = "auto" // Let the algorithm choose.

val impurity = "gini"

val maxDepth = 4

val maxBins = 32

val model = RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,

  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

// Evaluate model on test instances and compute test error

val labelAndPreds = testData.map { point =>

  val prediction = model.predict(point.features)

  (point.label, prediction)

}

val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()

println("Test Error = " + testErr)

println("Learned classification forest model:\n" + model.toDebugString)

// Save and load model

model.save(sc, "target/tmp/myRandomForestClassificationModel")

val sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")

完整的例子見:"examples/src/main/scala/org/apache/spark/examples/mllib/RandomForestClassificationExample.scala"

3.4.2.3.2 回歸

下例給出加載LIBSVM資料檔案,轉化為LabeledPoint 的RDD,并使用随機森林進行回歸,使用測試資料集的MSE(均方差)來評測算法的可用性。

RandomForest Scala Docs API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.RandomForest

RandomForestModel Scala Docs API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.model.RandomForestModel

import org.apache.spark.mllib.tree.RandomForest

import org.apache.spark.mllib.tree.model.RandomForestModel

import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split the data into training and test sets (30% held out for testing)

val splits = data.randomSplit(Array(0.7, 0.3))

val (trainingData, testData) = (splits(0), splits(1))

// Train a RandomForest model.

// Empty categoricalFeaturesInfo indicates all features are continuous.

val numClasses = 2

val categoricalFeaturesInfo = Map[Int, Int]()

val numTrees = 3 // Use more in practice.

val featureSubsetStrategy = "auto" // Let the algorithm choose.

val impurity = "variance"

val maxDepth = 4

val maxBins = 32

val model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo,

  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)

// Evaluate model on test instances and compute test error

val labelsAndPredictions = testData.map { point =>

  val prediction = model.predict(point.features)

  (point.label, prediction)

}

val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()

println("Test Mean Squared Error = " + testMSE)

println("Learned regression forest model:\n" + model.toDebugString)

// Save and load model

model.save(sc, "target/tmp/myRandomForestRegressionModel")

val sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestRegressionModel")

完整例子見:"examples/src/main/scala/org/apache/spark/examples/mllib/RandomForestRegressionExample.scala"

3.4.3 梯度提升樹(GBTs)

梯度提升樹(Gradient-Boosted Tress ,GBTs)是決策樹算法的內建,GBT 疊代訓練決策樹以使損失函數的值最小。和決策樹一樣,GBT可以處理歸類特征,同時可以擴充到多類别分類問題。它在優點就是可以不用對特征進行縮放,可以擴充到非線性特征以及特征互動(feature interaction).

Spark.mllib 支援GBT算法可用于二類别分類和回歸問題,算法不限制是連續特征或歸類特征。Spark.mllib 使用已有決策樹算法實作GBT算法,請詳細檢視決策樹章節的說明。

注意:

GBT暫不支援多類分類問題,如果需要處理多類别分類,請使用決策樹或随機森林。

3.4.3.1 算法基礎

梯度提升算法疊代訓練一序列的決策樹,每次疊代時,算法使用訓練出的多顆決策樹進行預測,對目前訓練集資料分類結果準确性進行評估,将預測分類錯誤的資料集再标簽化(re-labeled)來強調這部分錯誤集。 這樣下次,決策樹就可以逐漸修正之前分類錯誤的資料集。

錯誤資料集的再标簽化實作是通過損失函數(下面讨論)來實作的,每次疊代,GBT 算法會使訓練集上的損失函數值下降。(譯者:為了使這個損失函數下降更快,或者使算法快速收斂,此算法使用梯度下降法)

3.4.3.1.1 損失

下表中列出目前spark.mllib GBTs 算法中支援的幾種損失函數。需要注意,每個損失函數有最佳的應用場景,如分類或回歸,但并不是通過的損失函數(即不可能同時适用于分類和回歸)

說明:

N 是樣本個數   

y_i  是樣本i類标簽

x_i 是樣本i的特征

F(x_i) 是樣本i 的預測類标簽

Spark MLlib 1.6 -- 分類和回歸篇

3.4.3.2 使用提示

下面詳細讨論GBT算法的參數,此處省略決策樹的相應參數,如需了解請檢視決策樹說明章節。

I)loss: 算法中使用的損失函數,針對分類和回歸問題需要選取合适的損失函數。同時,使用不同損失函數得到的模型也是不同的。

II) numIterations : GBT算法中訓練疊代的次數。注意每次疊代生成一顆決策樹,也就是算法要求的決策樹顆數。提高此值會此算法耗費更多的疊代次數,當然疊代出的模型會更準确,同時,測試資料準确率的計算的時間也會變長。

III) learningRate : 建議最好不要優化此參數,如果算法不穩定,可以降低此值來使算法穩定。

IV) algo : 配置分類或回歸問題(classification vs regreesion ) 

3.4.3.2.1 訓練中校驗 

當GBT算法訓練的決策樹過多時,會導緻算法過拟合。為了防止過拟合,需要在訓練過程中進行校驗。方法runWithValidation 用于此種校驗。此方法有兩個參數,參數1是訓練的樣本集(RDD),參數2 是校驗樣本集。

當校驗誤差超出算法可以允許的範圍(BoostingStrategy 的validationTol參數)時,算法會停止訓練。實踐中,校驗誤會降低後再增長,這也就是校驗誤差不是單調的,建議使用者設定盡量大的誤差允許範圍,每次疊代使用evaluateEachIteration(每次疊代的損失)檢查校驗誤差曲線,選擇更優化的疊代次數。 

3.4.3.3 例子

3.4.3.3.1 分類 

下例給出加載LIBSVM資料檔案,轉化為LabeledPoint 的RDD,并使用梯度提升樹進行分類和損失計算,使用測試資料集的錯誤率來評測算法的準确性。

GradientBoostedTrees Scala DOCS API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.GradientBoostedTrees

GradientBoostedTreesModel Scala DOCS API : 

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

import org.apache.spark.mllib.tree.GradientBoostedTrees

import org.apache.spark.mllib.tree.configuration.BoostingStrategy

import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split the data into training and test sets (30% held out for testing)

val splits = data.randomSplit(Array(0.7, 0.3))

val (trainingData, testData) = (splits(0), splits(1))

// Train a GradientBoostedTrees model.

// The defaultParams for Classification use LogLoss by default.

val boostingStrategy = BoostingStrategy.defaultParams("Classification")

boostingStrategy.numIterations = 3 // Note: Use more iterations in practice.

boostingStrategy.treeStrategy.numClasses = 2

boostingStrategy.treeStrategy.maxDepth = 5

// Empty categoricalFeaturesInfo indicates all features are continuous.

boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()

val model = GradientBoostedTrees.train(trainingData, boostingStrategy)

// Evaluate model on test instances and compute test error

val labelAndPreds = testData.map { point =>

  val prediction = model.predict(point.features)

  (point.label, prediction)

}

val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()

println("Test Error = " + testErr)

println("Learned classification GBT model:\n" + model.toDebugString)

// Save and load model

model.save(sc, "target/tmp/myGradientBoostingClassificationModel")

val sameModel = GradientBoostedTreesModel.load(sc,

  "target/tmp/myGradientBoostingClassificationModel")

完整例子見"examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostingClassificationExample.scala"

3.4.3.3.2 回歸

下例給出加載LIBSVM資料檔案,轉化為LabeledPoint 的RDD,并使用梯度提升樹進行回歸計算,損失函數選取方差(SE squared error),使用測試資料集的均方差(MSE mean squared error)來評測算法的适用性。

GradientBoostedTrees Scala Docs API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.GradientBoostedTrees

GradientBoostedTreesModel Scala Docs API : 

import org.apache.spark.mllib.tree.GradientBoostedTrees

import org.apache.spark.mllib.tree.configuration.BoostingStrategy

import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split the data into training and test sets (30% held out for testing)

val splits = data.randomSplit(Array(0.7, 0.3))

val (trainingData, testData) = (splits(0), splits(1))

// Train a GradientBoostedTrees model.

// The defaultParams for Regression use SquaredError by default.

val boostingStrategy = BoostingStrategy.defaultParams("Regression")

boostingStrategy.numIterations = 3 // Note: Use more iterations in practice.

boostingStrategy.treeStrategy.maxDepth = 5

// Empty categoricalFeaturesInfo indicates all features are continuous.

boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()

val model = GradientBoostedTrees.train(trainingData, boostingStrategy)

// Evaluate model on test instances and compute test error

val labelsAndPredictions = testData.map { point =>

  val prediction = model.predict(point.features)

  (point.label, prediction)

}

val testMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean()

println("Test Mean Squared Error = " + testMSE)

println("Learned regression GBT model:\n" + model.toDebugString)

// Save and load model

model.save(sc, "target/tmp/myGradientBoostingRegressionModel")

val sameModel = GradientBoostedTreesModel.load(sc,

  "target/tmp/myGradientBoostingRegressionModel")

完整見 "examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostingRegressionExample.scala"

3.5 保序回歸

保序回歸是說:對于給定随機變量Y的有限觀察集,記為Y=y_1,y_2,…,y_n, y_i \In R(實數),及相應自變量X = x_1,x_2,…,x_n , 在保證x_1<= x_2 <= … <= x_n 的前提下,使以下拟合函數取最小值:

   f(x) = \Sum|_{i=1} |^{n} w_i (y_i – x_i)^2

此處, w_i 是正的權重。這個拟合函數稱為保序回歸,并且這個拟合函數是唯一的。以上問題可看成在自變量全序條件下的最小二乘問題,顯然保序函數是一個單調函數。

Spark.mllib 支援保序回歸的PAVA(pool adjacent violators algorithm) 算法,此算法可實作并行保序回歸。輸入訓練集是三元組的RDD,三元組第一個元素是雙精度浮點數代表标簽,第二和第三個元素是特征值和對應的權重。除此而外,IsotonicRegression算法可以設定參數 isotonic 預設是true . 此參數是true表示保序算法要求是單調遞增,false表示保序算法要求是單調遞減。

訓練結果傳回保序回歸模型,可用于預測已知或未知特征的類标簽。保序回歸模型是分段線性函數,預測分類的規則:

1)如果預測的輸入集和訓練特征集比對,則傳回相應的預測結果。為了防止同一個輸入特征傳回多個預測結果(可相同或不同),對同一個特征需要定義哪些傳回無效。

2)如果預測的輸入集比訓練特征集每個都低(或高),則傳回最低(或最高)的特征标簽。取最低(或最高)特征的标簽值可以防止同一個特征輸入傳回多個預測标簽值。

3)如果輸入特征落在兩個訓練特征之間,那麼預測結果看成是分段線性函數,需要對這兩個輸入特征進行插值後再進行預測。為了防止同一個特征傳回多個預測結果可以使用前面幾點中相同的處理方式。

3.5.1 例子

原始檔案每行格式為

标簽,特征如 4710.28,500.00.

從這個原始檔案讀取資料後分成訓練集和測試集。用訓練集訓練模型并計算測試集真實标簽與預測标簽的均方差(MSE)

IsotonicRegressionScala Docs API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.regression.IsotonicRegression

IsotonicRegressionModelScala Docs API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.regression.IsotonicRegressionModel

importorg.apache.spark.mllib.regression.{IsotonicRegression,IsotonicRegressionModel}

val data = sc.textFile("data/mllib/sample_isotonic_regression_data.txt")

// Create label,feature, weight tuples from input data with weight set to default value 1.0.

val parsedData = data.map { line =>

  val parts = line.split(',').map(_.toDouble)

  (parts(0), parts(1),1.0)

}

// Split datainto training (60%) and test (40%) sets.

val splits = parsedData.randomSplit(Array(0.6,0.4), seed =11L)

val training = splits(0)

val test = splits(1)

// Createisotonic regression model from training data.

// Isotonicparameter defaults to true so it is only shown for demonstration

val model =newIsotonicRegression().setIsotonic(true).run(training)

// Create tuplesof predicted and real labels.

valpredictionAndLabel = test.map { point =>

  val predictedLabel = model.predict(point._2)

  (predictedLabel, point._1)

}

// Calculatemean squared error between predicted and real labels.

valmeanSquaredError = predictionAndLabel.map {case(p, l)=> math.pow((p - l),2)}.mean()

println("MeanSquared Error = "+meanSquaredError)

// Save and loadmodel

model.save(sc,"target/tmp/myIsotonicRegressionModel")

val sameModel =IsotonicRegressionModel.load(sc,"target/tmp/myIsotonicRegressionModel")

完整的例子見"examples/src/main/scala/org/apache/spark/examples/mllib/IsotonicRegressionExample.scala"

繼續閱讀