天天看點

spark 1.6 MLlib

譯者續:本文會持續更新。

MLlib 是spark 機器學習的庫,它的目标是使機器學習算法能更容易上手。這個庫包含通用學習算法和工具集,包括:分類,回歸,聚類,協同過濾,降維,以及深層優化政策和上層管道API(pipeline).

分為兩個包:

1 spark.mllib 包含基于RDD的原始API

2 spark.ml 包含上層操作DataFrame 的API, 可以構造機器學習管道,

推薦使用spark.ml 包,因為DataFrame API 在機器學習應用中更通用和靈活。但我們會持續支援spark.mllib 也配合spark.ml的開發。開發者可以送出新算法到spark.ml 包,但使用者可以持續關注spark.mllib和使用spark.mllib中的特性。例如,特征抽取和特征變換。

一下列出機器學習包中主要的功能,并講解細節。

spark.mllib: data types, algorithms, and utilities

·Data types

·Basic statistics

osummary statistics

ocorrelations

ostratified sampling

ohypothesis testing

ostreaming significance testing

orandom data generation

·Classification and regression

olinear models (SVMs, logistic regression, linear regression)

onaive Bayes

odecision trees

oensembles of trees (Random Forests and Gradient-Boosted Trees)

oisotonic regression

·Collaborative filtering

oalternating least squares (ALS)

·Clustering

ok-means

oGaussian mixture

opower iteration clustering (PIC)

olatent Dirichlet allocation (LDA)

obisecting k-means

ostreaming k-means

·Dimensionality reduction

osingular value decomposition (SVD)

oprincipal component analysis (PCA)

·Feature extraction and transformation

·Frequent pattern mining

oFP-growth

oassociation rules

oPrefixSpan

·Evaluation metrics

·PMML model export

·Optimization (developer)

ostochastic gradient descent

olimited-memory BFGS (L-BFGS)

一 資料類型 – MLlib

·Local vector

·Labeled point

·Local matrix

·Distributed matrix

oRowMatrix

oIndexedRowMatrix

oCoordinateMatrix

oBlockMatrix

MLlib支援單個節點的本地向量和本地名額,同時也支援基于RDDs的分布式名額集。本地向量和本地名額可看做資料模型的對外接口,而底層的線性代數操作有Breeze 和 jblas提供。監督學習中的訓練樣本在MLlib中稱為,“标簽點”(本人注解,即有類别資訊的樣本點資料)

1.1本地向量

本地向量有兩個關鍵資料:0開始在索引和雙精度浮點型值。MLlib支援兩類本地向量:緊緻向量和稀松向量。緊緻向量是一個雙精度浮點型向量元素組成的數組,稀松向量是兩個同長度的資料,一個是非0向量名額數組,另一個是非0向量元素數組。如, 向量(1.0,0.0,3.0) 的緊緻向量為[1.0,0.0,3.0] ,而對應的稀松向量為 

(3, [0, 2], [1.0, 3.0])

此處

,3代表向量長度(本人注解:[0,2] 是向量中非0資料的名額集,[1.0,3.0] 是對應非0.0資料的值)

本地向量的基類是

Vector,我們提供兩個實作:DenseVector 和SparseVector ,建議使用者使用Vectors的工廠方法建立本地向量。

Scala Vector API:

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.Vector

Scala Vectors API

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.Vectors

importorg.apache.spark.mllib.linalg.{Vector,Vectors}

// Create a dense vector (1.0, 0.0, 3.0).

val dv:Vector=Vectors.dense(1.0,0.0,3.0)

// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.

val sv1:Vector=Vectors.sparse(3,Array(0,2),Array(1.0,3.0))

// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.

val sv2:Vector=Vectors.sparse(3,Seq((0,1.0),(2,3.0)))

注意:

Scala預設import scala.collection.immutable.Vector , 在運作spark ML時,需要手動引入import

org.apache.spark.mllib.linalg.Vector.

1.2

标簽點

标簽點是本地向量

,可以使緊緻向量,也可以使稀松向量。在ML

lib中

标簽點用于監督學習算法

但是綁定雙精度浮點類别标簽後

也可以應用于回歸和分類算法

在兩類分類中

類别标簽可選

0 或 1 , 對于多分類,類别标簽  從0 到(總類别數-

1

)。

标簽類使用

case classs LabeledPoint .

Scala LabdledPoint

  API

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.regression.LabeledPoint

valneg=LabeledPoint(0.0,Vectors.sparse(3,Array(0,2),Array(1.0,3.0)))      

1.2.1

稀松資料

