Gradient-Boosted Tree Regression
Algorithm overview:
Gradient-boosted trees (GBTs) are an ensemble algorithm built on decision trees. They minimize a loss function by iteratively training decision trees. Like single decision trees, GBTs can handle categorical features and do not require feature scaling. spark.ml implements them on top of the existing decision tree tools.
GBTs train a sequence of decision trees, one per iteration. On each iteration, the algorithm uses the current ensemble to predict the label of every training instance and compares the predictions with the true labels. Instances with poor predictions are re-labeled so that they carry more weight; the decision tree trained on the next iteration therefore focuses on correcting the previous mistakes.
The re-labeling mechanism is defined by the loss function: on every iteration, the GBT further reduces the value of the loss function on the training data. spark.ml provides one loss function for classification (log loss) and two for regression (squared error and absolute error).
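As a rough illustration of these three losses, the snippet below computes the per-instance versions in plain Python (this is a sketch for intuition, not spark.ml code; spark.ml sums these over all training instances, and for log loss it uses labels in {-1, +1} with F(x) denoting the ensemble's raw prediction):

```python
import math

def log_loss(label, margin):
    # Per-instance log loss for binary classification,
    # with label in {-1, +1} and margin = F(x): 2 * log(1 + exp(-2 * y * F(x))).
    return 2.0 * math.log(1.0 + math.exp(-2.0 * label * margin))

def squared_error(label, prediction):
    # Per-instance squared (L2) error for regression: (y - F(x))^2.
    return (label - prediction) ** 2

def absolute_error(label, prediction):
    # Per-instance absolute (L1) error for regression: |y - F(x)|.
    return abs(label - prediction)

print(squared_error(3.0, 2.5))   # 0.25
print(absolute_error(3.0, 2.5))  # 0.5
```

Squared error penalizes large residuals much more heavily than absolute error, which is why absolute error is the more robust choice when the training data contains outliers.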
spark.ml supports GBTs for binary classification and for regression, using both continuous and categorical features.
*Note: GBTs do not currently support multiclass classification.
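The iterative correction described above can be sketched in plain Python for the squared-error case, where the "re-labeling" amounts to fitting each new tree to the current residuals (the negative gradient of the squared-error loss). This is an illustration of the general idea, not the spark.ml implementation; fit_stump and gbt_fit are illustrative names, and a depth-1 tree (stump) stands in for a full decision tree:

```python
def fit_stump(xs, residuals):
    """Fit a depth-1 regression tree (stump) minimizing squared error."""
    best = None
    for threshold in sorted(set(xs)):
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        if not left or not right:
            continue  # split puts everything on one side; skip it
        lmean = sum(left) / len(left)
        rmean = sum(right) / len(right)
        sse = (sum((r - lmean) ** 2 for r in left)
               + sum((r - rmean) ** 2 for r in right))
        if best is None or sse < best[0]:
            best = (sse, threshold, lmean, rmean)
    _, threshold, lmean, rmean = best
    return lambda x: lmean if x <= threshold else rmean

def gbt_fit(xs, ys, max_iter=100, step_size=0.3):
    """Boosting loop: each tree fits the residuals of the current ensemble."""
    trees = []
    predictions = [0.0] * len(xs)
    for _ in range(max_iter):
        # Residuals are the negative gradient of the squared-error loss:
        # instances the ensemble predicts badly get large residuals, so the
        # next tree concentrates on correcting them.
        residuals = [y - p for y, p in zip(ys, predictions)]
        tree = fit_stump(xs, residuals)
        trees.append(tree)
        # stepSize shrinks each tree's contribution (see the parameter below).
        predictions = [p + step_size * tree(x)
                       for p, x in zip(predictions, xs)]
    return lambda x: sum(step_size * t(x) for t in trees)

# Toy data: y = 2x. The ensemble's training loss shrinks every iteration.
xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
model = gbt_fit(xs, ys)
```

With enough iterations the ensemble's predictions approach the training labels; the step size trades off how aggressively each iteration corrects the previous errors against the risk of overfitting.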
Parameters:
checkpointInterval:
Type: Int.
Meaning: checkpoint interval (>= 1), or -1 to disable checkpointing.
featuresCol:
Type: String.
Meaning: features column name.
impurity:
Type: String.
Meaning: criterion used for information gain calculation (case-insensitive).
labelCol:
Type: String.
Meaning: label column name.
lossType:
Type: String.
Meaning: loss function type.
maxBins:
Type: Int.
Meaning: maximum number of bins used for discretizing continuous features and for choosing how to split on features at each node.
maxDepth:
Type: Int.
Meaning: maximum depth of each tree (>= 0).
maxIter:
Type: Int.
Meaning: maximum number of iterations (>= 0).
minInfoGain:
Type: Double.
Meaning: minimum information gain required to split a node.
minInstancesPerNode:
Type: Int.
Meaning: minimum number of instances each child node must contain after a split.
predictionCol:
Type: String.
Meaning: prediction column name.
seed:
Type: Long.
Meaning: random seed.
subsamplingRate:
Type: Double.
Meaning: fraction of the training data used to learn each decision tree, in the range (0, 1].
stepSize:
Type: Double.
Meaning: step size (learning rate) that shrinks the contribution of each tree on every iteration.
Example:
In the example below, GBTRegressor is run for only a small number of iterations (maxIter = 10); in practice a larger value is usually needed.
Scala:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a GBT model.
val gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)
// Chain indexer and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, gbt))
// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Root Mean Squared Error (RMSE) on test data = " + rmse)
val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println("Learned regression GBT model:\n" + gbtModel.toDebugString)
Java:
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);
// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Train a GBT model.
GBTRegressor gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);
// Chain indexer and GBT in a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});
// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);
// Make predictions.
Dataset<Row> predictions = model.transform(testData);
// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);
// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
GBTRegressionModel gbtModel = (GBTRegressionModel)(model.stages()[1]);
System.out.println("Learned regression GBT model:\n" + gbtModel.toDebugString());
Python:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = \
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)
# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])
# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
gbtModel = model.stages[1]
print(gbtModel) # summary only