Feature Engineering 特征工程
数据与特征工程决定了模型的上限,改进算法只不过是逼近这个上限而已。
特征工程是机器学习中非常重要的一部分,它是将原始数据转换为模型可理解的形式。一般来说,特征工程包括特征构建、特征提取和特征选择三个部分1其中,特征构建是指从原始数据中构建新的特征,例如,将时间戳转换为小时、星期几等等。特征提取是指从原始数据中提取有用的特征,例如,从文本中提取关键词。而特征选择则是指从所有的特征中选择最有用的一些特征,以便于训练模型。
特征工程是机器学习中最重要的步骤之一,需要耗费大量时间。目的是准备一个最适合机器学习算法的输入数据集,并增强机器学习模型的性能。
Flink ML 2.2版本中大幅增加了这方面的算法支持,总共有33个特征工程方面的算法,涵盖了各个方面的内容。
1、Binarizer
二值化
顾名思义,Binarizer在机器学习中是一种将连续特征值离散化的方法,通过设置阈值将数据二值化(将特征值设置为0或1)。
Binarizer算法有许多应用,其中一些应用包括将连续的特征值二值化,如在图像处理中将连续的像素值数据离散化为黑白像素值数据。 另一个应用是将多类标签转换为二进制标签的过程,通过将每个类别转换为一个二进制向量,其中只有一个元素为1,其余元素均为0.
Binarizer可以处理DenseVector、SparseVector或数值类型的连续特征。
输入
参数名称 | 类型 | 默认值 | 描述 |
inputCols | Number/Vector | null | 需要进行二值化的Number/Vectors |
输出
参数名称 | 类型 | 默认值 | 描述 |
outputCols | Number/Vector | null | 二值化的Number/Vectors |
参数
名称 | 默认值 | 类型 | 是否必填 | 描述 |
inputCols | null | String[] | yes | 输入列名称 |
outputCols | null | String[] | yes | 输出列名称 |
thresholds | null | Double[] | yes | 用于将连续特征二值化的阈值 |
示例代码:
import org.apache.flink.ml.feature.binarizer.Binarizer;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
/** Simple program that creates a Binarizer instance and uses it for feature engineering. */
public class BinarizerExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input data.
DataStream<Row> inputStream =
env.fromElements(
Row.of(
1,
Vectors.dense(1, 2),
Vectors.sparse(
17, new int[] {0, 3, 9}, new double[] {1.0, 2.0, 7.0})),
Row.of(
2,
Vectors.dense(2, 1),
Vectors.sparse(
17, new int[] {0, 2, 14}, new double[] {5.0, 4.0, 1.0})),
Row.of(
3,
Vectors.dense(5, 18),
Vectors.sparse(
17, new int[] {0, 11, 12}, new double[] {2.0, 4.0, 4.0})));
Table inputTable = tEnv.fromDataStream(inputStream).as("f0", "f1", "f2");
// Creates a Binarizer object and initializes its parameters.
Binarizer binarizer =
new Binarizer()
.setInputCols("f0", "f1", "f2")
.setOutputCols("of0", "of1", "of2")
.setThresholds(0.0, 0.0, 0.0);
// Transforms input data.
Table outputTable = binarizer.transform(inputTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
Object[] inputValues = new Object[binarizer.getInputCols().length];
Object[] outputValues = new Object[binarizer.getInputCols().length];
for (int i = 0; i < inputValues.length; i++) {
inputValues[i] = row.getField(binarizer.getInputCols()[i]);
outputValues[i] = row.getField(binarizer.getOutputCols()[i]);
}
System.out.printf(
"Input Values: %s\tOutput Values: %s\n",
Arrays.toString(inputValues), Arrays.toString(outputValues));
}
}
}
2、Bucketizer
Bucketizer中文意思是“数据分桶”,是来源于 Spark MLlib 中的算法,用于将连续的数值型特征转换为离散型特征。例如,将人分为年龄段分为老人、中年人、青年人、儿童,将温度分为高、中、低等几个档次等等。
输入
参数名称 | 类型 | 默认值 | 描述 |
inputCols | Number | null | 要进行数据分桶的连续特征值 |
输出
参数名称 | 类型 | 默认值 | 描述 |
outputCols | Double | null | 离散化特征 |
参数
名称 | 默认值 | 类型 | 是否必填 | 描述 |
inputCols | null | String[] | yes | 输入列名称 |
outputCols | null | String[] | yes | 输出列名称 |
handleInvalid | "error" | Double | no | 处理无效条目的策略。支持的值:‘error’,‘skip’,‘keep’。 |
splitsArray | null | Double[] | yes | 将连续特征映射到数据桶中的分割点数组 |
示例代码:
import org.apache.flink.ml.common.param.HasHandleInvalid;
import org.apache.flink.ml.feature.bucketizer.Bucketizer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
/** Simple program that creates a Bucketizer instance and uses it for feature engineering. */
public class BucketizerExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input data.
DataStream<Row> inputStream = env.fromElements(Row.of(-0.5, 0.0, 1.0, 0.0));
Table inputTable = tEnv.fromDataStream(inputStream).as("f1", "f2", "f3", "f4");
// Creates a Bucketizer object and initializes its parameters.
Double[][] splitsArray =
new Double[][] {
new Double[] {-0.5, 0.0, 0.5},
new Double[] {-1.0, 0.0, 2.0},
new Double[] {Double.NEGATIVE_INFINITY, 10.0, Double.POSITIVE_INFINITY},
new Double[] {Double.NEGATIVE_INFINITY, 1.5, Double.POSITIVE_INFINITY}
};
Bucketizer bucketizer =
new Bucketizer()
.setInputCols("f1", "f2", "f3", "f4")
.setOutputCols("o1", "o2", "o3", "o4")
.setSplitsArray(splitsArray)
.setHandleInvalid(HasHandleInvalid.SKIP_INVALID);
// Uses the Bucketizer object for feature transformations.
Table outputTable = bucketizer.transform(inputTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
double[] inputValues = new double[bucketizer.getInputCols().length];
double[] outputValues = new double[bucketizer.getInputCols().length];
for (int i = 0; i < inputValues.length; i++) {
inputValues[i] = (double) row.getField(bucketizer.getInputCols()[i]);
outputValues[i] = (double) row.getField(bucketizer.getOutputCols()[i]);
}
System.out.printf(
"Input Values: %s\tOutput Values: %s\n",
Arrays.toString(inputValues), Arrays.toString(outputValues));
}
}
}
3、CountVectorizer(计数向量化器)
CountVectorizer是属于常见的特征数值计算类,是一个文本特征提取方法。对于每一个训练文本,它只考虑每种词汇在该训练文本中出现的频率。
CountVectorizer可以将文本转换为向量,以便进行机器学习和自然语言处理。它可以用于提取文本中的特征,例如单词计数,以便进行分类和聚类。
输入
参数名称 | 类型 | 默认值 | 描述 |
inputCols | String[] | "input" | 字符串数组 |
输出
参数名称 | 类型 | 默认值 | 描述 |
outputCols | SparseVector | "output" | 标记计数的向量 |
参数
名称 | 默认值 | 类型 | 是否必填 | 描述 |
inputCols | "input" | String | no | 输入列名称 |
outputCols | "output" | String | no | 输出列名称 |
minTF | 1.0 | Double | no | 过滤以忽略文档中的罕见单词。对于每个文档,忽略出现次数/计数小于给定阈值的术语。如果这是一个大于等于1的整数,则指定了计数(术语必须在文档中出现的次数);如果这是[0,1)中的双倍,则指定了分数(文档的标记计数) |
binary | false | Boolean | no | 二进制切换以控制输出向量值。如果为True,则将所有非零计数(应用minTF过滤器后)设置为1.0。 |
vocabularySize | 2^18 | Interger | no | 词汇表的最大值大小。CountVectorizer将构建一个词汇表,该词汇表仅考虑按语料库中的术语频率排序的前vocabulary size个术语。 |
minDF | 1.0 | Double | no | 指定术语必须出现在不同文档中的最小数量,才能包含在词汇表中。如果这是大于等于1的整数,则指定术语必须出现在的文档数;如果这是[0,1)中的双倍,则指定文档的分数。 |
maxDF | 2^63 - 1 | Double | no | 指定术语必须出现在不同文档中的最大数量,才能包含在词汇表中。出现超过阈值的术语将被忽略。如果这是大于等于1的整数,则指定术语可能出现在的文档数的最大值;如果这是[0,1)中的双倍,则指定术语可能出现在的文档分数的最大值。 |
示例代码:
import org.apache.flink.ml.feature.countvectorizer.CountVectorizer;
import org.apache.flink.ml.feature.countvectorizer.CountVectorizerModel;
import org.apache.flink.ml.linalg.SparseVector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
/**
* Simple program that trains a {@link CountVectorizer} model and uses it for feature engineering.
*/
public class CountVectorizerExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input training and prediction data.
DataStream<Row> dataStream =
env.fromElements(
Row.of((Object) new String[] {"a", "c", "b", "c"}),
Row.of((Object) new String[] {"c", "d", "e"}),
Row.of((Object) new String[] {"a", "b", "c"}),
Row.of((Object) new String[] {"e", "f"}),
Row.of((Object) new String[] {"a", "c", "a"}));
Table inputTable = tEnv.fromDataStream(dataStream).as("input");
// Creates an CountVectorizer object and initialize its parameters
CountVectorizer countVectorizer = new CountVectorizer();
// Trains the CountVectorizer model
CountVectorizerModel model = countVectorizer.fit(inputTable);
// Uses the CountVectorizer model for predictions.
Table outputTable = model.transform(inputTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
String[] inputValue = (String[]) row.getField(countVectorizer.getInputCol());
SparseVector outputValue = (SparseVector) row.getField(countVectorizer.getOutputCol());
System.out.printf(
"Input Value: %-15s \tOutput Value: %s\n",
Arrays.toString(inputValue), outputValue.toString());
}
}
}
4、DCT
DCT算法是一种特征工程算法,它可以将1D实向量的离散余弦变换应用于特征提取。在图像处理中,DCT算法可以用于压缩图像,而在音频处理中,DCT算法可以用于压缩音频。在机器学习中,DCT算法可以用于特征提取和数据压缩。
DCT算法的应用场景包括信号处理、数据压缩、图像处理和音频处理,在信号处理中,DCT算法可以用于降噪和滤波。在数据压缩中,可以用于压缩图像和音频。在图像处理中,可以用于图像压缩、图像增强和图像分析。 在音频处理中,可以用于音频压缩、音频增强和音频分析。
输入
参数名称 | 类型 | 默认值 | 描述 |
inputCols | Vector | "input" | Input vector to be cosine transformed. |
输出
参数名称 | 类型 | 默认值 | 描述 |
outputCols | Vector | "output" | Cosine transformed output vector. |
参数
名称 | 默认值 | 类型 | 是否必填 | 描述 |
inputCols | null | String[] | no | 输入列名称 |
outputCols | null | String[] | no | 输出列名称 |
inverse | false | Boolean | no | 这是一个关于DCT的参数,它指示是否执行反向DCT(true)或正向DCT(false) |
示例代码:
import org.apache.flink.ml.feature.dct.DCT;
import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
import java.util.List;
/** Simple program that creates a DCT instance and uses it for feature engineering. */
public class DCTExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input data.
List<Vector> inputData =
Arrays.asList(
Vectors.dense(1.0, 1.0, 1.0, 1.0), Vectors.dense(1.0, 0.0, -1.0, 0.0));
Table inputTable = tEnv.fromDataStream(env.fromCollection(inputData)).as("input");
// Creates a DCT object and initializes its parameters.
DCT dct = new DCT();
// Uses the DCT object for feature transformations.
Table outputTable = dct.transform(inputTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
Vector inputValue = row.getFieldAs(dct.getInputCol());
Vector outputValue = row.getFieldAs(dct.getOutputCol());
System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);
}
}
}
5、ElementwiseProduct
ElementwiseProduct是来源于Spark MLlib中的算法,它执行每个输入向量与提供的“权重”向量的Hadamard积(Hadamard product是矩阵的一种运算,它是将两个矩阵中对应位置的元素相乘得到一个新的矩阵)的输出。ElementwiseProduct函数使用Hadamard乘积将每个输入向量与给定缩放向量相乘。如果输入向量的大小不等于缩放向量的大小,则转换器将抛出IllegalArgumentException异常。
ElementwiseProduct 在机器学习中的应用场景比较广泛,主要用于特征交叉,将两个特征向量进行点乘得到新的特征向量。
例如,在推荐系统中,可以使用 ElementwiseProduct 将用户的历史行为和物品的属性进行交叉,得到新的特征向量,从而提高推荐系统的准确性。
输入
参数名称 | 类型 | 默认值 | 描述 |
inputCols | Vector | "input" | 需要进行缩放的特征 |
输出
参数名称 | 类型 | 默认值 | 描述 |
outputCols | Vector | "output" | 缩放过后的特征 |
参数
名称 | 默认值 | 类型 | 是否必填 | 描述 |
inputCols | null | String | no | 输入列名称 |
outputCols | null | String | no | 输出列名称 |
scalingVec | null | String | yes | 缩放向量 |
示例代码:
import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
/**
* Simple program that creates an ElementwiseProduct instance and uses it for feature engineering.
*/
public class ElementwiseProductExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input data.
DataStream<Row> inputStream =
env.fromElements(
Row.of(0, Vectors.dense(1.1, 3.2)), Row.of(1, Vectors.dense(2.1, 3.1)));
Table inputTable = tEnv.fromDataStream(inputStream).as("id", "vec");
// Creates an ElementwiseProduct object and initializes its parameters.
ElementwiseProduct elementwiseProduct =
new ElementwiseProduct()
.setInputCol("vec")
.setOutputCol("outputVec")
.setScalingVec(Vectors.dense(1.1, 1.1));
// Transforms input data.
Table outputTable = elementwiseProduct.transform(inputTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
Vector inputValue = (Vector) row.getField(elementwiseProduct.getInputCol());
Vector outputValue = (Vector) row.getField(elementwiseProduct.getOutputCol());
System.out.printf("Input Value: %s \tOutput Value: %s\n", inputValue, outputValue);
}
}
}