實踐中經常會碰到需要訓練稀松資料集,MLLib支援從LIBSVN格式直接讀取訓練資料,對于LIBSVN和LIBLINEAR的使用者對這種格式并不陌生。這種格式是文本檔案,每行是一個标簽點,這個點辨別一個稀松特征向量。

label index1:value1 index2:value2 ...
           

注意此處檔案中向量的索引是從1開始,加載到spark 後自動轉換為從0 開始。

MLUtils.loadLibSVMFile

 讀取按LIBSVN格式存儲的訓練測試資料

Scala MLUtils API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.util.MLUtils

valexamples:RDD[LabeledPoint]=MLUtils.loadLibSVMFile(sc,"data/mllib/sample_libsvm_data.txt")      

1.3本地矩陣

本地矩陣是單個主機上的矩陣,具有特性:整數的矩陣索引和(雙精度)浮點矩陣元素。MLLib支援緊緻矩陣,矩陣元素按列優先存儲在數組中,稀松矩陣,矩陣非0元素按列優先存儲在CSC格式(Compressed Sparse Column,壓縮稀松列),如下面緊緻矩陣:

spark 1.6 MLlib

(3,2)的矩陣存儲在數組中為: 

[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]

本地矩陣的基類是Matrix , 同時提供兩種本地矩陣實作:DenseMatrix,和SparseMatrix 。 建議使用者使用Matrices 類的工廠方法建立本地矩陣。再次提醒,矩陣是按列優先的數組存儲。

Scala Matrix

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.Matrix

Matrices API :

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.Matrices

valsm:Matrix=Matrices.sparse(3,2,Array(0,1,3),Array(0,2,1),Array(9,6,8))      

1.4 分布式矩陣

分布式矩陣是分布在一個或多個RDDs的矩陣,具有特征:長整型矩陣索引,雙精度浮點矩陣元素。考慮到将分布式矩陣轉換為其他形式需要全局shuffle, 這樣很消耗時間,是以有必要仔細斟酌選擇合适形式來存儲分布式大矩陣。暫時支援三種類型的分布式矩陣。

第一類是RowMatrix .RowMatrix 是面向行存儲的矩陣,是以忽略行索引。例如,特征向量。這種矩陣每一行是一個本地向量(RDD)。假設每行的資料并不多,這樣本地矩陣可以在單節點的driver間自由通信,也可以在單節點上存儲和操作。

第二類是

IndexedRowMatrix

 ,它比RowMatrix多了行索引,這個行索引可以标記行并用于關聯操作。

第三類是

