
Spark MLlib Learning (1) -- Pipelines: Basic Concepts and Examples

Basic Concepts

DataFrame

The machine learning API uses DataFrames from Spark SQL as its dataset abstraction. A DataFrame can hold a variety of data types, such as text, feature vectors, labels, and predictions.
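A minimal sketch of such a dataset, assuming a live SparkSession named spark (as in the examples further down); the column names id, text, label, and features are chosen here only for illustration:

import org.apache.spark.ml.linalg.Vectors

// One DataFrame holding several data types at once: an id, raw text,
// a numeric label, and a feature vector.
val df = spark.createDataFrame(Seq(
  (0L, "spark mllib", 1.0, Vectors.dense(0.0, 1.1)),
  (1L, "hadoop mapreduce", 0.0, Vectors.dense(2.0, 1.0))
)).toDF("id", "text", "label", "features")
df.printSchema()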

Transformers

A Transformer is an algorithm that transforms one DataFrame into another DataFrame. For example, a model is a Transformer that converts a DataFrame of features into a DataFrame with predictions.

Transformers include feature transformers and learned models. A Transformer implements the method transform(), which typically converts a DataFrame into a new one by appending one or more columns.

  • Feature transformers: read a column of a DataFrame, map it to a new column, and output a new DataFrame with the mapped column appended (see the sketch after this list).
  • Learned models: read the column of a DataFrame containing feature vectors, predict a label for each feature vector, and output the predicted labels as an appended column.
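For instance, here is a minimal sketch of a feature transformer, the Tokenizer from spark.ml, assuming the same spark session as above; transform() returns a new DataFrame with the mapped words column appended to the original columns:

import org.apache.spark.ml.feature.Tokenizer

val docs = spark.createDataFrame(Seq(
  (0L, "a b c d e spark")
)).toDF("id", "text")

val tok = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")

// The result keeps id and text and appends the new words column.
tok.transform(docs).show(truncate = false)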

Estimators

An Estimator is an algorithm that produces a Transformer from a DataFrame. For example, a machine learning algorithm is an Estimator: it trains on a DataFrame and produces a model.

An Estimator implements the method fit(), which accepts a DataFrame and produces a Model.
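The relationship between the two abstractions in a minimal sketch (again assuming the spark session above; the tiny dataset is made up for illustration):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

val train = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1)),
  (0.0, Vectors.dense(2.0, 1.0))
)).toDF("label", "features")

// LogisticRegression is an Estimator: fit() consumes a DataFrame...
val estimator = new LogisticRegression().setMaxIter(10)
val model = estimator.fit(train)

// ...and produces a Model, which is a Transformer: transform() appends
// prediction columns to a DataFrame.
model.transform(train).show()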

Pipeline

A Pipeline chains multiple Transformers and Estimators together to specify a machine learning workflow.

For example, a simple text document processing workflow requires the following steps:

  1. Split each document's text into words
  2. Convert the words into numerical feature vectors
  3. Learn a prediction model from the feature vectors and labels

These steps form a machine learning workflow, i.e., a Pipeline, which consists of a sequence of PipelineStages run in a specific order.

Examples

Estimator, Transformer, and Param

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }
           

Pipeline

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
           
