

A Deep Dive into the Kafka Connect Worker Implementation

  • The Thread Pool in the Worker
  • Starting the Worker
  • How the Worker Runs Connectors
  • How the Worker Runs Tasks
  • Summary

According to the Kafka Connect Concepts page of the Confluent documentation, Kafka Connect connectors and tasks are logical units of work that run inside processes. These processes are called workers, and the Kafka Connect runtime is organized as shown in the figure below:

[Figure: Kafka Connect workers running connectors and tasks]

As the figure shows, one Kafka Connect process is one worker. Kafka Connect processes in the same group form a group of workers that share the work of running the connectors and tasks belonging to that group. As the startup analysis in "Kafka Connect startup flow in distributed mode" showed, the Kafka Connect source code abstracts a Worker class, and an instance of it is created and run during startup. This article takes a deep look at how the Worker class is implemented and how it starts and runs the connectors and tasks assigned to it.

The Thread Pool in the Worker

The Worker holds a thread pool that runs the work of connectors and tasks. Worker's public constructor is as follows:

[Figure: Worker's public constructor]

As the constructor shows, the Worker's thread pool is a default cached thread pool. Every time a connector or task needs to run, a Runnable is submitted to it, so the pool grows with the number of running connectors and tasks.
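As a minimal illustration (this is not the actual Worker code, just a sketch of the same pattern), a cached thread pool creates threads on demand and reuses idle ones as Runnables are submitted:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CachedPoolDemo {
    public static void main(String[] args) throws Exception {
        // Executors.newCachedThreadPool() creates threads as needed and
        // reuses idle ones (idle threads are reaped after 60 seconds).
        ExecutorService executor = Executors.newCachedThreadPool();

        // Each connector or task is submitted as a Runnable, analogous to
        // executor.submit(workerConnector) / executor.submit(workerTask).
        Future<?> f1 = executor.submit(() -> System.out.println("connector running"));
        Future<?> f2 = executor.submit(() -> System.out.println("task running"));

        f1.get();
        f2.get();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Because the pool is unbounded, the number of live threads tracks the number of concurrently running connectors and tasks rather than being fixed up front.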

Starting the Worker

In the Kafka Connect startup flow, the start method is called on the Worker instance right after it is created. Worker's start method is implemented as follows:

[Figure: Worker.start implementation]

Worker's start method is straightforward: it mainly starts the offsetBackingStore. The offsetBackingStore lets source connector tasks persist the offsets of the records they have already produced, giving every source connector a uniform mechanism for saving its progress instead of each having to design its own. The offsetBackingStore persists this information in a Kafka topic, using the same approach as consumer offset storage.
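The bookkeeping contract can be shown with a toy sketch. Here a HashMap stands in for the backing Kafka topic, and the method names commit/lastOffset are made up for illustration; in the real framework a source task reads its last offset back through the OffsetStorageReader it gets from its context:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetStoreSketch {
    // Stand-in for the offsets topic: source-partition map -> source-offset map.
    private final Map<Map<String, String>, Map<String, Object>> store = new HashMap<>();

    // A source task records how far it has read from the external system.
    public void commit(Map<String, String> sourcePartition, Map<String, Object> sourceOffset) {
        store.put(sourcePartition, sourceOffset);
    }

    // On restart, the task asks where it left off instead of re-reading everything.
    public Map<String, Object> lastOffset(Map<String, String> sourcePartition) {
        return store.get(sourcePartition);
    }

    public static void main(String[] args) {
        OffsetStoreSketch offsets = new OffsetStoreSketch();
        Map<String, String> partition = Map.of("table", "project");
        offsets.commit(partition, Map.of("binlog_position", 1042L));
        System.out.println(offsets.lastOffset(partition)); // prints {binlog_position=1042}
    }
}
```

The key idea is that offsets are keyed by an opaque "source partition" map chosen by the connector (a table name, a file path, and so on), so the framework can store progress for any kind of source without understanding it.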

How the Worker Runs Connectors

To submit a new connector to Kafka Connect, we typically call the REST API, for example:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.39.213:8083/connectors -d '{ 
    "name": "project-connector", 
    "config": {
        "tasks.max": "1",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "127.0.0.1", 
        "database.port": "3306", 
        "database.user": "root", 
        "database.password": "123456", 
        "database.server.id": "100", 
        "database.server.name": "myServerName", 
        "database.include.list": "myDatabase", 
        "table.whitelist" : "project,member",
        "database.history.kafka.bootstrap.servers": "127.0.0.1:9092", 
        "database.history.kafka.topic": "debezium-mysql", 
        "include.schema.changes": "true" 
    }
}'
           

When the leader of the Kafka Connect group receives this connector configuration, it saves it in the config topic. The group then begins a new rebalance; once the rebalance finishes, the connector is assigned to one of the Kafka Connect workers to run.

To run a connector, Kafka Connect calls the Worker.startConnector method:

public void startConnector(
            String connName,
            Map<String, String> connProps,
            CloseableConnectorContext ctx,
            ConnectorStatus.Listener statusListener,
            TargetState initialState,
            Callback<TargetState> onConnectorStateChange
    ) {
        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
            . . . . . . . 
            final WorkerConnector workerConnector;
            ClassLoader savedLoader = plugins.currentThreadLoader();
            try {
                // Load the connector class named in the config from the plugin class loader
                final String connClass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connClass);
                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);

                log.info("Creating connector {} of type {}", connName, connClass);
                // Create an instance of the connector class
                final Connector connector = plugins.newConnector(connClass);
                // Create a different ConnectorConfig subclass depending on the connector type
                final ConnectorConfig connConfig = ConnectUtils.isSinkConnector(connector)
                        ? new SinkConnectorConfig(plugins, connProps)
                        : new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());

                final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(
                        offsetBackingStore, connName, internalKeyConverter, internalValueConverter);
                // Wrap the Connector in a WorkerConnector;
                // WorkerConnector implements the Runnable interface
                workerConnector = new WorkerConnector(
                        connName, connector, connConfig, ctx, metrics, statusListener, offsetReader, connectorLoader);
                log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
                workerConnector.transitionTo(initialState, onConnectorStateChange);
                Plugins.compareAndSwapLoaders(savedLoader);
            } catch (Throwable t) {
                . . . . . . . 
                return;
            }

            . . . . . . 

            executor.submit(workerConnector);

            log.info("Finished creating connector {}", connName);
            workerMetricsGroup.recordConnectorStartupSuccess();
        }
    }
           

As the code fragment shows, the main job of startConnector is to wrap the connector instance and its configuration in a WorkerConnector. WorkerConnector implements the Runnable interface, so it can be submitted to the thread pool to run.

The diagram below outlines what WorkerConnector's run method does at startup; run delegates the actual work to the doRun method.

[Figure: WorkerConnector run/doRun sequence diagram]

As the sequence diagram shows, starting a connector invokes several methods of the Connector interface, in this order:

initialize --> taskClass --> taskConfigs --> start

The calls to taskClass and taskConfigs are the connector's most important work. The connector does not read or write data itself; that is handled by the connector's tasks. The connector's main job is therefore to create the configurations for its tasks. Once the connector has generated the task configs and they have been written to the config topic, a new round of rebalancing is triggered; when it finishes, the tasks have been assigned to their Kafka Connect workers, in the same way connectors are assigned.
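The call order above can be sketched framework-free. The class and method names below mirror the Connector API, but this is an illustration, not the real interface: in particular, for brevity the configuration is handed in through initialize here, whereas in the real API the context arrives via initialize and the config via start.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ConnectorLifecycleSketch {
    // Simplified stand-in for a Connector: its key job is taskConfigs().
    static class MyConnector {
        private Map<String, String> props;

        void initialize(Map<String, String> props) { this.props = props; }

        // Which Task class the worker should instantiate for this connector.
        String taskClass() { return "MySourceTask"; }

        // Split the work into at most maxTasks task configurations.
        List<Map<String, String>> taskConfigs(int maxTasks) {
            List<Map<String, String>> configs = new ArrayList<>();
            String[] tables = props.get("tables").split(",");
            for (int i = 0; i < Math.min(maxTasks, tables.length); i++) {
                configs.add(Map.of("table", tables[i]));
            }
            return configs;
        }

        void start() { /* begin watching the external system */ }
    }

    public static void main(String[] args) {
        MyConnector connector = new MyConnector();
        // The order described above: initialize -> taskClass -> taskConfigs -> start
        connector.initialize(Map.of("tables", "project,member"));
        System.out.println(connector.taskClass());
        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
        connector.start();
        // With tasks.max = 1 only one task config is produced; these configs are
        // written to the config topic and assigned after the next rebalance.
        System.out.println(taskConfigs); // prints [{table=project}]
    }
}
```

Note how taskConfigs, not the connector itself, decides how the work is partitioned; raising tasks.max to 2 here would yield one task per table.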

How the Worker Runs Tasks

Below is a fragment of Worker's startTask method:

public boolean startTask(
            ConnectorTaskId id,
            ClusterConfigState configState,
            Map<String, String> connProps,
            Map<String, String> taskProps,
            TaskStatus.Listener statusListener,
            TargetState initialState
    ) {
        final WorkerTask workerTask;
        try (LoggingContext loggingContext = LoggingContext.forTask(id)) {

            . . . . . .

            ClassLoader savedLoader = plugins.currentThreadLoader();
            try {
                // Load the task class and create an instance of it
                String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
                final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
                final TaskConfig taskConfig = new TaskConfig(taskProps);
                final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
                final Task task = plugins.newTask(taskClass);
                log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());

                Converter keyConverter = plugins.newConverter(connConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
                                                                                                                           .CURRENT_CLASSLOADER);
                Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
                HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
                                                                             ClassLoaderUsage.CURRENT_CLASSLOADER);
                . . . . . . .
				// Build a WorkerTask instance wrapping the task instance, its configuration, etc.
                workerTask = buildWorkerTask(configState, connConfig, id, task, statusListener, initialState, keyConverter, valueConverter,
                                             headerConverter, connectorLoader);
                workerTask.initialize(taskConfig);
                Plugins.compareAndSwapLoaders(savedLoader);
            } catch (Throwable t) {
                . . . . . .
                return false;
            }

            WorkerTask existing = tasks.putIfAbsent(id, workerTask);
            if (existing != null)
                throw new ConnectException("Task already exists in this worker: " + id);

            executor.submit(workerTask);
            . . . . . .
            return true;
        }
    }
           

As the code shows, starting a task is similar to starting a connector. The task class is loaded from the plugin class loader and an instance is created, then a WorkerTask is built around it. The WorkerTask wraps the task instance, the corresponding KafkaProducer or KafkaConsumer, configuration, and so on. Below is a fragment of the buildWorkerTask method:

private WorkerTask buildWorkerTask(ClusterConfigState configState,
                                       ConnectorConfig connConfig,
                                       ConnectorTaskId id,
                                       Task task,
                                       TaskStatus.Listener statusListener,
                                       TargetState initialState,
                                       Converter keyConverter,
                                       Converter valueConverter,
                                       HeaderConverter headerConverter,
                                       ClassLoader loader) {
        ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
        final Class<? extends Connector> connectorClass = plugins.connectorClass(
            connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connConfig.errorRetryTimeout(),
                connConfig.errorMaxDelayInMillis(), connConfig.errorToleranceType(), Time.SYSTEM);
        retryWithToleranceOperator.metrics(errorHandlingMetrics);

        // Determine whether the task is a source or a sink
        if (task instanceof SourceTask) {
            SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
                    connConfig.originalsStrings(), config.topicCreationEnable());
            retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
            // If transformations are configured, build the transformation chain used to transform records before they are sent
            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
            . . . . . . .
            Map<String, Object> producerProps = producerConfigs(id, "connector-producer-" + id, config, sourceConfig, connectorClass,
                                                                connectorClientConfigOverridePolicy, kafkaClusterId);
            // A SourceTask gets a KafkaProducer for sending data to Kafka topics
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
   			. . . . . . .

            // Note we pass the configState as it performs dynamic transformations under the covers
            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
                    headerConverter, transformationChain, producer, admin, topicCreationGroups,
                    offsetReader, offsetWriter, config, configState, metrics, loader, time, retryWithToleranceOperator, herder.statusBackingStore(), executor);
        } else if (task instanceof SinkTask) {
            // If transformations are configured, build the transformation chain applied to consumed records before they are handed to the SinkTask
            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
            retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
            WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
                    keyConverter, valueConverter, headerConverter);

            Map<String, Object> consumerProps = consumerConfigs(id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId);
            // A SinkTask gets a KafkaConsumer; the consumer group id for a connector's tasks is derived from the connector name
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);

            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
                                      valueConverter, headerConverter, transformationChain, consumer, loader, time,
                                      retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
        }
        . . . . . .
    }
           

As the code shows, a different WorkerTask subclass is created depending on the task type: a SourceTask gets a KafkaProducer and is wrapped in a WorkerSourceTask, while a SinkTask gets a KafkaConsumer and is wrapped in a WorkerSinkTask.

Because WorkerTask implements Runnable, once it is submitted to the thread pool its run method calls the execute method, which does the actual work. execute is abstract, and WorkerSourceTask and WorkerSinkTask implement it differently. Both first call back the task's lifecycle methods, such as initialize and start. The main difference is that WorkerSourceTask produces records and sends them to the Kafka cluster through its KafkaProducer, whereas WorkerSinkTask subscribes to topics with its KafkaConsumer and passes the consumed records to the SinkTask's put method for the sink to process.
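The run/execute relationship described above is the classic template-method pattern. A stripped-down sketch of the shape (the real WorkerTask also handles state transitions, metrics, and shutdown, none of which appear here):

```java
public class WorkerTaskSketch {
    // Base class: run() is the template method submitted to the thread pool.
    abstract static class BaseWorkerTask implements Runnable {
        @Override
        public final void run() {
            initializeTask();   // lifecycle callbacks (initialize, start, ...)
            execute();          // subclass-specific main loop
        }

        void initializeTask() { System.out.println("task initialized and started"); }

        abstract void execute();
    }

    // Source side: poll the SourceTask and hand records to a KafkaProducer.
    static class SourceWorkerTask extends BaseWorkerTask {
        @Override
        void execute() { System.out.println("polling SourceTask, sending via producer"); }
    }

    // Sink side: consume records and hand them to SinkTask.put().
    static class SinkWorkerTask extends BaseWorkerTask {
        @Override
        void execute() { System.out.println("consuming topic, calling SinkTask.put"); }
    }

    public static void main(String[] args) {
        new SourceWorkerTask().run();
        new SinkWorkerTask().run();
    }
}
```

The pattern lets the worker treat every task uniformly (submit a Runnable, let run drive the lifecycle) while the direction of data flow lives entirely in each execute implementation.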

Summary

One Kafka Connect process is one Worker, and Kafka Connect implements the corresponding functionality through the Worker class abstraction. The Worker class mainly manages and runs the connectors and tasks assigned to it; each connector and task is wrapped in WorkerConnector or WorkerTask, two classes that implement the Runnable interface, and runs on the thread pool held by the Worker class.

A connector's main job is to generate the configurations for its tasks; the tasks are the components that actually read, write, and process the data.