

A Detailed Look at the Kafka Connect Startup Process in Distributed Mode

  • Startup command and configuration
  • The connect-distributed script
  • The ConnectDistributed startup class
  • Summary

Kafka Connect has two startup modes: standalone mode and distributed mode. Standalone mode is typically used for development and testing. Production deployments use distributed mode: Kafka Connect is deployed on several nodes that form a Kafka Connect cluster, and the Connectors and Tasks are distributed across those machines. This article explains the Kafka Connect startup process in distributed mode in detail.

Startup command and configuration

Deploying Kafka Connect in distributed mode is also simple: you can deploy it by manually downloading and unpacking the Confluent installation archive. A typical worker configuration file looks like this:

# Hosts and ports of the Kafka cluster that Kafka Connect depends on; separate multiple entries with commas
bootstrap.servers=127.0.0.1:9092

# Workers in the same Kafka Connect group use the same id; same concept as a consumer group
group.id=my-group-id

# Key and value converters for source and sink connector messages
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
# Schema Registry URL; not required, but using Schema Registry is more convenient
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081

# The three topics Kafka Connect uses internally
config.storage.topic=my-connect-configs
offset.storage.topic=my-connect-offsets
status.storage.topic=my-connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# Converters for the internal records Kafka Connect writes to Kafka
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Path(s) from which Kafka Connect loads connector plugins
plugin.path=/home/admin/kafkaConnect/plugins
           

In most cases the parameters above are all Kafka Connect needs. Then run the following command on one node:

connect-distributed config.properties
           

To deploy the other nodes of the same Kafka Connect group, log in to each node and run the same command with the same configuration.
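
Once the workers are running, each node exposes the Kafka Connect REST API, by default on port 8083. The minimal sketch below, which assumes the default listener address and Java 11+, simply lists the connectors currently registered on the cluster; any worker in the group can answer the request:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListConnectors {
    public static void main(String[] args) throws Exception {
        // Assumption: the worker's REST listener uses the default http://localhost:8083
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8083/connectors"))
                .GET()
                .build();
        // GET /connectors returns a JSON array with the names of the deployed connectors
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}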

The connect-distributed script

The startup command for Kafka Connect in distributed mode, connect-distributed, is a bash script with the following content:

if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] connect-distributed.properties"
        exit 1
fi

# Directory containing the connect-distributed script
base_dir=$(dirname $0)

# java_base_dir points to the share/java directory under the Confluent installation,
# which contains the jars used by the Confluent components and Kafka Connect
java_base_dir=$( cd -P "$base_dir/../share/java" && pwd )

# Add the jars under share/java/confluent-security/connect,
# share/java/kafka, share/java/confluent-common,
# share/java/kafka-serde-tools and share/java/monitoring-interceptors
# to the CLASSPATH
for library in "confluent-security/connect" "kafka" "confluent-common" "kafka-serde-tools" "monitoring-interceptors"; do
  dir="$java_base_dir/$library"
  if [ -d "$dir" ]; then
    classpath_prefix="$CLASSPATH:"
    if [ "x$CLASSPATH" = "x" ]; then
      classpath_prefix=""
    fi
    CLASSPATH="$classpath_prefix$dir/*"
  fi
done

# Configure log4j
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
  # Default configuration file locations
  LOG4J_CONFIG_NORMAL_INSTALL="/etc/kafka/connect-log4j.properties"
  LOG4J_CONFIG_ZIP_INSTALL="$base_dir/../etc/kafka/connect-log4j.properties"
  if [ -e "$LOG4J_CONFIG_NORMAL_INSTALL" ]; then # Normal install layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_NORMAL_INSTALL}"
  elif [ -e "${LOG4J_CONFIG_ZIP_INSTALL}" ]; then # Simple zip file layout
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG_ZIP_INSTALL}"
  else # Fallback to normal default
    KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
  fi
fi
export KAFKA_LOG4J_OPTS

# Default heap options for Kafka Connect
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
  export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name connectDistributed'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

export CLASSPATH
# ConnectDistributed is the startup class
exec $(dirname $0)/kafka-run-class $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectDistributed "$@"
           

As the analysis above shows, the script's main job is to add the libraries Kafka Connect depends on under the Confluent share/java directory to the CLASSPATH, set the log4j configuration file path and the Kafka Connect heap options, and finally invoke org.apache.kafka.connect.cli.ConnectDistributed to start Kafka Connect.

The ConnectDistributed startup class

Below is the main method of org.apache.kafka.connect.cli.ConnectDistributed:

public static void main(String[] args) {

        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
            log.info("Usage: ConnectDistributed worker.properties");
            Exit.exit(1);
        }

        try {
            WorkerInfo initInfo = new WorkerInfo();
            initInfo.logAll();

            String workerPropsFile = args[0];
            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();

            ConnectDistributed connectDistributed = new ConnectDistributed();
            Connect connect = connectDistributed.startConnect(workerProps);

            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
            connect.awaitStop();

        } catch (Throwable t) {
            log.error("Stopping due to error", t);
            Exit.exit(2);
        }
    }
           

As the source above shows, the main method loads the Kafka Connect configuration from the properties file into the workerProps map, creates a ConnectDistributed instance, and continues the startup through its startConnect method.
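
For reference, loading the worker properties file works roughly as follows. This is a simplified, standalone equivalent of what Utils.loadProps plus Utils.propsToStringMap do, not the actual implementation:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class WorkerPropsSketch {
    // Simplified equivalent of Utils.loadProps + Utils.propsToStringMap:
    // read the worker .properties file and flatten it into a Map<String, String>
    static Map<String, String> loadWorkerProps(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        Map<String, String> workerProps = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            workerProps.put(name, props.getProperty(name));
        }
        return workerProps;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(loadWorkerProps(args[0])); // e.g. config.properties
    }
}

The startConnect method itself looks like this: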

public Connect startConnect(Map<String, String> workerProps) {
    
        // First scan the plugin path and load the plugins; each plugin is isolated by its own class loader
        log.info("Scanning for plugin classes. This might take a moment ...");
        Plugins plugins = new Plugins(workerProps);
        plugins.compareAndSwapWithDelegatingLoader();
        DistributedConfig config = new DistributedConfig(workerProps);

        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
        log.debug("Kafka cluster ID: {}", kafkaClusterId);
        /**
         * Create the Jetty server and the listener connectors (Jetty classes).
         * If admin.listeners is configured, the corresponding connectors are created as well.
         **/
        RestServer rest = new RestServer(config);
        /**
         * Set the StatisticsHandler,
         * set the graceful shutdown timeout,
         * and start the Jetty server.
         */
        rest.initializeServer();

        URI advertisedUrl = rest.advertisedUrl();
        // The workerId of this Kafka Connect worker, made up of the configured host and port
        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();

        // Create the admin client to be shared by all backing stores.
        Map<String, Object> adminProps = new HashMap<>(config.originals());
        ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
        SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
        // Stores offsets
        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(sharedAdmin);
        offsetBackingStore.configure(config);

        ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
                config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                config, ConnectorClientConfigOverridePolicy.class);
        // The Kafka Connect Worker instance
        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore, connectorClientConfigOverridePolicy);
        // Applies config.providers; sensitive settings such as passwords can be stored elsewhere and resolved this way
        WorkerConfigTransformer configTransformer = worker.configTransformer();

        Converter internalValueConverter = worker.getInternalValueConverter();
        // Stores statuses
        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin);
        statusBackingStore.configure(config);
        // Stores configs; this topic must have exactly one partition
        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
                internalValueConverter,
                config,
                configTransformer,
                sharedAdmin);

        // Pass the shared admin to the distributed herder as an additional AutoCloseable object that should be closed when the
        // herder is stopped. This is easier than having to track and own the lifecycle ourselves.
        DistributedHerder herder = new DistributedHerder(config, time, worker,
                kafkaClusterId, statusBackingStore, configBackingStore,
                advertisedUrl.toString(), connectorClientConfigOverridePolicy, sharedAdmin);
        // The Connect class ties all of Kafka Connect's components together,
        // such as the herder, worker, storage and REST interface, and manages their lifecycle
        final Connect connect = new Connect(herder, rest);
        log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
        try {
            /**
             * Calls herder.start():
             * --> for DistributedHerder, start() submits the herder itself to its thread pool,
             * --> and the thread pool then runs DistributedHerder's run() method.
             * Calls rest.initializeResources():
             * --> registers the REST resource handlers with the Jetty server.
             **/
            connect.start();
        } catch (Exception e) {
            log.error("Failed to start Connect", e);
            connect.stop();
            Exit.exit(3);
        }

        return connect;
    }
           

As the source above shows, the Kafka Connect startup process does the following:

  1. Scan the plugin directories and load every plugin, isolating each plugin with its own class loader (see the sketch after this list). Because of this, updating a plugin requires restarting the Kafka Connect process; a rolling restart across the cluster can be used.
  2. Start the REST server, which is built on Jetty.
  3. Create three backing stores that hold the config, offset, and status information. They rely on Kafka to persist this information, using the three topics defined in the configuration file.
  4. Create the Worker instance. The Worker manages and runs the Connectors and Tasks assigned to it; Connectors and Tasks run in thread pools created by the Worker. The Worker's startup process is covered in a separate article.
  5. Create the DistributedHerder instance, which handles REST requests, rebalance events, and so on. Note that the DistributedHerder does most of its work on a single thread, so requests may block when the load is heavy.
  6. Create the Connect instance, which ties all of Kafka Connect's components together. The Connect instance's start method triggers the start methods of those components, such as the DistributedHerder and the Worker, and registers each REST endpoint handler with the Jetty server.
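
The class-loader isolation mentioned in point 1 can be illustrated with a small sketch. This only shows the idea, assuming a hypothetical plugin directory, jar, and connector class; Kafka Connect's real PluginClassLoader is more sophisticated (it delegates selectively to its parent):

import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;

public class PluginIsolationSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical plugin layout: one subdirectory of plugin.path per plugin
        Path pluginDir = Path.of("/home/admin/kafkaConnect/plugins/my-connector");
        URL[] jars = { pluginDir.resolve("my-connector.jar").toUri().toURL() }; // hypothetical jar

        // Giving every plugin its own class loader lets two plugins ship
        // conflicting versions of the same dependency without clashing
        try (URLClassLoader pluginLoader =
                     new URLClassLoader(jars, PluginIsolationSketch.class.getClassLoader())) {
            Class<?> connectorClass = pluginLoader.loadClass("com.example.MyConnector"); // hypothetical class
            System.out.println("Loaded " + connectorClass.getName()
                    + " with " + connectorClass.getClassLoader());
        }
    }
}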

Once startConnect returns, Kafka Connect's startup process is complete.

Summary

This article walked through the Kafka Connect startup process. Analyzing the startup source code gives a good overview of Kafka Connect's internal components and mechanisms, and is a solid starting point for studying Kafka Connect in more depth.