
Spark Logistic Regression and P/R/F Evaluation

1. Logistic Regression

Logistic regression can be used for binary classification. You train on samples that already have labels; the trained model then predicts the label of unlabeled samples. The output is a probability, typically the probability that the sample belongs to the positive class.

Input:

Data in LIBSVM format

Sample data:

sample_binary_classification_data.txt ships with the Spark distribution; it has too many features to paste here. Data like this usually lives in a table with columns att1, att2, ..., attn, label, and a SQL query can convert it into LIBSVM format quite easily, as sketched below.
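If the attributes sit in a flat file or exported table, a tiny Spark job can also emit LIBSVM lines directly. The sketch below is only an illustration: the input file name features.csv, its comma-separated layout with the label in the last column, and the output path are assumptions, not part of the original example.

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: turn rows of att1,...,attn,label into LIBSVM lines.
object CsvToLibsvm {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CsvToLibsvm").setMaster("local"))
    val libsvmLines = sc.textFile("features.csv").map { line =>
      val cols = line.split(",").map(_.toDouble)
      val label = cols.last.toInt          // assumption: label is the last column
      val features = cols.dropRight(1)
      // LIBSVM keeps only non-zero entries, with 1-based feature indices
      val indexed = features.zipWithIndex.collect {
        case (v, i) if v != 0.0 => s"${i + 1}:$v"
      }
      (label.toString +: indexed).mkString(" ")
    }
    libsvmLines.saveAsTextFile("sample_libsvm_output")
    sc.stop()
  }
}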

Output:

After training you obtain the logistic regression coefficients. For any unlabeled sample, the predicted probability of the positive class is the sigmoid of the weighted sum of its features (the dot product of the features and the coefficients, plus the intercept).

For example, the lines below show pairs of (predicted probability of the positive class, actual class):

0.9999999999883085–>1.0

0.999999905633042–>1.0

5.131563021283302E-10–>0.0

Please look up the underlying concepts for more detail; a minimal sketch of how the probability is computed from the coefficients follows.
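As a sketch only, the helper below reproduces how the positive-class probability relates to the trained coefficients; `model` and `features` are assumed to come from the training code in section 3.

import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vector

// Sketch: positive-class probability = sigmoid of the linear score w · x + intercept.
def positiveProbability(model: LogisticRegressionModel, features: Vector): Double = {
  val margin = model.weights.toArray.zip(features.toArray)
    .map { case (w, x) => w * x }
    .sum + model.intercept
  1.0 / (1.0 + math.exp(-margin)) // the sigmoid maps the score into (0, 1)
}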

2. Evaluating Logistic Regression

Evaluation uses precision (P), recall (R), and the F-measure, which are well suited to imbalanced samples. The metrics mean the following:

P: precision, the accuracy of the positive predictions; the larger, the better.

R: recall, the breadth of actual positive samples covered by the correct predictions; the larger, the better.

F-measure: the harmonic mean of precision and recall; the larger, the better.

However, P and R usually trade off against each other: when one is very high, the other tends to be low, as the formulas make clear (see the worked example below).
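A tiny worked example with made-up confusion-matrix counts (tp, fp, fn are illustrative numbers, not from this data set):

// Worked sketch of precision / recall / F1 from hypothetical counts.
val (tp, fp, fn) = (80.0, 20.0, 40.0)       // made-up true positives, false positives, false negatives
val precision = tp / (tp + fp)              // 0.8    -- of the predicted positives, how many are correct
val recall    = tp / (tp + fn)              // ~0.667 -- of the actual positives, how many were found
val f1 = 2 * precision * recall / (precision + recall) // harmonic mean, ~0.727
println(s"P=$precision, R=$recall, F1=$f1")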

AUC is the area under the ROC curve. Intuitively it measures how much correct detection you gain as you allow more error: if accepting 10% more false positives lifts the true positive rate by 60%, the model is acceptable. The closer the AUC is to 1, the better; look up the details if needed. I will write a dedicated post on evaluation metrics later.

3. Spark Code Implementation (essentially Spark's own example, with some modifications and comments)

package org.wq.scala.ml
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
/**
  * Created by Administrator on 2016/10/21.
  */
object LogicRegression {

  def main(args: Array[String]): Unit = {
    // Two arguments: the first is the LIBSVM training data, the second is the model save path

    if (args.length != 2) {
      println("Please supply two arguments: the LIBSVM training data path and the model save path")
      System.exit(1)
    }
    val data_path = args(0)
    val model_path = args(1)
    val conf = new SparkConf().setAppName("BinaryClassificationMetricsExample")
    val sc = new SparkContext(conf)
    // $example on$
    // Load training data in LIBSVM format
    // Load the LIBSVM file
    // LIBSVM format: label 1:value 2:value ...
    // For binary logistic regression the label must be 0 or 1
    // Data in this format can usually be built with a SQL query
    // Result is an RDD[LabeledPoint]
    val data = MLUtils.loadLibSVMFile(sc, data_path)

    // Split data into training (60%) and test (40%)
    val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    training.cache()

    // Run training algorithm to build the model
    // L-BFGS is an optimization algorithm, similar in purpose to gradient descent
    // setNumClasses(2) means there are two class labels
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(2)
      .run(training)

    // Clear the prediction threshold so the model returns probabilities instead of 0/1 labels
    model.clearThreshold

    // Compute raw scores on the test set
    // Each result is (predicted probability, actual label); the probability is for the positive class (1)
    val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
      val prediction = model.predict(features)
      (prediction, label)
    }
    predictionAndLabels.collect().foreach { case (prediction, label) =>
      println(prediction + "-->" + label)
    }
    // Save the model; it can be reloaded later with LogisticRegressionModel.load(sc, model_path)
    model.save(sc, model_path)

    // Instantiate metrics object
    // Use BinaryClassificationMetrics to evaluate the predictions
    val metrics = new BinaryClassificationMetrics(predictionAndLabels)

    // Precision by threshold
    // The threshold is the probability cutoff: above it a sample is predicted positive (1), below it negative (0)
    // The lines below print precision, recall, and F for every threshold
    val precision = metrics.precisionByThreshold
    precision.foreach { case (t, p) =>
      println(s"Threshold: $t, Precision: $p")
    }

    // Recall by threshold
    val recall = metrics.recallByThreshold
    recall.foreach { case (t, r) =>
      println(s"Threshold: $t, Recall: $r")
    }

    // Precision-Recall Curve
    val PRC = metrics.pr

    // F-measure

    //the beta factor in F-Measure computation.
    // beta weights precision against recall in the F-measure (it is not the probability threshold)
    val f1Score = metrics.fMeasureByThreshold
    f1Score.foreach { case (t, f) =>
      println(s"Threshold: $t, F-score: $f, Beta = 1")
    }

    val beta = 0.5
    val fScore = metrics.fMeasureByThreshold(beta)
    fScore.foreach { case (t, f) =>
      println(s"Threshold: $t, F-score: $f, Beta = 0.5")
    }

    // AUPRC: area under the precision-recall curve
    val auPRC = metrics.areaUnderPR
    println("Area under precision-recall curve = " + auPRC)

    // Compute thresholds used in ROC and PR curves
    val thresholds = precision.map(_._1)

    // ROC Curve
    val roc = metrics.roc

    // AUROC: area under the ROC curve, commonly known as AUC
    val auROC = metrics.areaUnderROC
    println("Area under ROC = " + auROC)
    // $example off$
  }
}
           

The version above is meant to be deployed to a Spark cluster. To run it locally, change the master to local and set a warehouse directory:

val conf = new SparkConf().setAppName("BinaryClassificationMetricsExample").setMaster("local").set("spark.sql.warehouse.dir","E:/ideaWorkspace/ScalaSparkMl/spark-warehouse")

val data_path="data/mllib/sample_binary_classification_data.txt"
val model_path="data/mllib/LRModel"
           

4. Submit, Deploy, and Predict

My directory layout:

Data directory: /home/jar/data

Jar directory: /home/jar

Model directory: /home/jar/model

Upload the jar to /home/jar on the master node (it can be uploaded to any node), and make sure the /home/jar/data directory on every slave node contains the file sample_binary_classification_data.txt.

Then run spark-submit to hand the job to the cluster for execution:

spark-submit  --class org.wq.scala.ml.LogicRegression --master spark://master: --executor-memory m --num-executors   /home/jar/LRModel.jar  /home/jar/data/sample_binary_classification_data.txt  /home/jar/model/LRModel
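Once the model has been saved, a second job can reload it and score new data, which is the "predict" part of this section. The sketch below is only an illustration: the object name LRModelPredict is made up, the paths reuse the directories above, and the "new" data is simply the training file again.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.util.MLUtils

// Sketch of a prediction job; the paths are assumptions based on the layout above.
object LRModelPredict {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LRModelPredict"))
    val model = LogisticRegressionModel.load(sc, "/home/jar/model/LRModel")
    model.clearThreshold() // return probabilities instead of 0/1 labels
    val newData = MLUtils.loadLibSVMFile(sc, "/home/jar/data/sample_binary_classification_data.txt")
    newData.map(p => (model.predict(p.features), p.label)).take(10).foreach(println)
    sc.stop()
  }
}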
           

The training job's output is too long to copy in full, so only the tail is pasted below. When choosing which threshold to use as the model's probability cutoff, it helps to look at the AUC and F values and pick the threshold where they are large; a small programmatic sketch follows the log.

Area under precision-recall curve = 
... (Spark INFO log output from the driver and executors omitted) ...
Area under ROC = 
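To pick the cutoff programmatically rather than by eyeballing the log, one option (a sketch, assuming the metrics object from the code in section 3 is still in scope) is to take the threshold with the highest F1:

// Sketch: choose the threshold that maximizes the F1 score.
// Assumes `metrics` is the BinaryClassificationMetrics instance created in section 3.
val (bestThreshold, bestF1) = metrics.fMeasureByThreshold.collect().maxBy(_._2)
println(s"Best threshold by F1: $bestThreshold (F1 = $bestF1)")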
           
