天天看点

Kafka Connect transform初体验

可以配置Kafka Connector transforms以进行一个轻量级的消息修改。transforms可以方面的修改数据以及事件路由。

接下来我们将介绍如何配置一个transform。

首先,我们要在任务中配置transform,无论source 还是sink都可以配置transform

{

    "name": "test-source",

    "config": {

        "connector.class": "io.debezium.connector.mysql.MySqlConnector",

        "database.hostname": "localhost",

        "database.port": "3306",

        "database.user": "root",

        "database.password": "root",

        "tombstones.on.delete": "false",

        "database.server.id": "1",

        "database.server.name": "test-server",

        "database.history.kafka.topic": "test-server-history",

        "database.history.kafka.bootstrap.servers": "localhost:9092",

        "include.schema.changes": "true",

        "database.serverTimezone": "Asia/Shanghai",

        "database.driver": "com.mysql.jdbc.Driver",

        "database.history.kafka.recovery.poll.interval.ms": "3000",

        "defaultFetchSize": "1000",

        "database.tinyInt1isBit": "false",

        "snapshot.locking.mode": "none",

        "decimal.handling.mode": "string",

        "transforms": "MakeMap,InsertSource",

        "transforms.MakeMap.type":"org.apache.kafka.connect.transforms.HoistField$Value",

        "transforms.MakeMap.field":"line",

        "transforms.InsertSource.type":"org.apache.kafka.connect.transforms.InsertField$Value",

        "transforms.InsertSource.static.field":"data_source",

        "transforms.InsertSource.static.value":"test-file-source"

    }

}

这是官方配置一个transforms的例子。主要配置有transforms,这个key下面主要配置有哪些transforms,在有多个transforms的情况下,transform执行的顺序是按照配置的顺序,在本文后面我们会贴出源码来向大家说明。

官方以及第三方提供了很多transforms,在满足我们需求的情况下可以直接使用,如果不能满足我们的需求我们将需要自己开发或者是二次开发transform。

我们来介绍下官方目前有哪些transforms:

1. Cast

2. ExtractField

3. Flatten

4. HoistField

5. InsertField

6. MaskField

7. RegexRouter

8. ReplaceField

9. TimestampConverter

10. SetSchemaMetadata

11. TimestampRouter

12. ValueToKey

confluent支持的transform有:

1. Cast 使用的是Kafka的包(org.apache.kafka.connect.transforms.Cast$Key 或者是 org.apache.kafka.connect.transforms.Cast$Value)

2. Drop(io.confluent.connect.transforms.Drop$Key 或者是 io.confluent.connect.transforms.Drop$Value)

3. ExtractField 使用的是Kafka的包 org.apache.kafka.connect.transforms.ExtractField$Value or org.apache.kafka.connect.transforms.ExtractField$Key

4. ExtractTopic (io.confluent.connect.transforms.ExtractTopic$Key 或者是 io.confluent.connect.transforms.ExtractTopic$Value)

5. Filter (io.confluent.connect.transforms.Filter$Key 或者是 io.confluent.connect.transforms.Filter$Value)

6. Flatten (org.apache.kafka.connect.transforms.Flatten$Key 或者是 org.apache.kafka.connect.transforms.Flatten$Value)

7. HoistField (org.apache.kafka.connect.transforms.HoistField$Key 或者是 org.apache.kafka.connect.transforms.HoistField$Value)

8. InsertField (org.apache.kafka.connect.transforms.InsertField$Key 或者是 org.apache.kafka.connect.transforms.InsertField$Value)

9. MaskField (org.apache.kafka.connect.transforms.MaskField$Key 或者是 org.apache.kafka.connect.transforms.MaskField$Value)

10. MessageTimeStampRouter (io.confluent.connect.transforms.MessageTimestampRouter)

11. RegexRouter 使用Kafka的包 org.apache.kafka.connect.transforms.RegexRouter

12. ReplaceField (org.apache.kafka.connect.transforms.ReplaceField$Key 或者是 org.apache.kafka.connect.transforms.ReplaceField$Value)

13. SetSchemaMetadata (org.apache.kafka.connect.transforms.SetSchemaMetadata$Key 或者是 org.apache.kafka.connect.transforms.SetSchemaMetadata$Value)

14. TimestampConverter (org.apache.kafka.connect.transforms.TimestampConverter$Key 或者是org.apache.kafka.connect.transforms.TimestampConverter$Value)

15. TimestampRouter (org.apache.kafka.connect.transforms.TimestampRouter)

16. TombstoneHandler (io.confluent.connect.transforms.TombstoneHandler)