CoordinateMatrix

 ,這種舉證按CCO連結清單(coordinate list, https://en.wikipedia.org/wiki/Sparse_matrix#Coordinate_list_.28COO.29 ) 格式存儲, 連結清單每個元素是一個RDD。

注意:

分布式矩陣的RDD的行和列在cache時必須是确定的,否則會出錯。

1.4.1 RowMatrix

因為每行是一個本地向量,是以矩陣的列數限制在integer的範圍,在實際中不建議太大。

RowMatrix 可以由一個RDD[Vector]執行個體建立,然後可以做列統計和分解。QR分解的形式 A = QR , 此處Q是一個正交矩陣,而R是一個上三角矩陣。了解更多奇異值分解(SVD,https://en.wikipedia.org/wiki/Singular_value_decomposition)和主成分分析(PCA,https://en.wikipedia.org/wiki/Principal_component_analysis) ,請看降維章節, http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html。

Scala RowMatrix API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.distributed.RowMatrix

importorg.apache.spark.mllib.linalg.Vector

importorg.apache.spark.mllib.linalg.distributed.RowMatrix

val rows:RDD[Vector]=...// an RDD of local vectors

// Create a RowMatrix from an RDD[Vector].

val mat:RowMatrix=newRowMatrix(rows)

// Get its size.

val m= mat.numRows()

val n= mat.numCols()

// QR decomposition

val qrResult= mat.tallSkinnyQR(true)

1.4.2 IndexedRowMatrix

IndexedRowMatrix 可由RDD[IndexedRow] 執行個體建立,此處IndexedRow 封裝為(Long, Vector) . IndexedRowMatrix 去掉行索引就變成了RowMatrix。

Scala IndexedRowMatrix API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix

importorg.apache.spark.mllib.linalg.distributed.{IndexedRow,IndexedRowMatrix,RowMatrix}

val rows:RDD[IndexedRow]=...// an RDD of indexed rows

// Create an IndexedRowMatrix from an RDD[IndexedRow].

val mat:IndexedRowMatrix=newIndexedRowMatrix(rows)

// Get its size.

val m= mat.numRows()

val n= mat.numCols()

// Drop its row indices.

val rowMat:RowMatrix= mat.toRowMatrix()

1.4.3 CoordinateMatrix ( 調和矩陣)

Coordinatematrix 是分布式矩陣,所有元素做成的RDD對象。其中Tuple3 形如( i : Long , j : Long, value : Double ) ,此處i 是行索引, j 是列索引, value 是元素的值。CoordinateMatrix 隻在當矩陣行和列都很大時,同時矩陣非0 元素很稀松。

CoordinateMatrix 可以從RDD[MatrixEntry]執行個體建立,此處MatrixEntry 封裝為(Long , Long, Double )。 CoordinateMatrix 調用toIndexeedRowMatrix 方法可以将CoordinateMatrix 矩陣轉化為IndexedRowMatrix 矩陣,其他coordinateMatrix 的計算暫時還不支援。

Scala CoordinateMatrix API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.distributed.CoordinateMatrix

importorg.apache.spark.mllib.linalg.distributed.{CoordinateMatrix,MatrixEntry}

val entries:RDD[MatrixEntry]=...// an RDD of matrix entries

// Create a CoordinateMatrix from an RDD[MatrixEntry].

val mat:CoordinateMatrix=newCoordinateMatrix(entries)

// Get its size.

val m= mat.numRows()

val n= mat.numCols()

// Convert it to an IndexRowMatrix whose rows are sparse vectors.

val indexedRowMatrix= mat.toIndexedRowMatrix()

1.4.4 BlockMatrix (分塊矩陣)

BlockMatrix是分布式矩陣RDD[MarixBlock],此處MatrixBlock是元組((Int, Int) , Matrix ), 其中(Int, Int) 是矩陣塊的索引, Matrix 是給定矩陣塊索引的子矩陣,矩陣次元(是數組的長度)

rowsPerBlock

colsPerBlock

BlockMatrix矩陣支援

add 和 multiply 方法和另一個同次元的BlockMatrix 計算。

H

elper

函數

validate

可以校驗

BlockMatrix 是否設定正确。

BlockMatrix 矩陣可以有IndexedRowMatrix 或 CoordinateMatrix  調用toBlockMatrix 方法得到, toBlockMatrix 方法預設建立 1024 *

1024 的塊矩陣

使用者可以調用接口

toBlockMatrix(rowsPerBlock , colsPerBlock ) 修改矩陣次元。

Scala BlockMatrix API :

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix

importorg.apache.spark.mllib.linalg.distributed.{BlockMatrix,CoordinateMatrix,MatrixEntry}

val entries:RDD[MatrixEntry]=...// an RDD of (i, j, v) matrix entries

// Create a CoordinateMatrix from an RDD[MatrixEntry].

val coordMat:CoordinateMatrix=newCoordinateMatrix(entries)

// Transform the CoordinateMatrix to a BlockMatrix

val matA:BlockMatrix= coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.

// Nothing happens if it is valid.

matA.validate()

// Calculate A^T A.

val ata= matA.transpose.multiply(matA)

2 基本統計 – spark.mllib

2.1 統計概覽

在Statistics類中提供基本列統計RDD[Vector]功能

colStats()傳回MultivariateStatisticalSummary 的執行個體,這個執行個體可以按列計算最大,最小,均值,方差,非0個數統計,列的1範數。

Scala MultivariateStatisticalSummary API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.stat.MultivariateStatisticalSummary

import org.apache.spark.mllib.linalg.Vector

import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

val observations: RDD[Vector] = ... // an RDD of Vectors

// Compute column summary statistics.

val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)

println(summary.mean) // a dense vector containing the mean value for each column

println(summary.variance) // column-wise variance

println(summary.numNonzeros) // number of nonzeros in each column

2.2 相關統計

計算兩個資料序列(可以使向量或矩陣)的相關系數。在spark.mllib中,我們提供成對計算相關系數,實作了Pearson’s相關和Spearman’s相關。相關統計的結果依賴于計算對象,如果是兩個RDD[Double]的計算,結果是Double類型,如果是兩個RDD[Vector]計算,結果是一個Matrix矩陣。

Scala Statistics API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.stat.Statistics

import org.apache.spark.SparkContext

import org.apache.spark.mllib.linalg._

import org.apache.spark.mllib.stat.Statistics

val sc: SparkContext = ...

val seriesX: RDD[Double] = ... // a series

val seriesY: RDD[Double] = ... // must have the same number of partitions and cardinality as seriesX

// compute the correlation using Pearson's method. Enter "spearman" for Spearman's method. If a 

// method is not specified, Pearson's method will be used by default. 

val correlation: Double = Statistics.corr(seriesX, seriesY, "pearson")

val data: RDD[Vector] = ... // note that each Vector is a row and not a column

// calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.

// If a method is not specified, Pearson's method will be used by default. 

val correlMatrix: Matrix = Statistics.corr(data, "pearson")

2.3 分層采樣(Stratified sampling) 

在spark.mllib中提供計算原始RDD 鍵值對的分層采樣方法:sampleByKey 和 sampleByKeyExact 。在分層采樣中,鍵可以看做标簽類,相應的值可以看做屬性。如,鍵可以使男人或女人,文檔ID,相應的值可以使人的年齡或文檔的單次。 sampleByKey 方法随機采樣一系列觀測值,過程就像逐個周遊所有樣本點,通過抛銀币決定取舍,是以隻需要确定采樣點個數。sampleByKeyExact 比分層随機采樣方法sampleByKey需要更多地樣本,才能保證采樣點個數有99.99%的置信度,sampleByKeyExact暫不支援python.

sampleByKeyExact() 采樣由[ f_k , n_k ] 完全決定, 對任意一個鍵k 屬于 K 鍵集合,f_k是預期鍵對應采樣點值得占比(分數),n_k 是這個鍵k在整個集合中值的個數。無放回采樣(即采樣的資料取走,不會出現重複) 方法需要一個參數(withReplacement預設是false) , 而又放回采樣方法需要兩個參數。

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.rdd.PairRDDFunctions

val sc: SparkContext = ...

val data = ... // an RDD[(K, V)] of any key value pairs

val fractions: Map[K, Double] = ... // specify the exact fraction desired from each key

// Get an exact sample from each stratum

val approxSample = data.sampleByKey(withReplacement = false, fractions)

val exactSample = data.sampleByKeyExact(withReplacement = false, fractions)

2.4 假設檢驗

假設檢驗在統計上用于判定統計結果又多大統計意義,及統計結果有多大置信度。Spark.mllib 暫支援Pearson’s chi-squared 檢驗,檢驗結果的适用性和獨立性。輸入資料需要驗證适用性和獨立性。适用性檢驗需要輸入Vector , 獨立性需要資料Matrix 。

Spark.mllib 支援輸入RDD[LabledPoint] ,使用chi-squared獨立性來決定特征的選擇。

Statistics 提供方法運作Pearson’s chi-squared 檢驗,下例用于假設檢驗。

import org.apache.spark.SparkContext

import org.apache.spark.mllib.linalg._

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.stat.Statistics._

val sc: SparkContext = ...

val vec: Vector = ... // a vector composed of the frequencies of events

// compute the goodness of fit. If a second vector to test against is not supplied as a parameter, 

// the test runs against a uniform distribution.  

val goodnessOfFitTestResult = Statistics.chiSqTest(vec)

println(goodnessOfFitTestResult) // summary of the test including the p-value, degrees of freedom, 

                                 // test statistic, the method used, and the null hypothesis.

val mat: Matrix = ... // a contingency matrix

// conduct Pearson's independence test on the input contingency matrix

val independenceTestResult = Statistics.chiSqTest(mat) 

println(independenceTestResult) // summary of the test including the p-value, degrees of freedom...

val obs: RDD[LabeledPoint] = ... // (feature, label) pairs.

// The contingency table is constructed from the raw (feature, label) pairs and used to conduct

// the independence test. Returns an array containing the ChiSquaredTestResult for every feature 

// against the label.

val featureTestResults: Array[ChiSqTestResult] = Statistics.chiSqTest(obs)

var i = 1

featureTestResults.foreach { result =>

    println(s"Column $i:\n$result")

    i += 1

} // summary of the test

Statistics 提供1-sample, 2-sided Kolmogorov-Smirnov檢驗機率分布是否相等。提供理論分布名稱和理論分布參數,或者根據已知理論分布計算累計分布函數,使用者可以檢驗樣本點是否出自來驗證機率分布。在特殊例子中,如正态分布,不用沒有提供正态分布參數,則檢驗會使用标準正态分布參數。

Scala Statistics API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.stat.Statistics

import org.apache.spark.mllib.stat.Statistics

val data: RDD[Double] = ... // an RDD of sample data

// run a KS test for the sample versus a standard normal distribution

val testResult = Statistics.kolmogorovSmirnovTest(data, "norm", 0, 1)

println(testResult) // summary of the test including the p-value, test statistic,

                    // and null hypothesis

                    // if our p-value indicates significance, we can reject the null hypothesis

// perform a KS test using a cumulative distribution function of our making

val myCDF: Double => Double = ...

val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)

