A Detailed Look at the Kafka Connect Startup Process in Distributed Mode
- Startup command and configuration
- The connect-distributed script
- The ConnectDistributed entry class
- Summary
Kafka Connect has two startup modes: standalone mode and distributed mode. Standalone mode is generally used for development and testing. Production deployments use distributed mode: Kafka Connect runs on multiple nodes that form a cluster, and the Connectors and Tasks are distributed across those machines. This article walks through the Kafka Connect startup process in distributed mode.
Startup Command and Configuration
Deploying Kafka Connect in distributed mode is straightforward: you can, for example, download and unpack the Confluent distribution manually. A typical worker configuration file (config.properties) looks like this:
# Hosts and ports of the Kafka cluster that Kafka Connect depends on; separate multiple entries with commas
bootstrap.servers=127.0.0.1:9092
# Workers in the same Kafka Connect group share one id, the same concept as a consumer group
group.id=my-group-id
# Serialization of record keys and values for source and sink connectors
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
# Schema Registry address; not required, but using Schema Registry is more convenient
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
# The three topics Kafka Connect uses internally
config.storage.topic=my-connect-configs
offset.storage.topic=my-connect-offsets
status.storage.topic=my-connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
# Serialization used when Kafka Connect writes its internal records to Kafka
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Path Kafka Connect scans for connector plugins
plugin.path=/home/admin/kafkaConnect/plugins
These parameters are usually all Kafka Connect needs. Then run the following command on one node:
connect-distributed config.properties
To deploy the remaining nodes of the same Kafka Connect group, log in to each node and run the same command. Once the workers are up, you can verify them through the REST API, as sketched below.
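For completeness, here is a small hedged sketch (not part of the original deployment steps, and assuming the default REST port 8083) that checks a worker over its REST API from Java; a plain curl against the same endpoints works just as well.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectHealthCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // GET / returns the worker version and Kafka cluster id;
        // GET /connectors lists the connectors currently deployed
        for (String path : new String[]{"/", "/connectors"}) {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8083" + path))
                    .GET()
                    .build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(path + " -> " + response.body());
        }
    }
}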
The connect-distributed Script
The distributed-mode startup command connect-distributed is a bash script:
if [ $# -lt 1 ];
then
    echo "USAGE: $0 [-daemon] connect-distributed.properties"
    exit 1
fi

# Directory containing connect-distributed
base_dir=$(dirname $0)

# java_base_dir points at share/java under the Confluent directory,
# which holds the jars used by the Confluent components and Kafka Connect
java_base_dir=$( cd -P "$base_dir/../share/java" && pwd )

# Add the jars under share/java/confluent-security/connect,
# share/java/kafka, share/java/confluent-common,
# share/java/kafka-serde-tools and share/java/monitoring-interceptors
# to the CLASSPATH
for library in "confluent-security/connect" "kafka" "confluent-common" "kafka-serde-tools" "monitoring-interceptors"; do
  dir="$java_base_dir/$library"
  if [ -d "$dir" ]; then
    classpath_prefix="$CLASSPATH:"
    if [ "x$CLASSPATH" = "x" ]; then
      classpath_prefix=""
    fi
    CLASSPATH="$classpath_prefix$dir/*"
  fi
done

# Configure log4j
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  # Default configuration file locations
  LOG4J_CONFIG_NORMAL_INSTALL="/etc/kafka/connect-log4j.properties"
  LOG4J_CONFIG_ZIP_INSTALL="$base_dir/../etc/kafka/connect-log4j.properties"
  if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_NORMAL_INSTALL}"
  elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_ZIP_INSTALL}"
  else # Fallback to normal default
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
  fi
fi
export KAFKA_LOG4J_OPTS

# Default heap options for Kafka Connect
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name connectDistributed'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

export CLASSPATH
# ConnectDistributed is the entry class
exec $(dirname $0)/kafka-run-class $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectDistributed "$@"
As this script shows, its main job is to add the Kafka Connect dependencies under the Confluent share/java directory to the CLASSPATH, set the log4j configuration file path and the default heap options, and finally invoke org.apache.kafka.connect.cli.ConnectDistributed to start Kafka Connect.
The ConnectDistributed Entry Class
Below is the main method of org.apache.kafka.connect.cli.ConnectDistributed:
public static void main(String[] args) {
    if (args.length < 1 || Arrays.asList(args).contains("--help")) {
        log.info("Usage: ConnectDistributed worker.properties");
        Exit.exit(1);
    }
    try {
        WorkerInfo initInfo = new WorkerInfo();
        initInfo.logAll();
        String workerPropsFile = args[0];
        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
        ConnectDistributed connectDistributed = new ConnectDistributed();
        Connect connect = connectDistributed.startConnect(workerProps);

        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
        connect.awaitStop();
    } catch (Throwable t) {
        log.error("Stopping due to error", t);
        Exit.exit(2);
    }
}
As this shows, the main method loads the Kafka Connect configuration from the properties file into the workerProps map, creates a ConnectDistributed instance, and calls its startConnect method to continue the startup.
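For illustration, here is a minimal sketch of what the Utils.loadProps plus Utils.propsToStringMap combination amounts to (the class and method names below are hypothetical stand-ins, not the Kafka utilities themselves): the worker file is parsed as java.util.Properties and flattened into a Map<String, String>.
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class WorkerPropsLoader {
    public static Map<String, String> load(String path) throws IOException {
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        // Properties is a Hashtable<Object, Object>; copy the entries out as strings
        Map<String, String> workerProps = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            workerProps.put(name, props.getProperty(name));
        }
        return workerProps;
    }
}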
public Connect startConnect(Map<String, String> workerProps) {
    // First scan the plugin path and load every plugin,
    // isolating each plugin behind its own class loader
    log.info("Scanning for plugin classes. This might take a moment ...");
    Plugins plugins = new Plugins(workerProps);
    plugins.compareAndSwapWithDelegatingLoader();

    DistributedConfig config = new DistributedConfig(workerProps);
    String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
    log.debug("Kafka cluster ID: {}", kafkaClusterId);

    /**
     * Create the Jetty server and its listener connectors (Jetty classes);
     * if admin.listeners is set, the corresponding connectors are created too
     **/
    RestServer rest = new RestServer(config);
    /**
     * Set the StatisticsHandler,
     * set the graceful-shutdown timeout,
     * and start the Jetty server
     **/
    rest.initializeServer();

    URI advertisedUrl = rest.advertisedUrl();
    // The workerId of this Kafka Connect worker, built from the configured host and port
    String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();

    // Create the admin client to be shared by all backing stores.
    Map<String, Object> adminProps = new HashMap<>(config.originals());
    ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
    SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);

    // Stores offsets
    KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
    offsetBackingStore.configure(config);

    ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
            config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
            config, ConnectorClientConfigOverridePolicy.class);

    // The Kafka Connect worker instance
    Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
    // Applies config.providers: secrets such as passwords can be stored elsewhere
    // and resolved through a provider at configuration time
    WorkerConfigTransformer configTransformer = worker.configTransformer();

    Converter internalValueConverter = worker.getInternalValueConverter();
    // Stores statuses
    StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
    statusBackingStore.configure(config);

    // Stores configs; this topic must have exactly one partition
    ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
            internalValueConverter,
            config,
            configTransformer,
            sharedAdmin);

    // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
    // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
    DistributedHerder herder = new DistributedHerder(config, time, worker,
            kafkaClusterId, statusBackingStore, configBackingStore,
            advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);

    // The Connect class ties all of Kafka Connect's components together
    // (herder, worker, storage, the REST interface) and manages their lifecycle
    final Connect connect = new Connect(herder, rest);
    // time and initStart are instance fields of ConnectDistributed in the full source,
    // captured when the instance was created
    log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
    try {
        /**
         * Calls herder.start:
         * --> for DistributedHerder, start submits the herder itself to a thread pool,
         * --> whose thread then runs DistributedHerder's run method.
         * Calls rest.initializeResources:
         * --> registers the REST endpoint handlers with the Jetty server
         **/
        connect.start();
    } catch (Exception e) {
        log.error("Failed to start Connect", e);
        connect.stop();
        Exit.exit(3);
    }

    return connect;
}
From this source we can see that Kafka Connect startup does the following:
- Scan the plugin directory and load each plugin, isolating every plugin behind its own class loader. Updating a plugin therefore requires restarting the Kafka Connect process; a rolling restart across the cluster works well for this.
- Start the REST API server, which is built on Jetty.
- Create the three backing stores that hold the config, offset, and status information. They rely on Kafka for persistence, using the three topics from the configuration file (see the topic-creation sketch after this list).
- Create the Worker instance. The Worker manages and runs the Connectors and Tasks assigned to it, all of which execute in thread pools created by the Worker. The Worker startup process is covered in a separate article.
- Create the DistributedHerder instance, which handles REST requests, rebalance events, and so on. Note that the DistributedHerder does most of its work on a single thread, so requests may block when that thread is overloaded.
- Create the Connect instance, which ties all of Kafka Connect's components together. Its start method triggers the start methods of the individual components, such as the DistributedHerder and the Worker, and registers each REST endpoint handler with the Jetty server (a simplified lifecycle sketch follows at the end of this section).
Once the startConnect method returns, the Kafka Connect startup process is complete.
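To make the lifecycle concrete, here is a simplified sketch (under assumptions, not the actual org.apache.kafka.connect.runtime.Connect class) of how start and awaitStop fit together: start launches the herder and registers the REST resources, while awaitStop blocks the main thread on a latch until shutdown.
import java.util.concurrent.CountDownLatch;

public class ConnectSketch {
    private final CountDownLatch stopLatch = new CountDownLatch(1);
    private final Runnable herderStart;   // stands in for herder.start()
    private final Runnable restResources; // stands in for rest.initializeResources(herder)

    public ConnectSketch(Runnable herderStart, Runnable restResources) {
        this.herderStart = herderStart;
        this.restResources = restResources;
    }

    public void start() {
        herderStart.run();   // DistributedHerder submits itself to its executor
        restResources.run(); // register the REST endpoint handlers with Jetty
    }

    public void stop() {
        stopLatch.countDown(); // the real class also stops the herder and the REST server
    }

    public void awaitStop() throws InterruptedException {
        stopLatch.await(); // main() blocks here until shutdown
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectSketch connect = new ConnectSketch(
                () -> System.out.println("herder started"),
                () -> System.out.println("REST resources registered"));
        connect.start();
        connect.stop(); // normally triggered by a shutdown hook or a REST shutdown request
        connect.awaitStop();
    }
}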
Summary
This article walked through the Kafka Connect startup process in distributed mode. Analyzing the startup source code gives a good overview of Kafka Connect's internal components and mechanisms, and it is a solid starting point for studying Kafka Connect in more depth.