天天看点

Flink ML 机器学习算法介绍(二)-特征工程

Flink ML 机器学习算法介绍(二)-特征工程

Feature Engineering 特征工程

数据与特征工程决定了模型的上限,改进算法只不过是逼近这个上限而已。

特征工程是机器学习中非常重要的一部分,它是将原始数据转换为模型可理解的形式。一般来说,特征工程包括特征构建、特征提取和特征选择三个部分1其中,特征构建是指从原始数据中构建新的特征,例如,将时间戳转换为小时、星期几等等。特征提取是指从原始数据中提取有用的特征,例如,从文本中提取关键词。而特征选择则是指从所有的特征中选择最有用的一些特征,以便于训练模型。

特征工程是机器学习中最重要的步骤之一,需要耗费大量时间。目的是准备一个最适合机器学习算法的输入数据集,并增强机器学习模型的性能。

Flink ML 2.2版本中大幅增加了这方面的算法支持,总共有33个特征工程方面的算法,涵盖了各个方面的内容。

1、Binarizer

二值化
Flink ML 机器学习算法介绍(二)-特征工程

顾名思义,Binarizer在机器学习中是一种将连续特征值离散化的方法,通过设置阈值将数据二值化(将特征值设置为0或1)。

Binarizer算法有许多应用,其中一些应用包括将连续的特征值二值化,如在图像处理中将连续的像素值数据离散化为黑白像素值数据。 另一个应用是将多类标签转换为二进制标签的过程,通过将每个类别转换为一个二进制向量,其中只有一个元素为1,其余元素均为0.

Binarizer可以处理DenseVector、SparseVector或数值类型的连续特征。

输入

参数名称 类型 默认值 描述
inputCols Number/Vector null 需要进行二值化的Number/Vectors

输出

参数名称 类型 默认值 描述
outputCols Number/Vector null 二值化的Number/Vectors

参数

名称 默认值 类型 是否必填 描述
inputCols null String[] yes 输入列名称
outputCols null String[] yes 输出列名称
thresholds null Double[] yes 用于将连续特征二值化的阈值

示例代码:

import org.apache.flink.ml.feature.binarizer.Binarizer;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


import java.util.Arrays;


/** Simple program that creates a Binarizer instance and uses it for feature engineering. */
public class BinarizerExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // Generates input data.
        DataStream<Row> inputStream =
                env.fromElements(
                        Row.of(
                                1,
                                Vectors.dense(1, 2),
                                Vectors.sparse(
                                        17, new int[] {0, 3, 9}, new double[] {1.0, 2.0, 7.0})),
                        Row.of(
                                2,
                                Vectors.dense(2, 1),
                                Vectors.sparse(
                                        17, new int[] {0, 2, 14}, new double[] {5.0, 4.0, 1.0})),
                        Row.of(
                                3,
                                Vectors.dense(5, 18),
                                Vectors.sparse(
                                        17, new int[] {0, 11, 12}, new double[] {2.0, 4.0, 4.0})));


        Table inputTable = tEnv.fromDataStream(inputStream).as("f0", "f1", "f2");


        // Creates a Binarizer object and initializes its parameters.
        Binarizer binarizer =
                new Binarizer()
                        .setInputCols("f0", "f1", "f2")
                        .setOutputCols("of0", "of1", "of2")
                        .setThresholds(0.0, 0.0, 0.0);


        // Transforms input data.
        Table outputTable = binarizer.transform(inputTable)[0];


        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();


            Object[] inputValues = new Object[binarizer.getInputCols().length];
            Object[] outputValues = new Object[binarizer.getInputCols().length];
            for (int i = 0; i < inputValues.length; i++) {
                inputValues[i] = row.getField(binarizer.getInputCols()[i]);
                outputValues[i] = row.getField(binarizer.getOutputCols()[i]);
            }


            System.out.printf(
                    "Input Values: %s\tOutput Values: %s\n",
                    Arrays.toString(inputValues), Arrays.toString(outputValues));
        }
    }
}           

2、Bucketizer

Flink ML 机器学习算法介绍(二)-特征工程

Bucketizer中文意思是“数据分桶”,是来源于 Spark MLlib 中的算法,用于将连续的数值型特征转换为离散型特征。例如,将人分为年龄段分为老人、中年人、青年人、儿童,将温度分为高、中、低等几个档次等等。

输入

参数名称 类型 默认值 描述
inputCols Number null 要进行数据分桶的连续特征值

输出

参数名称 类型 默认值 描述
outputCols Double null 离散化特征

参数

名称 默认值 类型 是否必填 描述
inputCols null String[] yes 输入列名称
outputCols null String[] yes 输出列名称
handleInvalid "error" Double no 处理无效条目的策略。支持的值:‘error’,‘skip’,‘keep’。
splitsArray null Double[] yes 将连续特征映射到数据桶中的分割点数组

示例代码:

import org.apache.flink.ml.common.param.HasHandleInvalid;
import org.apache.flink.ml.feature.bucketizer.Bucketizer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


import java.util.Arrays;


/** Simple program that creates a Bucketizer instance and uses it for feature engineering. */
public class BucketizerExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // Generates input data.
        DataStream<Row> inputStream = env.fromElements(Row.of(-0.5, 0.0, 1.0, 0.0));
        Table inputTable = tEnv.fromDataStream(inputStream).as("f1", "f2", "f3", "f4");


        // Creates a Bucketizer object and initializes its parameters.
        Double[][] splitsArray =
                new Double[][] {
                    new Double[] {-0.5, 0.0, 0.5},
                    new Double[] {-1.0, 0.0, 2.0},
                    new Double[] {Double.NEGATIVE_INFINITY, 10.0, Double.POSITIVE_INFINITY},
                    new Double[] {Double.NEGATIVE_INFINITY, 1.5, Double.POSITIVE_INFINITY}
                };
        Bucketizer bucketizer =
                new Bucketizer()
                        .setInputCols("f1", "f2", "f3", "f4")
                        .setOutputCols("o1", "o2", "o3", "o4")
                        .setSplitsArray(splitsArray)
                        .setHandleInvalid(HasHandleInvalid.SKIP_INVALID);


        // Uses the Bucketizer object for feature transformations.
        Table outputTable = bucketizer.transform(inputTable)[0];


        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();


            double[] inputValues = new double[bucketizer.getInputCols().length];
            double[] outputValues = new double[bucketizer.getInputCols().length];
            for (int i = 0; i < inputValues.length; i++) {
                inputValues[i] = (double) row.getField(bucketizer.getInputCols()[i]);
                outputValues[i] = (double) row.getField(bucketizer.getOutputCols()[i]);
            }


            System.out.printf(
                    "Input Values: %s\tOutput Values: %s\n",
                    Arrays.toString(inputValues), Arrays.toString(outputValues));
        }
    }
}

           

3、CountVectorizer(计数向量化器)

CountVectorizer是属于常见的特征数值计算类,是一个文本特征提取方法。对于每一个训练文本,它只考虑每种词汇在该训练文本中出现的频率。

CountVectorizer可以将文本转换为向量,以便进行机器学习和自然语言处理。它可以用于提取文本中的特征,例如单词计数,以便进行分类和聚类。

输入

参数名称 类型 默认值 描述
inputCols String[] "input" 字符串数组

输出

参数名称 类型 默认值 描述
outputCols SparseVector "output" 标记计数的向量

参数

名称 默认值 类型 是否必填 描述
inputCols "input" String no 输入列名称
outputCols "output" String no 输出列名称
minTF 1.0 Double no 过滤以忽略文档中的罕见单词。对于每个文档,忽略出现次数/计数小于给定阈值的术语。如果这是一个大于等于1的整数,则指定了计数(术语必须在文档中出现的次数);如果这是[0,1)中的双倍,则指定了分数(文档的标记计数)
binary false Boolean no 二进制切换以控制输出向量值。如果为True,则将所有非零计数(应用minTF过滤器后)设置为1.0。
vocabularySize 2^18 Interger no 词汇表的最大值大小。CountVectorizer将构建一个词汇表,该词汇表仅考虑按语料库中的术语频率排序的前vocabulary size个术语。
minDF 1.0 Double no 指定术语必须出现在不同文档中的最小数量,才能包含在词汇表中。如果这是大于等于1的整数,则指定术语必须出现在的文档数;如果这是[0,1)中的双倍,则指定文档的分数。
maxDF 2^63 - 1 Double no 指定术语必须出现在不同文档中的最大数量,才能包含在词汇表中。出现超过阈值的术语将被忽略。如果这是大于等于1的整数,则指定术语可能出现在的文档数的最大值;如果这是[0,1)中的双倍,则指定术语可能出现在的文档分数的最大值。

示例代码:

import org.apache.flink.ml.feature.countvectorizer.CountVectorizer;
import org.apache.flink.ml.feature.countvectorizer.CountVectorizerModel;
import org.apache.flink.ml.linalg.SparseVector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


import java.util.Arrays;


/**
 * Simple program that trains a {@link CountVectorizer} model and uses it for feature engineering.
 */
public class CountVectorizerExample {


    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // Generates input training and prediction data.
        DataStream<Row> dataStream =
                env.fromElements(
                        Row.of((Object) new String[] {"a", "c", "b", "c"}),
                        Row.of((Object) new String[] {"c", "d", "e"}),
                        Row.of((Object) new String[] {"a", "b", "c"}),
                        Row.of((Object) new String[] {"e", "f"}),
                        Row.of((Object) new String[] {"a", "c", "a"}));
        Table inputTable = tEnv.fromDataStream(dataStream).as("input");


        // Creates an CountVectorizer object and initialize its parameters
        CountVectorizer countVectorizer = new CountVectorizer();