2.5 流式顯著性測試

Spark.mllib 提供線上測試實作,如A/B線上測試。此測試需要在spark streaming DStream[(Boolean, Double)] 上使用,每個流單元的第一個元素是邏輯真假,假代表對照組(false),而真代表實驗組(true) , 第二個元素是觀測值。

流式顯著性檢驗支援這兩個參數:

1 peacePeriod  (平穩周期), 預設最初啟動後可以忽略的資料組數。

1 windowSize (窗尺寸) , 每次假設檢驗使用的資料批次數,若設為0 , 則累計處理之前所有批次。

StreamingTest 支援流式假設檢驗。

val data = ssc.textFileStream(dataDir).map(line => line.split(",") match {

  case Array(label, value) => BinarySample(label.toBoolean, value.toDouble)

})

val streamingTest = new StreamingTest()

  .setPeacePeriod(0)

  .setWindowSize(0)

  .setTestMethod("welch")

val out = streamingTest.registerStream(data)

out.print()

完整例子代碼見:examples/src/main/scala/org/apache/spark/examples/mllib/StreamingTestExample.scala

2.6 随機數發生器

随機數發生器在随機算法,随機模闆和性能測試中很有用。Spark.mllib 的随機發生器RDD 帶i.i.d. 随機資料來自給定分布:均勻分布, 标準正态, Possion (泊松分布)。