17. ValueToKey (org.apache.kafka.connect.transforms.ValueToKey)

具体使用将在后期的文档中详细说明。

在这些transform不能满足我们需求的情况下,我们只能开发自己的transform了,

开发transform主要是实现这个接口:Transformation

实现的方法有:

1. configure 因为Transformation继承了Configurable类,这里从source、sink 配置的json中获取transform的配置信息

2. apply 主要处理数据的地方,例如类型转换、数据结果转换、数据值的转换都在这里处理

3. close 因为 Transformation 继承了Closeable类

4. config 这里主要配置transform相关的配置,例如我们上面提到的:

 "transforms.MakeMap.type":"org.apache.kafka.connect.transforms.HoistField$Value",

  "transforms.MakeMap.field":"line",

这里配置的值在config值能否在transform中能获取到可以在这个方法里做处理。

接下来我们将介绍一个transform的源码:

public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String OVERVIEW_DOC = "Update the record topic using the configured regular expression and replacement string."
            + "<p/>Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. "
            + "If the pattern matches the input topic, <code>java.util.regex.Matcher#replaceFirst()</code> is used with the replacement string to obtain the new topic.";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH,
                    "Regular expression to use for matching.")
            .define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
                    "Replacement string.");

    private interface ConfigName {
        String REGEX = "regex";
        String REPLACEMENT = "replacement";
    }

    private Pattern regex;
    private String replacement;

    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        regex = Pattern.compile(config.getString(ConfigName.REGEX));
        replacement = config.getString(ConfigName.REPLACEMENT);
    }

    @Override
    public R apply(R record) {
        final Matcher matcher = regex.matcher(record.topic());
        if (matcher.matches()) {
            final String topic = matcher.replaceFirst(replacement);
            return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
        }
        return record;
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

}      
从这个代码片段可以看出,这个transform只接受两个配置项的值:
public static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH,
                "Regular expression to use for matching.")
        .define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH,
                "Replacement string.");      
一个是ConfigName.REGEX(replacement), 另一个是ConfigName.REPLACEMENT(replacement)

从配置configure方法中可以看出根据我们配置的CONFIG_DEF获取其对于配置项的值

public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    regex = Pattern.compile(config.getString(ConfigName.REGEX));
    replacement = config.getString(ConfigName.REPLACEMENT);
}      

而在这个例子中close方法是空的,说明这边在调用close方法的时候没有需要执行的,例如不需要释放资源、 不需要关闭连接等。

接下来我们将说这个transform中的重点了:apply方法,其主要思路是获取这条数据(record)的topic,然后根据我们配置的regex去匹配,如果能匹配上,将对topic的值替换成我们配置的值,并重新生成新的数据(record);如果没有匹配上则返回其原始值。

@Override
public R apply(R record) {
    final Matcher matcher = regex.matcher(record.topic());
    if (matcher.matches()) {
        final String topic = matcher.replaceFirst(replacement);
        return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
    }
    return record;
}      

自此一个tranfrom的处理就完成了,会继续往下传,如果有多个transfrom将会一一执行。我们来看下transform在Kafka Connet运行的时候是如何执行的。

源码在 org.apache.kafka.connect.runtime.WorkerSourceTask#sendRecords 发送记录给下游的时候调用

在发送每条数据(record)的时候会调用:final SourceRecord record = transformationChain.apply(preTransformRecord);

而在apply方法的实现是:

public R apply(R record) {
    if (transformations.isEmpty()) return record;

    for (Transformation<R> transformation : transformations) {
        record = transformation.apply(record);
        if (record == null) break;
    }

    return record;
}      

代码简洁,如果我们没有配置transform直接原记录返回,如果有配置transform,会按照顺序,对数据(record)应用一次transform,然后返回结果。

transform配置项的获取源码:

public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
    final List<String> transformAliases = getList(TRANSFORMS_CONFIG);

    final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
    for (String alias : transformAliases) {
        final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
        final Transformation<R> transformation;
        try {
            transformation = getClass(prefix + "type").asSubclass(Transformation.class).newInstance();
        } catch (Exception e) {
            throw new ConnectException(e);
        }
        transformation.configure(originalsWithPrefix(prefix));
        transformations.add(transformation);
    }

    return transformations;
}      
...
public static final String TRANSFORMS_CONFIG = "transforms";       

Okay, 到这里我们我们介绍完了 transform的定义、使用、如何开发一个transform以及transform在Kafka Connect中是如何执行的。

后面我们将介绍Kafka Connect source connector 以及如何开发一个source connector。敬请期待,如果有问题可以和我留言哦~

继续阅读