可以配置Kafka Connector transforms以进行一个轻量级的消息修改。transforms可以方面的修改数据以及事件路由。
接下来我们将介绍如何配置一个transform。
首先,我们要在任务中配置transform,无论source 还是sink都可以配置transform
{
"name": "test-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "root",
"tombstones.on.delete": "false",
"database.server.id": "1",
"database.server.name": "test-server",
"database.history.kafka.topic": "test-server-history",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"include.schema.changes": "true",
"database.serverTimezone": "Asia/Shanghai",
"database.driver": "com.mysql.jdbc.Driver",
"database.history.kafka.recovery.poll.interval.ms": "3000",
"defaultFetchSize": "1000",
"database.tinyInt1isBit": "false",
"snapshot.locking.mode": "none",
"decimal.handling.mode": "string",
"transforms": "MakeMap,InsertSource",
"transforms.MakeMap.type":"org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field":"line",
"transforms.InsertSource.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSource.static.field":"data_source",
"transforms.InsertSource.static.value":"test-file-source"
}
}
这是官方配置一个transforms的例子。主要配置有transforms,这个key下面主要配置有哪些transforms,在有多个transforms的情况下,transform执行的顺序是按照配置的顺序,在本文后面我们会贴出源码来向大家说明。
官方以及第三方提供了很多transforms,在满足我们需求的情况下可以直接使用,如果不能满足我们的需求我们将需要自己开发或者是二次开发transform。
我们来介绍下官方目前有哪些transforms:
1. Cast
2. ExtractField
3. Flatten
4. HoistField
5. InsertField
6. MaskField
7. RegexRouter
8. ReplaceField
9. TimestampConverter
10. SetSchemaMetadata
11. TimestampRouter
12. ValueToKey
confluent支持的transform有:
1. Cast 使用的是Kafka的包(org.apache.kafka.connect.transforms.Cast$Key 或者是 org.apache.kafka.connect.transforms.Cast$Value)
2. Drop(io.confluent.connect.transforms.Drop$Key 或者是 io.confluent.connect.transforms.Drop$Value)
3. ExtractField 使用的是Kafka的包 org.apache.kafka.connect.transforms.ExtractField$Value or org.apache.kafka.connect.transforms.ExtractField$Key
4. ExtractTopic (io.confluent.connect.transforms.ExtractTopic$Key 或者是 io.confluent.connect.transforms.ExtractTopic$Value)
5. Filter (io.confluent.connect.transforms.Filter$Key 或者是 io.confluent.connect.transforms.Filter$Value)
6. Flatten (org.apache.kafka.connect.transforms.Flatten$Key 或者是 org.apache.kafka.connect.transforms.Flatten$Value)
7. HoistField (org.apache.kafka.connect.transforms.HoistField$Key 或者是 org.apache.kafka.connect.transforms.HoistField$Value)
8. InsertField (org.apache.kafka.connect.transforms.InsertField$Key 或者是 org.apache.kafka.connect.transforms.InsertField$Value)
9. MaskField (org.apache.kafka.connect.transforms.MaskField$Key 或者是 org.apache.kafka.connect.transforms.MaskField$Value)
10. MessageTimeStampRouter (io.confluent.connect.transforms.MessageTimestampRouter)
11. RegexRouter 使用Kafka的包 org.apache.kafka.connect.transforms.RegexRouter
12. ReplaceField (org.apache.kafka.connect.transforms.ReplaceField$Key 或者是 org.apache.kafka.connect.transforms.ReplaceField$Value)
13. SetSchemaMetadata (org.apache.kafka.connect.transforms.SetSchemaMetadata$Key 或者是 org.apache.kafka.connect.transforms.SetSchemaMetadata$Value)
14. TimestampConverter (org.apache.kafka.connect.transforms.TimestampConverter$Key 或者是org.apache.kafka.connect.transforms.TimestampConverter$Value)
15. TimestampRouter (org.apache.kafka.connect.transforms.TimestampRouter)
16. TombstoneHandler (io.confluent.connect.transforms.TombstoneHandler)
17. ValueToKey (org.apache.kafka.connect.transforms.ValueToKey)
具体使用将在后期的文档中详细说明。
在这些transform不能满足我们需求的情况下,我们只能开发自己的transform了,
开发transform主要是实现这个接口:Transformation
实现的方法有:
1. configure 因为Transformation继承了Configurable类,这里从source、sink 配置的json中获取transform的配置信息
2. apply 主要处理数据的地方,例如类型转换、数据结果转换、数据值的转换都在这里处理
3. close 因为 Transformation 继承了Closeable类
4. config 这里主要配置transform相关的配置,例如我们上面提到的:
"transforms.MakeMap.type":"org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.MakeMap.field":"line",
这里配置的值在config值能否在transform中能获取到可以在这个方法里做处理。
接下来我们将介绍一个transform的源码:
public class RegexRouter<R extends ConnectRecord<R>> implements Transformation<R> { public static final String OVERVIEW_DOC = "Update the record topic using the configured regular expression and replacement string." + "<p/>Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " + "If the pattern matches the input topic, <code>java.util.regex.Matcher#replaceFirst()</code> is used with the replacement string to obtain the new topic."; public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH, "Regular expression to use for matching.") .define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, "Replacement string."); private interface ConfigName { String REGEX = "regex"; String REPLACEMENT = "replacement"; } private Pattern regex; private String replacement; @Override public void configure(Map<String, ?> props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); regex = Pattern.compile(config.getString(ConfigName.REGEX)); replacement = config.getString(ConfigName.REPLACEMENT); } @Override public R apply(R record) { final Matcher matcher = regex.matcher(record.topic()); if (matcher.matches()) { final String topic = matcher.replaceFirst(replacement); return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp()); } return record; } @Override public void close() { } @Override public ConfigDef config() { return CONFIG_DEF; } }
从这个代码片段可以看出,这个transform只接受两个配置项的值:一个是ConfigName.REGEX(replacement), 另一个是ConfigName.REPLACEMENT(replacement)public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH, "Regular expression to use for matching.") .define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, "Replacement string.");
从配置configure方法中可以看出根据我们配置的CONFIG_DEF获取其对于配置项的值
public void configure(Map<String, ?> props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); regex = Pattern.compile(config.getString(ConfigName.REGEX)); replacement = config.getString(ConfigName.REPLACEMENT); }
而在这个例子中close方法是空的,说明这边在调用close方法的时候没有需要执行的,例如不需要释放资源、 不需要关闭连接等。
接下来我们将说这个transform中的重点了:apply方法,其主要思路是获取这条数据(record)的topic,然后根据我们配置的regex去匹配,如果能匹配上,将对topic的值替换成我们配置的值,并重新生成新的数据(record);如果没有匹配上则返回其原始值。
@Override public R apply(R record) { final Matcher matcher = regex.matcher(record.topic()); if (matcher.matches()) { final String topic = matcher.replaceFirst(replacement); return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp()); } return record; }
自此一个tranfrom的处理就完成了,会继续往下传,如果有多个transfrom将会一一执行。我们来看下transform在Kafka Connet运行的时候是如何执行的。
源码在 org.apache.kafka.connect.runtime.WorkerSourceTask#sendRecords 发送记录给下游的时候调用
在发送每条数据(record)的时候会调用:final SourceRecord record = transformationChain.apply(preTransformRecord);
而在apply方法的实现是:
public R apply(R record) { if (transformations.isEmpty()) return record; for (Transformation<R> transformation : transformations) { record = transformation.apply(record); if (record == null) break; } return record; }
代码简洁,如果我们没有配置transform直接原记录返回,如果有配置transform,会按照顺序,对数据(record)应用一次transform,然后返回结果。
transform配置项的获取源码:
...public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() { final List<String> transformAliases = getList(TRANSFORMS_CONFIG); final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size()); for (String alias : transformAliases) { final String prefix = TRANSFORMS_CONFIG + "." + alias + "."; final Transformation<R> transformation; try { transformation = getClass(prefix + "type").asSubclass(Transformation.class).newInstance(); } catch (Exception e) { throw new ConnectException(e); } transformation.configure(originalsWithPrefix(prefix)); transformations.add(transformation); } return transformations; }
public static final String TRANSFORMS_CONFIG = "transforms";
Okay, 到这里我们我们介绍完了 transform的定义、使用、如何开发一个transform以及transform在Kafka Connect中是如何执行的。
后面我们将介绍Kafka Connect source connector 以及如何开发一个source connector。敬请期待,如果有问题可以和我留言哦~