RandomRDDs 提供工廠方法來生成随機雙精度浮點RDD 和 随機向量RDD。下例生辰随機雙精度浮點RDD, 這些随機值來自标準正态分布N(0,1), 做平移和伸縮後映射到N(1,4)。

Scala RandomRDD API : http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.random.RandomRDDs

import org.apache.spark.SparkContext

import org.apache.spark.mllib.random.RandomRDDs._

val sc: SparkContext = ...

// Generate a random double RDD that contains 1 million i.i.d. values drawn from the

// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.

val u = normalRDD(sc, 1000000L, 10)

// Apply a transform to get a random double RDD following `N(1, 4)`.

val v = u.map(x => 1.0 + 2.0 * x)

2.6 核密度估計

核密度估計在經驗機率分布圖中用處很大,這種分布圖不需要假設觀測值來自特定的某個分布。通過給定點集,來計算随機變量的機率密度函數。通過計算經驗分布在特定點的PDF(偏導數),作為标準正态分布在每個采樣點附近的PDF。

KernelDensity 提供方法計算RDD采樣點集的核密度估計,見下例:

Scala KernelDensity API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.stat.KernelDensity

import org.apache.spark.mllib.stat.KernelDensity

import org.apache.spark.rdd.RDD

val data: RDD[Double] = ... // an RDD of sample data

// Construct the density estimator with the sample data and a standard deviation for the Gaussian

// kernels

val kd = new KernelDensity()

  .setSample(data)

  .setBandwidth(3.0)

// Find density estimates for the given values

val densities = kd.estimate(Array(-1.0, 2.0, 5.0))

3 分類和回歸—spark.mllib

Spark.mllib 實作以下ML問題: 兩個标簽類的分類, 多個标簽類的分類,和回歸分析。

下表列出每類問題的支援算法:

Problem Type Supported Methods
Binary Classification

linear SVMs, logistic regression, decision trees, random forests,

 gradient-boosted trees, naive Bayes

線性支援向量機,邏輯回歸,決策樹,随機森林,梯度提升決策樹,

樸素貝葉斯決策

Multiclass Classification

logistic regression, decision trees, random forests, naive Bayes

邏輯回歸,決策樹,随機森林,樸素貝葉斯決策

Regression

linear least squares, Lasso, ridge regression, decision trees, 

random forests, gradient-boosted trees, isotonic regression

線性最小二乘法,最小化的絕對收縮和選擇算子,嶺回歸,

決策樹,随機森林,梯度提升決策樹,保序回歸

3.1 線性模型 –spark.mllib

· Mathematical formulation

o Loss functions

o Regularizers

o Optimization

· Classification

o Linear Support Vector Machines (SVMs)

o Logistic regression

· Regression

o Linear least squares, Lasso, and ridge regression

o Streaming linear regression

· Implementation (developer)

3.1.1 數學公式

 許多标準機器學習問題可以轉化為凸優化問題,如, 凸函數f 的最小值是依賴于d維向量w(稱為權重向量),可以把問題轉化為:

求    \min_{w \in R^d }{ f(x) } 問題。此處f 函數形如:

F(w) = \Lamda * R(w) + frac{1,n} * \Sum|_{i=1}|^{n} {L(w;x_i,y_i)|

此處向量   x_i \in R^d是訓練測試資料,  1 <= I <= n , y_i \in R是相應的類标簽,類标簽在分類問題是需要預測的。如果方法是線性的,如果

L(w;x,y) 可以表示成w^{T} x 和 y 的函數, 下面會講解不是凸優化問題的情況。

目标函數f 有兩個點:正規化決定模型的複雜程度,損失決定模型的誤差,損失函數L(w; . , . ) 是w 的凸函數,正規化參數 \Lamda >= 0 (名為regParam ) 來權衡兩個目标:錯誤最小 和模型複雜度最低 (為了防止過拟合)。

3.1.1.1 損失函數 

下表總結損失函數,集損失函數的梯度函數

3.1.1.2 正規化

正則化可以使模型處理相對簡單,并且可以避免模型過拟合。支援以下正則化 spark.mllib

此處 sign(w) 是符号向量,每個元素是w 向量相應位置的符号函數 sign(x_i)

L2-正規化相對L1-正規化處理簡單,是因為L2的正規函數是連續光滑函數,而L1的正規函數則不是。L1正規化可以使權向量中稀少的值變得不那麼重要,使模型在特征選擇上處理更容易了解。彈性網絡(elastic net)是L1和L2正規化的組合。不建議訓練模型時不适用正則化,特别是訓練向本數很少時。

3.1.1.3 最優化

線性方法使用凸最優化方法優化目标函數。Spark.mllib使用兩種方法SGD 和L-BFGS(見最優化章節)。目前,大多數算法APIs 支援随機梯度下降(SGD)和大部分支援L-BFGS。 

3.1.2分類

分類算法的目标是把資料分門别類。最簡單分類是兩分類問題,即分成兩類(正類和負類)。如果多餘兩類,一般稱為多類别分類問題。Spark.mllib 支援兩種線性分類:線性支援向量機(SVM)和邏輯回歸。 線性SVN 隻支援兩分類,而邏輯回歸支援兩分類和多分類。這兩種算法都支援L1和L2正規化。訓練集為RDD[LabeledPoint] 在MLlib , 而類标簽為 0, 1,2,… 。 注意,數學公式中,兩分類的類标簽表示為: +1 (正類) 和 -1 (負類)。

3.1.2.1 線性支援向量機(SVM)

線性SVN是處理大多數分類問題的首選,線性方法描述見上面表達式(1),其中損失函數形如:

   L(w;x,y) :=  max{ 0,  1 – y w^t x }

預設,線性SVN訓練集需要使用L2正規化。同時支援L1正規化,在此情況下, 變成線性算法。

線性SVN算法輸出SVN模型。給定新資料點,表示為x , 模型基于w^T x 的 預測。 預設, 如果 w^T x >= 0 , 則歸為正類,否則歸為負類。

例子:

下例中展示如何加載測試資料,執行算法訓練,并預測結果與訓練集的錯誤。

Scala SVMWithSGD API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.SVMWithSGD

Scala SVMModel API : 

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.SVMModel

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).

val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)

val training = splits(0).cache()

val test = splits(1)

// Run training algorithm to build the model

val numIterations = 100

val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold.

model.clearThreshold()

// Compute raw scores on the test set.

val scoreAndLabels = test.map { point =>

  val score = model.predict(point.features)

  (score, point.label)

}

// Get evaluation metrics.

val metrics = new BinaryClassificationMetrics(scoreAndLabels)

val auROC = metrics.areaUnderROC()

println("Area under ROC = " + auROC)

// Save and load model

model.save(sc, "myModelPath")

val sameModel = SVMModel.load(sc, "myModelPath")

SVMWitSGD.train() 方法預設使用L2正規化,且使用正規參數1.0 。如果想修改此預設,需要建立新的SVMWithSGD 執行個體,并用setter方法重新配置。其他spark.mllib算法也支援setter方法重新配置,例如,下例給出L1正規化 且SVM正規化參數位0.1 ,訓練樣本疊代200次。

import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()

svmAlg.optimizer.

  setNumIterations(200).

  setRegParam(0.1).

  setUpdater(new L1Updater)

val modelL1 = svmAlg.run(training)

3.1.2.2 邏輯回歸

邏輯回歸廣泛用于預測兩類别分類問題。它也符合等式(1) , 并且損失函數形如:

L(w;x,y)  := log( 1 + exp(-y w^T x) )

對于兩類别分類問題,算法輸出兩類别邏輯回歸模型,給定新的測試點,記為x , 模型通過邏輯函數

F(z) = 1 / { 1 + e^(-z)} 

 此處 z = w^T x , 如果 f(w^T x) > 0.5 , 認為是正類, 否則認為是負類, 可以看到此分類方法分類和SVN不太一樣,多了一個随機函數f( ) 

