天天看點

梯度疊代樹回歸(GBDT)算法原理及Spark MLlib調用執行個體(Scala/Java/python)

梯度疊代樹回歸

算法簡介:

        梯度提升樹是一種決策樹的內建算法。它通過反複疊代訓練決策樹來最小化損失函數。決策樹類似,梯度提升樹具有可處理類别特征、易擴充到多分類問題、不需特征縮放等性質。Spark.ml通過使用現有decision tree工具來實作。

       梯度提升樹依次疊代訓練一系列的決策樹。在一次疊代中,算法使用現有的內建來對每個訓練執行個體的類别進行預測,然後将預測結果與真實的标簽值進行比較。通過重新标記,來賦予預測結果不好的執行個體更高的權重。是以,在下次疊代中,決策樹會對先前的錯誤進行修正。

       對執行個體标簽進行重新标記的機制由損失函數來指定。每次疊代過程中,梯度疊代樹在訓練資料上進一步減少損失函數的值。spark.ml為分類問題提供一種損失函數(Log Loss),為回歸問題提供兩種損失函數(平方誤差與絕對誤差)。

       Spark.ml支援二分類以及回歸的随機森林算法,适用于連續特征以及類别特征。

*注意梯度提升樹目前不支援多分類問題。

參數:

checkpointInterval:

類型:整數型。

含義:設定檢查點間隔(>=1),或不設定檢查點(-1)。

featuresCol:

類型:字元串型。

含義:特征列名。

impurity:

類型:字元串型。

含義:計算資訊增益的準則(不區分大小寫)。

labelCol:

類型:字元串型。

含義:标簽列名。

lossType:

類型:字元串型。

含義:損失函數類型。

maxBins:

類型:整數型。

含義:連續特征離散化的最大數量,以及選擇每個節點分裂特征的方式。

maxDepth:

類型:整數型。

含義:樹的最大深度(>=0)。

maxIter:

類型:整數型。

含義:疊代次數(>=0)。

minInfoGain:

類型:雙精度型。

含義:分裂節點時所需最小資訊增益。

minInstancesPerNode:

類型:整數型。

含義:分裂後自節點最少包含的執行個體數量。

predictionCol:

類型:字元串型。

含義:預測結果列名。

seed:

類型:長整型。

含義:随機種子。

subsamplingRate:

類型:雙精度型。

含義:學習一棵決策樹使用的訓練資料比例,範圍[0,1]。

stepSize:

類型:雙精度型。

含義:每次疊代優化步長。

示例:

       下面的例子中,GBTRegressor僅疊代了一次,在實際操作中是不現實的。

Scala:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)

// Chain indexer and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, gbt))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Root Mean Squared Error (RMSE) on test data = " + rmse)

val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println("Learned regression GBT model:\n" + gbtModel.toDebugString)
           

Java :

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a GBT model.
GBTRegressor gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Chain indexer and GBT in a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

GBTRegressionModel gbtModel = (GBTRegressionModel)(model.stages()[1]);
System.out.println("Learned regression GBT model:\n" + gbtModel.toDebugString());
           

Python :

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel)  # summary only
           

繼續閱讀