        // Trains the CountVectorizer model
        CountVectorizerModel model = countVectorizer.fit(inputTable);


        // Uses the CountVectorizer model for predictions.
        Table outputTable = model.transform(inputTable)[0];


        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();
            String[] inputValue = (String[]) row.getField(countVectorizer.getInputCol());
            SparseVector outputValue = (SparseVector) row.getField(countVectorizer.getOutputCol());
            System.out.printf(
                    "Input Value: %-15s \tOutput Value: %s\n",
                    Arrays.toString(inputValue), outputValue.toString());
        }
    }
}           

4、DCT

DCT算法是一种特征工程算法,它可以将1D实向量的离散余弦变换应用于特征提取。在图像处理中,DCT算法可以用于压缩图像,而在音频处理中,DCT算法可以用于压缩音频。在机器学习中,DCT算法可以用于特征提取和数据压缩。

DCT算法的应用场景包括信号处理、数据压缩、图像处理和音频处理,在信号处理中,DCT算法可以用于降噪和滤波。在数据压缩中,可以用于压缩图像和音频。在图像处理中,可以用于图像压缩、图像增强和图像分析。 在音频处理中,可以用于音频压缩、音频增强和音频分析。

输入

参数名称 类型 默认值 描述
inputCols Vector "input" Input vector to be cosine transformed.

输出

参数名称 类型 默认值 描述
outputCols Vector "output" Cosine transformed output vector.

参数

名称 默认值 类型 是否必填 描述
inputCols null String[] no 输入列名称
outputCols null String[] no 输出列名称
inverse false Boolean no 这是一个关于DCT的参数,它指示是否执行反向DCT(true)或正向DCT(false)

示例代码:

import org.apache.flink.ml.feature.dct.DCT;
import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


import java.util.Arrays;
import java.util.List;


/** Simple program that creates a DCT instance and uses it for feature engineering. */
public class DCTExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // Generates input data.
        List<Vector> inputData =
                Arrays.asList(
                        Vectors.dense(1.0, 1.0, 1.0, 1.0), Vectors.dense(1.0, 0.0, -1.0, 0.0));
        Table inputTable = tEnv.fromDataStream(env.fromCollection(inputData)).as("input");


        // Creates a DCT object and initializes its parameters.
        DCT dct = new DCT();


        // Uses the DCT object for feature transformations.
        Table outputTable = dct.transform(inputTable)[0];


        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();


            Vector inputValue = row.getFieldAs(dct.getInputCol());
            Vector outputValue = row.getFieldAs(dct.getOutputCol());


            System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);
        }
    }
}           

5、ElementwiseProduct

ElementwiseProduct是来源于Spark MLlib中的算法,它执行每个输入向量与提供的“权重”向量的Hadamard积(Hadamard product是矩阵的一种运算,它是将两个矩阵中对应位置的元素相乘得到一个新的矩阵)的输出。ElementwiseProduct函数使用Hadamard乘积将每个输入向量与给定缩放向量相乘。如果输入向量的大小不等于缩放向量的大小,则转换器将抛出IllegalArgumentException异常。

ElementwiseProduct 在机器学习中的应用场景比较广泛,主要用于特征交叉,将两个特征向量进行点乘得到新的特征向量。

例如,在推荐系统中,可以使用 ElementwiseProduct 将用户的历史行为和物品的属性进行交叉,得到新的特征向量,从而提高推荐系统的准确性。

输入

参数名称 类型 默认值 描述
inputCols Vector "input" 需要进行缩放的特征

输出

参数名称 类型 默认值 描述
outputCols Vector "output" 缩放过后的特征

参数

名称 默认值 类型 是否必填 描述
inputCols null String no 输入列名称
outputCols null String no 输出列名称
scalingVec null String yes 缩放向量

示例代码:

import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


/**
 * Simple program that creates an ElementwiseProduct instance and uses it for feature engineering.
 */
public class ElementwiseProductExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // Generates input data.
        DataStream<Row> inputStream =
                env.fromElements(
                        Row.of(0, Vectors.dense(1.1, 3.2)), Row.of(1, Vectors.dense(2.1, 3.1)));


        Table inputTable = tEnv.fromDataStream(inputStream).as("id", "vec");


        // Creates an ElementwiseProduct object and initializes its parameters.
        ElementwiseProduct elementwiseProduct =
                new ElementwiseProduct()
                        .setInputCol("vec")
                        .setOutputCol("outputVec")
                        .setScalingVec(Vectors.dense(1.1, 1.1));


        // Transforms input data.
        Table outputTable = elementwiseProduct.transform(inputTable)[0];


        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();
            Vector inputValue = (Vector) row.getField(elementwiseProduct.getInputCol());
            Vector outputValue = (Vector) row.getField(elementwiseProduct.getOutputCol());
            System.out.printf("Input Value: %s \tOutput Value: %s\n", inputValue, outputValue);
        }
    }
}           

继续阅读