兩類别分類邏輯回歸可以推廣到多類别邏輯回歸,用來處理多類别分類問題。例如,假設有K可能的輸出結果,選取其中一個作為對比值,剩下K-1個輸出值分别去和對比值做兩類别回歸。在spark.mllib , 這個對比值就是類别0 ,詳見 統計學習基礎:http://statweb.stanford.edu/~tibs/ElemStatLearn/

對于多類别分類問題,算法會輸出K-1個邏輯回歸模型,給定一個新測試點,帶入K-1個模型算出最大機率值得類别,記為預測結果。

我們實作兩個算法解決邏輯回歸:小批梯隊下降法(mini-batch gradient descent)和L-BFGS , 我們建議優先選L-BFGS,它的收斂性更快一些。

例子:

下面例子将如何加載多類别資料集,将資料集分為訓練和測試,使用LogisticRegressionWithLBFGS 做邏輯回歸。模型再用測試資料集去評估優劣。

Scala LogisticRegressionWithLBFGS API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

Scala LogisticRegressionModel API : 

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionModel

import org.apache.spark.SparkContext

import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}

import org.apache.spark.mllib.evaluation.MulticlassMetrics

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).

val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)

val training = splits(0).cache()

val test = splits(1)

// Run training algorithm to build the model

val model = new LogisticRegressionWithLBFGS()

  .setNumClasses(10)

  .run(training)

// Compute raw scores on the test set.

val predictionAndLabels = test.map { case LabeledPoint(label, features) =>

  val prediction = model.predict(features)

  (prediction, label)

}

// Get evaluation metrics.

val metrics = new MulticlassMetrics(predictionAndLabels)

val precision = metrics.precision

println("Precision = " + precision)

// Save and load model

model.save(sc, "myModelPath")

val sameModel = LogisticRegressionModel.load(sc, "myModelPath")

3.1.3 回歸

3.1.3.1 線性最小二乘,Lasso , 嶺回歸

最小二乘在回歸問題中經常使用。同樣是線性算法符合公式(1) , 損失函數形為:

使用不同的正規化方法得到不同最小二乘法: 正交最小二乘法或線性最小二乘法(不适用正規化);嶺回歸使用L2正規化;Lasso使用L1正規化。對所有這些模型,平均損失(訓練集錯誤率) 1/n  \SUM|_(i=1) ^n| ( w^T x_i – y_i )^2 , 稱為均方誤差。

例子:

下例展示如何加載訓練資料,轉化成标簽點的RDD,例子使用LinearRegressionWithSGD 建構線性模型來預測類标簽。最後計算均方差錯誤來評估拟合優度。

Scala LinearRegressionWithSGD API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.regression.LinearRegressionWithSGD

Scala LinearRegressionModel API : https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.regression.LinearRegressionModel

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.regression.LinearRegressionModel

import org.apache.spark.mllib.regression.LinearRegressionWithSGD

import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data

val data = sc.textFile("data/mllib/ridge-data/lpsa.data")

val parsedData = data.map { line =>

  val parts = line.split(',')

  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))

}.cache()

// Building the model

val numIterations = 100

val model = LinearRegressionWithSGD.train(parsedData, numIterations)

// Evaluate model on training examples and compute training error

val valuesAndPreds = parsedData.map { point =>

  val prediction = model.predict(point.features)

  (point.label, prediction)

}

val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()

println("training Mean Squared Error = " + MSE)

// Save and load model

model.save(sc, "myModelPath")

val sameModel = LinearRegressionModel.load(sc, "myModelPath")

RidgeRegressionWithSGD 和 LassoWithSGD 使用同LinearRegressionWithSGD 

為了運作上例代碼,需要檢視spark 快速指南中 Self-Contained Applications 章節(https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications)

3.1.3.2 流線性回歸

當資料是以流的形式進入模型,最好選取線上回歸模型,更新資料每批生成的周期。Spark.mllib  流線性回歸暫支援正交最小二乘。除了拟合度是計算每批次資料的到,拟合度計算方法和離線是一樣。

例子

下例展示如何從檔案生成訓練資料流和測試資料流。把流資料解釋為标簽點,拟合線上線性回歸模型,預測下一個流的類标簽。

首先,引入必須的輸入資料和模型類

import org.apache.spark.mllib.linalg.Vectors

import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

然後生成訓練資料流和測試資料流。假設StreamingContext ssc 已經生成,詳見Spark Streaming Programming Guide (https://spark.apache.org/docs/latest/streaming-programming-guide.html#initializing)

下例中我們使用标簽點來代表訓練和測試資料,實際中建議測試資料使用無标簽向量。

val trainingData = ssc.textFileStream("/training/data/dir").map(LabeledPoint.parse).cache()

val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)

