
Running MLlib Binary Classification Algorithms on a Spark 0.9 Cluster

MLlib is Spark's implementation of some common machine learning (ML) functionality, along with associated tests and data generators. MLlib currently supports four common types of machine learning problem settings, namely binary classification, regression, clustering, and collaborative filtering, as well as an underlying gradient descent optimization primitive. This guide outlines the functionality supported in MLlib and also gives some examples of how to invoke MLlib.

Dependencies

MLlib uses the jblas linear algebra library, which itself depends on native Fortran routines. If these are not already installed on your nodes, you may need to install the gfortran runtime library. MLlib will throw a linking error if it cannot detect the libraries automatically.

To use MLlib in Python, you will need NumPy version 1.7 or newer and Python 2.7.

What Is Binary Classification?

Binary classification is a supervised learning problem in which we want to classify entities into one of two distinct categories or labels, for example, predicting whether or not an email is spam. The problem involves running a learning algorithm on a labeled dataset, that is, a set of entities represented by (numeric) features together with their category labels. The algorithm returns a trained model that can predict the label of new entities whose labels are unknown.

MLlib currently supports two standard model families for binary classification: linear support vector machines (SVMs) and logistic regression, along with L1- and L2-regularized variants of each. All of these training algorithms use stochastic gradient descent (described below) and take as input a regularization parameter (regParam) together with several parameters associated with gradient descent (stepSize, numIterations, miniBatchFraction).
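As a sketch of how these parameters are passed in, the Spark 0.9 API exposes a train(...) overload that takes the gradient descent parameters directly; the parameter values below are illustrative assumptions, not tuned settings.

```scala
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Illustrative sketch: all gradient descent parameters supplied explicitly.
def trainSvm(training: RDD[LabeledPoint]) =
  SVMWithSGD.train(
    training,
    numIterations     = 100,  // number of SGD iterations
    stepSize          = 1.0,  // initial SGD step size
    regParam          = 1.0,  // regularization parameter
    miniBatchFraction = 1.0)  // fraction of the data sampled per SGD step
```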

Available binary classification algorithms:

SVMWithSGD

LogisticRegressionWithSGD

Setting Up the Scala Project

For sbt to work correctly, we need to place SimpleApp.scala and the sbt build definition according to the typical directory structure shown below. Once the layout is in place, we can compile the application's code into a JAR package and then execute the program with sbt/sbt run.

find .
./scala/sample/scala/lib
./scala/sample/scala/lib/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
./scala/sbt
./scala/sbt/sbt-launch-1.jar
./scala/sbt/sbt
./scala/src
./scala/src/main
./scala/src/main/scala
./scala/src/main/scala/SimpleApp.scala
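The directory listing above omits the sbt build definition itself. A minimal build file consistent with the JAR name produced later (scala_2.10-0.1-SNAPSHOT.jar) might look like the following; the file name and exact settings are assumptions, and the spark-assembly JAR under lib/ is picked up by sbt automatically as an unmanaged dependency.

```scala
// simple.sbt -- hypothetical minimal build definition for this project.
// The project name, version, and Scala version below are inferred from
// the artifact name in the build output, not taken from the original post.
name := "scala"

version := "0.1-SNAPSHOT"

scalaVersion := "2.10.3"
```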
           

Binary Classification Code

The following code snippet illustrates how to load a sample dataset, run a training algorithm on the training data using a static method on the algorithm object, and compute the training error between the actual labels and the model's predictions.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint

object SimpleApp {
  def main(args: Array[String]) {
    val sc = new SparkContext("spark://192.168.159.129:7077", "Simple App", "/root/spark-0.9",
      List("target/scala-2.10/scala_2.10-0.1-SNAPSHOT.jar"))

    // Load and parse the data file: each line is "label f1 f2 ..."
    val data = sc.textFile("hdfs://master:9000/mllib/sample_svm_data.txt")
    val parsedData = data.map { line =>
      val parts = line.split(' ')
      LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
    }

    // Run training algorithm to build the model
    val numIterations = 20
    val model = SVMWithSGD.train(parsedData, numIterations)

    // Evaluate model on training examples and compute training error
    val labelAndPreds = parsedData.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }
    val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count
    println("Training Error = " + trainErr)
  }
}
           

Execution Output

[root@master spark-0.9]# cd /root/sample/scala
[root@master scala]# sbt/sbt package run
[info] Set current project to scala (in build file:/root/sample/scala/)
[info] Compiling 1 Scala source to /root/sample/scala/target/scala-2.10/classes...
[info] Packaging /root/sample/scala/target/scala-2.10/scala_2.10-0.1-SNAPSHOT.jar ...
[info] Done packaging.
[success] Total time: 15 s, completed Feb 10, 2014 11:27:51 PM
[info] Running SimpleApp
log4j:WARN No appenders could be found for logger (akka.event.slf4j.Slf4jLogger).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
14/02/10 23:27:54 INFO SparkEnv: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/02/10 23:27:54 INFO SparkEnv: Registering BlockManagerMaster
14/02/10 23:27:54 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140210232754-e3bb
14/02/10 23:27:54 INFO MemoryStore: MemoryStore started with capacity 580.0 MB.
14/02/10 23:27:54 INFO ConnectionManager: Bound socket to port 48916 with id = ConnectionManagerId(master,48916)
14/02/10 23:27:54 INFO BlockManagerMaster: Trying to register BlockManager
14/02/10 23:27:54 INFO BlockManagerMasterActor$BlockManagerInfo: Registering block manager master:48916 with 580.0 MB RAM
14/02/10 23:27:54 INFO BlockManagerMaster: Registered BlockManager
14/02/10 23:27:54 INFO HttpServer: Starting HTTP Server
14/02/10 23:27:55 INFO HttpBroadcast: Broadcast server started at http://192.168.159.129:49765
14/02/10 23:27:55 INFO SparkEnv: Registering MapOutputTracker
14/02/10 23:27:55 INFO HttpFileServer: HTTP File server directory is /tmp/spark-b309992e-6b24-4823-9ce7-68ff0ee6ec1a
14/02/10 23:27:55 INFO HttpServer: Starting HTTP Server
14/02/10 23:27:56 INFO SparkUI: Started Spark Web UI at http://master:4040
14/02/10 23:27:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/02/10 23:27:56 INFO SparkContext: Added JAR target/scala-2.10/scala_2.10-0.1-SNAPSHOT.jar at http://192.168.159.129:35769/jars/scala_2.10-0.1-SNAPSHOT.jar with timestamp 1392046076889
14/02/10 23:27:56 INFO AppClient$ClientActor: Connecting to master spark://192.168.159.129:7077...
14/02/10 23:27:57 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
14/02/10 23:27:57 INFO MemoryStore: ensureFreeSpace(132636) called with curMem=0, maxMem=608187187
14/02/10 23:27:57 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 129.5 KB, free 579.9 MB)
14/02/10 23:27:58 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140210232758-0007
14/02/10 23:27:58 INFO AppClient$ClientActor: Executor added: app-20140210232758-0007/0 on worker-20140210205103-slaver01-37106 (slaver01:37106) with 1 cores
14/02/10 23:27:58 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140210232758-0007/0 on hostPort slaver01:37106 with 1 cores, 512.0 MB RAM
14/02/10 23:27:58 INFO AppClient$ClientActor: Executor added: app-20140210232758-0007/1 on worker-20140210205049-slaver02-48689 (slaver02:48689) with 1 cores
14/02/10 23:29:16 INFO DAGScheduler: Final stage: Stage 15 (reduce at GradientDescent.scala:150)
14/02/10 23:29:16 INFO DAGScheduler: Parents of final stage: List()
14/02/10 23:29:16 INFO DAGScheduler: Missing parents: List()
14/02/10 23:29:16 INFO DAGScheduler: Submitting Stage 15 (MappedRDD[30] at map at GradientDescent.scala:145), which has no missing parents
14/02/10 23:29:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 15 (MappedRDD[30] at map at GradientDescent.scala:145)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Adding task set 15.0 with 2 tasks
14/02/10 23:29:16 INFO TaskSetManager: Starting task 15.0:0 as TID 28 on executor 1: slaver02 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 15.0:0 as 2501 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Starting task 15.0:1 as TID 29 on executor 0: slaver01 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 15.0:1 as 2501 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 29 in 59 ms on slaver01 (progress: 0/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(15, 1)
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 28 in 64 ms on slaver02 (progress: 1/2)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Remove TaskSet 15.0 from pool
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(15, 0)
14/02/10 23:29:16 INFO DAGScheduler: Stage 15 (reduce at GradientDescent.scala:150) finished in 0.062 s
14/02/10 23:29:16 INFO SparkContext: Job finished: reduce at GradientDescent.scala:150, took 0.079776485 s
14/02/10 23:29:16 INFO SparkContext: Starting job: reduce at GradientDescent.scala:150
14/02/10 23:29:16 INFO DAGScheduler: Got job 16 (reduce at GradientDescent.scala:150) with 2 output partitions (allowLocal=false)
14/02/10 23:29:16 INFO DAGScheduler: Final stage: Stage 16 (reduce at GradientDescent.scala:150)
14/02/10 23:29:16 INFO DAGScheduler: Parents of final stage: List()
14/02/10 23:29:16 INFO DAGScheduler: Missing parents: List()
14/02/10 23:29:16 INFO DAGScheduler: Submitting Stage 16 (MappedRDD[32] at map at GradientDescent.scala:145), which has no missing parents
14/02/10 23:29:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 16 (MappedRDD[32] at map at GradientDescent.scala:145)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Adding task set 16.0 with 2 tasks
14/02/10 23:29:16 INFO TaskSetManager: Starting task 16.0:0 as TID 30 on executor 1: slaver02 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 16.0:0 as 2504 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Starting task 16.0:1 as TID 31 on executor 0: slaver01 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 16.0:1 as 2504 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 31 in 32 ms on slaver01 (progress: 0/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(16, 1)
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 30 in 65 ms on slaver02 (progress: 1/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(16, 0)
14/02/10 23:29:16 INFO DAGScheduler: Stage 16 (reduce at GradientDescent.scala:150) finished in 0.068 s
14/02/10 23:29:16 INFO SparkContext: Job finished: reduce at GradientDescent.scala:150, took 0.084612863 s
14/02/10 23:29:16 INFO TaskSchedulerImpl: Remove TaskSet 16.0 from pool
14/02/10 23:29:16 INFO SparkContext: Starting job: reduce at GradientDescent.scala:150
14/02/10 23:29:16 INFO DAGScheduler: Got job 17 (reduce at GradientDescent.scala:150) with 2 output partitions (allowLocal=false)
14/02/10 23:29:16 INFO DAGScheduler: Final stage: Stage 17 (reduce at GradientDescent.scala:150)
14/02/10 23:29:16 INFO DAGScheduler: Parents of final stage: List()
14/02/10 23:29:16 INFO DAGScheduler: Missing parents: List()
14/02/10 23:29:16 INFO DAGScheduler: Submitting Stage 17 (MappedRDD[34] at map at GradientDescent.scala:145), which has no missing parents
14/02/10 23:29:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 17 (MappedRDD[34] at map at GradientDescent.scala:145)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Adding task set 17.0 with 2 tasks
14/02/10 23:29:16 INFO TaskSetManager: Starting task 17.0:0 as TID 32 on executor 1: slaver02 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 17.0:0 as 2500 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Starting task 17.0:1 as TID 33 on executor 0: slaver01 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 17.0:1 as 2500 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 32 in 47 ms on slaver02 (progress: 0/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(17, 0)
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 33 in 75 ms on slaver01 (progress: 1/2)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Remove TaskSet 17.0 from pool
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(17, 1)
14/02/10 23:29:16 INFO DAGScheduler: Stage 17 (reduce at GradientDescent.scala:150) finished in 0.070 s
14/02/10 23:29:16 INFO SparkContext: Job finished: reduce at GradientDescent.scala:150, took 0.084426168 s
14/02/10 23:29:16 INFO SparkContext: Starting job: reduce at GradientDescent.scala:150
14/02/10 23:29:16 INFO DAGScheduler: Got job 18 (reduce at GradientDescent.scala:150) with 2 output partitions (allowLocal=false)
14/02/10 23:29:16 INFO DAGScheduler: Final stage: Stage 18 (reduce at GradientDescent.scala:150)
14/02/10 23:29:16 INFO DAGScheduler: Parents of final stage: List()
14/02/10 23:29:16 INFO DAGScheduler: Missing parents: List()
14/02/10 23:29:16 INFO DAGScheduler: Submitting Stage 18 (MappedRDD[36] at map at GradientDescent.scala:145), which has no missing parents
14/02/10 23:29:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 18 (MappedRDD[36] at map at GradientDescent.scala:145)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Adding task set 18.0 with 2 tasks
14/02/10 23:29:16 INFO TaskSetManager: Starting task 18.0:0 as TID 34 on executor 1: slaver02 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 18.0:0 as 2504 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Starting task 18.0:1 as TID 35 on executor 0: slaver01 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 18.0:1 as 2504 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 34 in 40 ms on slaver02 (progress: 0/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(18, 0)
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 35 in 81 ms on slaver01 (progress: 1/2)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Remove TaskSet 18.0 from pool
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(18, 1)
14/02/10 23:29:16 INFO DAGScheduler: Stage 18 (reduce at GradientDescent.scala:150) finished in 0.079 s
14/02/10 23:29:16 INFO SparkContext: Job finished: reduce at GradientDescent.scala:150, took 0.09669554 s
14/02/10 23:29:16 INFO SparkContext: Starting job: reduce at GradientDescent.scala:150
14/02/10 23:29:16 INFO DAGScheduler: Got job 19 (reduce at GradientDescent.scala:150) with 2 output partitions (allowLocal=false)
14/02/10 23:29:16 INFO DAGScheduler: Final stage: Stage 19 (reduce at GradientDescent.scala:150)
14/02/10 23:29:16 INFO DAGScheduler: Parents of final stage: List()
14/02/10 23:29:16 INFO DAGScheduler: Missing parents: List()
14/02/10 23:29:16 INFO DAGScheduler: Submitting Stage 19 (MappedRDD[38] at map at GradientDescent.scala:145), which has no missing parents
14/02/10 23:29:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 19 (MappedRDD[38] at map at GradientDescent.scala:145)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Adding task set 19.0 with 2 tasks
14/02/10 23:29:16 INFO TaskSetManager: Starting task 19.0:0 as TID 36 on executor 1: slaver02 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 19.0:0 as 2502 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Starting task 19.0:1 as TID 37 on executor 0: slaver01 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 19.0:1 as 2502 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 36 in 63 ms on slaver02 (progress: 0/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(19, 0)
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 37 in 80 ms on slaver01 (progress: 1/2)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Remove TaskSet 19.0 from pool
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(19, 1)
14/02/10 23:29:16 INFO DAGScheduler: Stage 19 (reduce at GradientDescent.scala:150) finished in 0.076 s
14/02/10 23:29:16 INFO SparkContext: Job finished: reduce at GradientDescent.scala:150, took 0.090877223 s
14/02/10 23:29:16 INFO SparkContext: Starting job: reduce at GradientDescent.scala:150
14/02/10 23:29:16 INFO DAGScheduler: Got job 20 (reduce at GradientDescent.scala:150) with 2 output partitions (allowLocal=false)
14/02/10 23:29:16 INFO DAGScheduler: Final stage: Stage 20 (reduce at GradientDescent.scala:150)
14/02/10 23:29:16 INFO DAGScheduler: Parents of final stage: List()
14/02/10 23:29:16 INFO DAGScheduler: Missing parents: List()
14/02/10 23:29:16 INFO DAGScheduler: Submitting Stage 20 (MappedRDD[40] at map at GradientDescent.scala:145), which has no missing parents
14/02/10 23:29:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 20 (MappedRDD[40] at map at GradientDescent.scala:145)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Adding task set 20.0 with 2 tasks
14/02/10 23:29:16 INFO TaskSetManager: Starting task 20.0:0 as TID 38 on executor 1: slaver02 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 20.0:0 as 2499 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Starting task 20.0:1 as TID 39 on executor 0: slaver01 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 20.0:1 as 2499 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 39 in 57 ms on slaver01 (progress: 0/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(20, 1)
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 38 in 64 ms on slaver02 (progress: 1/2)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Remove TaskSet 20.0 from pool
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(20, 0)
14/02/10 23:29:16 INFO DAGScheduler: Stage 20 (reduce at GradientDescent.scala:150) finished in 0.061 s
14/02/10 23:29:16 INFO SparkContext: Job finished: reduce at GradientDescent.scala:150, took 0.071109426 s
14/02/10 23:29:16 INFO SparkContext: Starting job: reduce at GradientDescent.scala:150
14/02/10 23:29:16 INFO DAGScheduler: Got job 21 (reduce at GradientDescent.scala:150) with 2 output partitions (allowLocal=false)
14/02/10 23:29:16 INFO DAGScheduler: Final stage: Stage 21 (reduce at GradientDescent.scala:150)
14/02/10 23:29:16 INFO DAGScheduler: Parents of final stage: List()
14/02/10 23:29:16 INFO DAGScheduler: Missing parents: List()
14/02/10 23:29:16 INFO DAGScheduler: Submitting Stage 21 (MappedRDD[42] at map at GradientDescent.scala:145), which has no missing parents
14/02/10 23:29:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 21 (MappedRDD[42] at map at GradientDescent.scala:145)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Adding task set 21.0 with 2 tasks
14/02/10 23:29:16 INFO TaskSetManager: Starting task 21.0:0 as TID 40 on executor 1: slaver02 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 21.0:0 as 2500 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Starting task 21.0:1 as TID 41 on executor 0: slaver01 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 21.0:1 as 2500 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 41 in 43 ms on slaver01 (progress: 0/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(21, 1)
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 40 in 55 ms on slaver02 (progress: 1/2)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Remove TaskSet 21.0 from pool
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(21, 0)
14/02/10 23:29:16 INFO DAGScheduler: Stage 21 (reduce at GradientDescent.scala:150) finished in 0.052 s
14/02/10 23:29:16 INFO SparkContext: Job finished: reduce at GradientDescent.scala:150, took 0.0626958 s
14/02/10 23:29:16 INFO SparkContext: Starting job: reduce at GradientDescent.scala:150
14/02/10 23:29:16 INFO DAGScheduler: Got job 22 (reduce at GradientDescent.scala:150) with 2 output partitions (allowLocal=false)
14/02/10 23:29:16 INFO DAGScheduler: Final stage: Stage 22 (reduce at GradientDescent.scala:150)
14/02/10 23:29:16 INFO DAGScheduler: Parents of final stage: List()
14/02/10 23:29:16 INFO DAGScheduler: Missing parents: List()
14/02/10 23:29:16 INFO DAGScheduler: Submitting Stage 22 (MappedRDD[44] at map at GradientDescent.scala:145), which has no missing parents
14/02/10 23:29:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 22 (MappedRDD[44] at map at GradientDescent.scala:145)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Adding task set 22.0 with 2 tasks
14/02/10 23:29:16 INFO TaskSetManager: Starting task 22.0:0 as TID 42 on executor 1: slaver02 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 22.0:0 as 2503 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Starting task 22.0:1 as TID 43 on executor 0: slaver01 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 22.0:1 as 2503 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 43 in 44 ms on slaver01 (progress: 0/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(22, 1)
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 42 in 54 ms on slaver02 (progress: 1/2)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Remove TaskSet 22.0 from pool
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(22, 0)
14/02/10 23:29:16 INFO DAGScheduler: Stage 22 (reduce at GradientDescent.scala:150) finished in 0.051 s
14/02/10 23:29:16 INFO SparkContext: Job finished: reduce at GradientDescent.scala:150, took 0.060071497 s
14/02/10 23:29:16 INFO GradientDescent: GradientDescent finished. Last 10 stochastic losses 1.973918565153662, 1.8255523040966746, 1.7699816024631967, 1.6469799583886178, 1.625661917005991, 1.5283113889552784, 1.5173506129512995, 1.422277398167446, 1.4154959896484256, 1.3621279370271806
14/02/10 23:29:16 INFO SVMWithSGD: Final model weights 0.14951408585149972,0.03831072711197627,0.037730161810440484,0.18505277569820583,-2.563032483490213E-4,0.07950273502493031,0.0946837869570233,0.007664328764458717,0.12219548598644159,0.12219548598644195,0.034482086651882085,0.035443622005655644,0.02700659703930399,-0.002137650963695721,0.007242361663251616,0.020208016800350677
14/02/10 23:29:16 INFO SVMWithSGD: Final model intercept 0.06977506975495361
14/02/10 23:29:16 INFO SparkContext: Starting job: count at SimpleApp.scala:24
14/02/10 23:29:16 INFO DAGScheduler: Got job 23 (count at SimpleApp.scala:24) with 2 output partitions (allowLocal=false)
14/02/10 23:29:16 INFO DAGScheduler: Final stage: Stage 23 (count at SimpleApp.scala:24)
14/02/10 23:29:16 INFO DAGScheduler: Parents of final stage: List()
14/02/10 23:29:16 INFO DAGScheduler: Missing parents: List()
14/02/10 23:29:16 INFO DAGScheduler: Submitting Stage 23 (FilteredRDD[46] at filter at SimpleApp.scala:24), which has no missing parents
14/02/10 23:29:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 23 (FilteredRDD[46] at filter at SimpleApp.scala:24)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Adding task set 23.0 with 2 tasks
14/02/10 23:29:16 INFO TaskSetManager: Starting task 23.0:0 as TID 44 on executor 1: slaver02 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 23.0:0 as 2164 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Starting task 23.0:1 as TID 45 on executor 0: slaver01 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 23.0:1 as 2164 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 45 in 54 ms on slaver01 (progress: 0/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(23, 1)
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 44 in 73 ms on slaver02 (progress: 1/2)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Remove TaskSet 23.0 from pool
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(23, 0)
14/02/10 23:29:16 INFO DAGScheduler: Stage 23 (count at SimpleApp.scala:24) finished in 0.069 s
14/02/10 23:29:16 INFO SparkContext: Job finished: count at SimpleApp.scala:24, took 0.089120924 s
14/02/10 23:29:16 INFO SparkContext: Starting job: count at SimpleApp.scala:24
14/02/10 23:29:16 INFO DAGScheduler: Got job 24 (count at SimpleApp.scala:24) with 2 output partitions (allowLocal=false)
14/02/10 23:29:16 INFO DAGScheduler: Final stage: Stage 24 (count at SimpleApp.scala:24)
14/02/10 23:29:16 INFO DAGScheduler: Parents of final stage: List()
14/02/10 23:29:16 INFO DAGScheduler: Missing parents: List()
14/02/10 23:29:16 INFO DAGScheduler: Submitting Stage 24 (MappedRDD[2] at map at SimpleApp.scala:10), which has no missing parents
14/02/10 23:29:16 INFO DAGScheduler: Submitting 2 missing tasks from Stage 24 (MappedRDD[2] at map at SimpleApp.scala:10)
14/02/10 23:29:16 INFO TaskSchedulerImpl: Adding task set 24.0 with 2 tasks
14/02/10 23:29:16 INFO TaskSetManager: Starting task 24.0:0 as TID 46 on executor 1: slaver02 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 24.0:0 as 1726 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Starting task 24.0:1 as TID 47 on executor 0: slaver01 (NODE_LOCAL)
14/02/10 23:29:16 INFO TaskSetManager: Serialized task 24.0:1 as 1726 bytes in 0 ms
14/02/10 23:29:16 INFO TaskSetManager: Finished TID 47 in 35 ms on slaver01 (progress: 0/2)
14/02/10 23:29:16 INFO DAGScheduler: Completed ResultTask(24, 1)
14/02/10 23:29:17 INFO TaskSetManager: Finished TID 46 in 141 ms on slaver02 (progress: 1/2)
14/02/10 23:29:17 INFO TaskSchedulerImpl: Remove TaskSet 24.0 from pool
14/02/10 23:29:17 INFO DAGScheduler: Completed ResultTask(24, 0)
14/02/10 23:29:17 INFO DAGScheduler: Stage 24 (count at SimpleApp.scala:24) finished in 0.144 s
14/02/10 23:29:17 INFO SparkContext: Job finished: count at SimpleApp.scala:24, took 0.165520408 s
Training Error = 0.4968944099378882
14/02/10 23:29:17 INFO ConnectionManager: Selector thread was interrupted!
[success] Total time: 85 s, completed Feb 10, 2014 11:29:17 PM
[root@master scala]#
           

By default, SVMWithSGD.train() performs L2 regularization with the regularization parameter set to 1.0. If we want to configure the algorithm, we can customize SVMWithSGD further by creating a new SVMWithSGD object directly and calling setter methods. All of the other MLlib algorithms can be customized in this way as well. For example, the following code produces an L1-regularized variant of the SVM with the regularization parameter set to 0.1, and runs the training algorithm for 200 iterations.

import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()
svmAlg.optimizer.setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater)
val modelL1 = svmAlg.run(parsedData)
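LogisticRegressionWithSGD, the other algorithm listed earlier, follows the same pattern. A hedged sketch, reusing the parsedData RDD from the example above (in Spark 0.9 the simple train overload for logistic regression takes only the iteration count):

```scala
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Train a logistic regression model on the same parsed dataset;
// evaluation works exactly as in the SVM example.
val lrModel = LogisticRegressionWithSGD.train(parsedData, 20)
val lrErr = parsedData.map { point =>
  (point.label, lrModel.predict(point.features))
}.filter(r => r._1 != r._2).count.toDouble / parsedData.count
println("LR Training Error = " + lrErr)
```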
           
