Logistic Regression in Spark and P/R/F Evaluation
1. Logistic Regression
Logistic regression suits binary classification: you train on samples that already have class labels, and the trained model then predicts the label of unlabeled samples. The output is a probability, namely how likely a sample is to belong to the positive class.
Input:
LIBSVM-format data
Sample data:
sample_binary_classification_data.txt ships with the Spark distribution; it has too many features to copy here. Data like this usually lives in a table with columns att1, att2, ..., attn, label, and a simple SQL statement is enough to convert it into LIBSVM format.
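For illustration, here is a minimal Scala sketch (the rows and values are made up) of building LIBSVM-format lines of the form "label 1:att1 2:att2 ... n:attn" from (label, attributes) pairs; the same string concatenation can just as well be done in SQL:

// Minimal sketch: build LIBSVM-format lines from (label, attributes) rows.
// The sample rows below are made up for illustration.
val rows = Seq((1.0, Array(0.5, 1.2, 0.0)), (0.0, Array(0.3, 0.7, 2.1)))
val libsvmLines = rows.map { case (label, atts) =>
  val feats = atts.zipWithIndex.collect {
    case (v, i) if v != 0.0 => s"${i + 1}:$v"  // LIBSVM indices are 1-based; zero values may be omitted
  }
  (label.toString +: feats).mkString(" ")
}
libsvmLines.foreach(println)  // e.g. "1.0 1:0.5 2:1.2"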
Output:
Training produces the logistic regression coefficients. For any unlabeled sample, the predicted probability of the positive class comes from the weighted sum of its features and the coefficients, passed through the sigmoid function (see the small sketch below).
For example, the pairs below are (predicted probability of the positive class, actual class of the sample):
0.9999999999883085-->1.0
0.999999905633042-->1.0
5.131563021283302E-10-->0.0
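Each of these probabilities is just sigmoid(w · x), i.e. the dot product of the coefficient vector and the feature vector pushed through the sigmoid function. A minimal sketch with made-up weights and features:

// Minimal sketch: probability of the positive class = sigmoid(w . x).
// The weight and feature values below are made up for illustration.
val weights = Array(0.8, -1.2, 0.05)
val features = Array(1.0, 0.3, 4.0)
val margin = weights.zip(features).map { case (w, x) => w * x }.sum
val probability = 1.0 / (1.0 + math.exp(-margin))
println(probability)  // a value in (0, 1); >= 0.5 is usually predicted as class 1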
For the detailed theory, please look it up yourself.
2. Evaluating Logistic Regression
The evaluation uses precision, recall and the F-measure, which are suitable for imbalanced samples. The metrics mean the following:
P: precision, how accurate the positive predictions are; larger is better
R: recall, how much of the truly positive samples the predictions cover; larger is better
F-measure: the harmonic combination of the two; larger is better
Precision and recall usually trade off against each other: when one is large the other tends to be small, as the formulas in the sketch below make clear.
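For reference, here are the formulas as a small Scala sketch (the TP/FP/FN counts are made-up example values):

// precision = TP / (TP + FP), recall = TP / (TP + FN)
// F-beta = (1 + beta^2) * P * R / (beta^2 * P + R); beta = 1 gives the usual F1
// The counts below are made up for illustration.
val (tp, fp, fn) = (80.0, 10.0, 20.0)
val precision = tp / (tp + fp)  // 0.888...
val recall = tp / (tp + fn)     // 0.8
def fBeta(beta: Double) =
  (1 + beta * beta) * precision * recall / (beta * beta * precision + recall)
println(s"P=$precision R=$recall F1=${fBeta(1.0)} F0.5=${fBeta(0.5)}")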
AUC is the area under the ROC curve. It reflects how many additional correct positive predictions the model gains as it accepts more errors: for example, if accepting 10% more errors gains 60% more correct positives, the model is usable. The larger the AUC (the closer to 1), the better. Look up the details yourself; I will write a dedicated post on evaluation metrics later.
3. Spark Code Implementation (essentially the Spark example, with some modifications and comments)
package org.wq.scala.ml
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
/**
* Created by Administrator on 2016/10/21.
*/
object LogicRegression {
def main(args: Array[String]): Unit = {
//Two arguments: the first is the LIBSVM training data, the second is the path where the model is saved
if(args.length!=2){
println("Please supply two arguments: the LIBSVM training data path and the model save path")
System.exit(1)
}
val data_path=args(0)
val model_path=args(1)
val conf = new SparkConf().setAppName("BinaryClassificationMetricsExample")
val sc = new SparkContext(conf)
// $example on$
// Load training data in LIBSVM format
//Load the LIBSVM file
//LIBSVM format: label 1:value 2:value ...
//For logistic regression the label must be 0 or 1
//Data in this format can usually be built with a SQL statement
//Returns an RDD[LabeledPoint]
val data = MLUtils.loadLibSVMFile(sc, data_path)
// Split data into training (60%) and test (40%)
val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
training.cache()
// Run training algorithm to build the model
//L-BFGS is an optimization algorithm that serves a similar purpose to gradient descent
//setNumClasses sets the number of class labels to 2
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(2)
.run(training)
// Clear the prediction threshold so the model will return probabilities
//With the threshold cleared, predict() returns the probability of the positive class instead of a 0/1 label
model.clearThreshold
// Compute raw scores on the test set
//Each element is (predicted probability of the positive class, actual label)
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
predictionAndLabels.collect().map(x=>{
println(x._1+"-->"+x._2)
})
//Save the model; it can be loaded back later
model.save(sc,model_path)
//val sameModel = LogisticRegressionModel.load(sc, model_path)
// Instantiate metrics object
//Evaluate with BinaryClassificationMetrics
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
// Precision by threshold
//The threshold is the probability cutoff for the logistic regression output: above it the prediction is positive (1), below it negative (0)
//The p, r and F values below are listed for every threshold
val precision = metrics.precisionByThreshold
precision.foreach { case (t, p) =>
println(s"Threshold: $t, Precision: $p")
}
// Recall by threshold
val recall = metrics.recallByThreshold
recall.foreach { case (t, r) =>
println(s"Threshold: $t, Recall: $r")
}
// Precision-Recall Curve
val PRC = metrics.pr
// F-measure
//the beta factor in F-Measure computation.
//beta weights recall against precision in the F-measure; it is not the probability threshold
val f1Score = metrics.fMeasureByThreshold
f1Score.foreach { case (t, f) =>
println(s"Threshold: $t, F-score: $f, Beta = 1")
}
val beta = 0.5
val fScore = metrics.fMeasureByThreshold(beta)
fScore.foreach { case (t, f) =>
println(s"Threshold: $t, F-score: $f, Beta = 0.5")
}
// AUPRC: area under the precision-recall curve
val auPRC = metrics.areaUnderPR
println("Area under precision-recall curve = " + auPRC)
// Compute thresholds used in ROC and PR curves
val thresholds = precision.map(_._1)
// ROC Curve
val roc = metrics.roc
// AUROC: area under the ROC curve, commonly called AUC
val auROC = metrics.areaUnderROC
println("Area under ROC = " + auROC)
// $example off$
}
}
This is the version for deployment to a Spark cluster. To run it locally, modify it by setting the master to local and configuring a warehouse directory:
val conf = new SparkConf().setAppName("BinaryClassificationMetricsExample").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse")
val data_path="data/mllib/sample_binary_classification_data.txt"
val model_path="data/mllib/LRModel"
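To reuse the saved model later, for example in a separate prediction job, it can be loaded back with LogisticRegressionModel.load; a minimal sketch (the feature values are placeholders and must match the dimension of the training data):

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vectors

// Minimal sketch: load the saved model and score one sample.
val sameModel = LogisticRegressionModel.load(sc, model_path)
sameModel.clearThreshold()
// The feature vector below is a placeholder; its length must equal the number of training features.
val p = sameModel.predict(Vectors.dense(0.1, 0.2, 0.3))
println(s"Probability of the positive class: $p")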
4. Submitting, Deploying, and Predicting
My directories:
Data directory: /home/jar/data
JAR directory: /home/jar
Model directory: /home/jar/model
Upload the JAR to the /home/jar directory on the master node (any node will do), and make sure that the /home/jar/data directory on every slave node contains sample_binary_classification_data.txt.
Then run spark-submit to hand the job to the cluster (the master port, executor memory and executor count below are example values; adjust them to your cluster):
spark-submit --class org.wq.scala.ml.LogicRegression --master spark://master:7077 --executor-memory 700m --num-executors 2 /home/jar/LRModel.jar /home/jar/data/sample_binary_classification_data.txt /home/jar/model/LRModel
The output is too long to paste in full, so only the tail is shown below. When deciding which threshold to use as the model's probability cutoff, I suggest looking at the AUC and F values and choosing a threshold where both are relatively large (a simple way to do this is sketched at the end of this post).
Area under precision-recall curve =
Area under ROC =
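As suggested above, a simple way to pick the operating threshold is to take the one with the best F-measure; a minimal sketch, reusing the metrics object from the code above:

// Minimal sketch: choose the threshold with the highest F1 score.
// metrics is the BinaryClassificationMetrics instance created in the code above.
val (bestThreshold, bestF1) = metrics.fMeasureByThreshold.collect().maxBy(_._2)
println(s"Best threshold = $bestThreshold, F1 = $bestF1")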