初始化模型權重為0

val numFeatures = 3

val model = new StreamingLinearRegressionWithSGD()

    .setInitialWeights(Vectors.zeros(numFeatures))

注冊訓練資料流和測試資料流,将預測結果列印出來。

model.trainOn(trainingData)

model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()

ssc.awaitTermination()

現在可以把訓練和測試資料流儲存在不同的檔案夾下。每行記錄的資料點格式( y , [ x1,x2,x3]) ,此處y 是類标簽, x1,x2,x3是特征向量。訓練資料檔案隻要儲存在/training/data/dir, 模型就會随時更新,測試資料檔案儲存在/testing/data/dir 下就會計算類标簽預測。注意,訓練資料越多,預測結果越好。

3.1.4 實作(開發者)

Spark.mllib實作了簡單分布式版本的SGD(stochastic gradient descent),這個SGD是基于(underlying) 梯度下降法。所有提供的算法接受正規化參數作為輸入(regParam) , 同時還有其他SGD的各種參數(stepSize , numIterations , miniBatchFraction ) 。 罪域每個參數,我們提供三種可能的正規化(none , L1 , L2 )

邏輯回歸,L-BFGS版本的實作基于LogisticRegressionWithLBFGS類,這個實作支援兩類别邏輯回歸和多類别邏輯回歸,而SGD隻支援兩類别邏輯回歸。盡管,L-BFGS不支援L1正規化, 單SGD隻支援L1正規化。當L1正規化不是必選是, 強烈推薦L-BFGS算法, 因為它收斂更快,比SGD算法更精确的逼近逆 Hessian 矩陣,這個Hessian 矩陣通過拟牛頓法(quasi-Newton methond)

算法的Scala 實作:

· 

SVMWithSGD

· LogisticRegressionWithLBFGS

· LogisticRegressionWithSGD

· LinearRegressionWithSGD

· RidgeRegressionWithSGD

· LassoWithSGD

Python 調用scala 實作: PythonMLLibAPI.

 3.2 樸素貝葉斯

3.3 決策樹

·  

Basic algorithm

· Node impurity and information gain

· Split candidates

· Stopping rule

·  Usage tips

· Problem specification parameters

· Stopping criteria

· Tunable parameters

· Caching and checkpointing

·  Scaling

·  Examples

· Classification

· Regression

決策樹算法常用于機器學習中分類和回歸問題,由于以下優點,決策樹得到廣泛使用:

1 處理特征分類時,對分類結果容易直覺解釋

2 容易擴充到多類情況

3 不去要對特征向量進行規整()

4 可以處理非線性問題

5 可以直覺觀察特征的比對互動過程

決策樹算法族,諸如随機森林和随機深林的提升算法在處理分類和回歸問題是效率最高。

Spark.mllib的決策樹使用連續特征和歸類特征,應用于兩類别分類和多類别分類,以及回歸問題。決策樹實作按行分片處理,最多允許分布式訓練百萬行資料。

随機森林和梯度提升樹詳見 Ensembles guide (https://spark.apache.org/docs/latest/mllib-ensembles.html)

3.3.1 基本算法

決策樹是貪婪算法,它會按二分去周遊整個特征向量空間。算法預測相同标簽類的葉節點集。每一種分片的結果都是在決策節點上,所有可能的劃分方法中選取最優的方法,選取最優的依據是資訊增益(information gain) 最大化。 換句話說,在每個決策節點,選取使 argmax(s)  IG(D,s) , 資訊增益最大化的參數s  ,此處 , IG(D,s) 是在資料集D 上應用劃分方法s 所得到的資訊增益。

3.3.1.1 節點混雜度和資訊增益

節點混雜度是測量節點上标簽集均一性。目前實作兩種分類混雜度(Gini 混雜度和熵), 一種回歸混雜度。

資訊增益不同于父節點的混雜度,以及子節點的混雜度帶權重之和。假設劃分s 将資料集D 劃分為D1  和D2 ,其中D1有N1個元素,D2有N2個元素。

資訊增益

IG(D,s) = Impurity(D) – N1/N Impurity(D1) – N2/N Impurity(D2)

3.3.1.2 拆分可選集

3.3.1.2.1 連續特征

在單機上小資料集上,給定特征向量,對連續特征的劃分備選集

spark.ml: high-level APIs for ML pipelines

·Overview: estimators, transformers and pipelines

·Extracting, transforming and selecting features

·Classification and regression

·Clustering

·Advanced topics

很多降維算法還沒有完全在spark.ml中實作,使用者可以自己把spark.mllib的實作和spark.ml中算法結合,構造自己的降維算法。

依賴

繼續閱讀