天天看点

Kafka:Topic概念与API介绍

Topic

事件被组织并持久地存储在​

​Topic​

​​中,​

​Topic​

​​类似于文件系统中的文件夹,事件就是该文件夹中的文件。​

​Kafka​

​​中的​

​Topic​

​​始终是多生产者和多订阅者:一个​

​Topic​

​​可以有零个、一个或多个生产者向其写入事件,也可以有零个、一个或多个消费者订阅这些事件。​

​Topic​

​​中的事件可以根据需要随时读取,与传统的消息中间件不同,事件在使用后不会被删除,相反,可以通过配置来定义​

​Kafka​

​​中每个​

​Topic​

​​应该保留事件的时间,超过该事件后旧事件将被丢弃。​

​Kafka​

​的性能在数据大小方面实际上是恒定的,因此长时间存储数据是非常好的。

Partition

​Topic​

​​是分区的,这意味着一个​

​Topic​

​​可以分布在多个​

​Kafka​

​​节点上。数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从多个​

​Kafka​

​​节点读取和写入数据。将新事件发布到​

​Topic​

​​时,它实际上会​

​appended​

​​到​

​Topic​

​​的一个​

​Partition​

​​中。具有相同事件​

​key​

​​的事件将写入同一​

​Partition​

​​,​

​Kafka​

​​保证给定​

​Topic​

​​的​

​Partition​

​的任何使用者都将始终以与写入时完全相同的顺序读取该分区的事件。

Replication

为了使数据具有容错性和高可用性,每个​

​Topic​

​​都可以有多个​

​Replication​

​​,以便始终有多个​

​Kafka​

​​节点具有数据副本,以防出现问题。常见的生产设置是​

​replicationFactor​

​​为​

​3​

​​,即始终有三份数据副本(包括一份原始数据)。此​

​Replication​

​​在​

​Topic​

​​的​

​Partition​

​级别执行。

​Kafka​

​​在指定数量(通过​

​replicationFactor​

​​)的服务器上复制每个​

​Topic​

​​的​

​Partition​

​​,这允许在集群中的某些服务器发生故障时进行自动故障转移,以便在出现故障时服务仍然可用。​

​Replication​

​​的单位是​

​Topic​

​​的​

​Partition​

​​。在非故障条件下,​

​Kafka​

​​中的每个​

​Partition​

​​都有一个​

​leader​

​​和零个或多个​

​follower​

​​。​

​replicationFactor​

​​是复制副本(包括​

​leader​

​​)的总数。所有读和写操作都将转到​

​Partition​

​​的​

​leader​

​​上。通常,有比​

​Kafka​

​​节点多得多的​

​Partition​

​​,并且这些​

​Partition​

​​的​

​leader​

​​在​

​Kafka​

​​节点之间均匀分布。​

​follower​

​​上的数据需要与​

​leader​

​​的数据同步,所有数据都具有相同的偏移量和顺序(当然,在任何给定时间,​

​leader​

​​的数据末尾可能有一些尚未复制的数据)。​

​follower​

​​会像普通​

​Kafka​

​​消费者一样使用来自​

​leader​

​​的消息,并将其应用到自己的数据中。如下图所示,三个​

​Kafka​

​​节点上有两个​

​Topic​

​​(​

​Topic 0​

​​和​

​Topic 1​

​​),​

​Topic 0​

​​有两个​

​Partition​

​​并且​

​replicationFactor​

​​为​

​3​

​​(红色的​

​Partition​

​​为​

​leader​

​​),​

​Topic 1​

​​有三个​

​Partition​

​​,​

​replicationFactor​

​​也为​

​3​

​​(红色的​

​Partition​

​​为​

​leader​

​)。

Kafka:Topic概念与API介绍

API

添加依赖:

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>      

这里使用的​

​kafka-clients​

​​版本和博主之前部署的​

​Kafka​

​版本一致:

  • ​​Kafka:部署Kafka​​

client

操作​

​Topic​

​​的客户端通过​

​AdminClient​

​抽象类来创建,源码如下:

package org.apache.kafka.clients.admin;

import java.util.Map;
import java.util.Properties;

public abstract class AdminClient implements Admin {

    /**
     * 使用给定的配置创建一个新的Admin
     * props:Admin的配置
     * 返回KafkaAdminClient实例
     */
    public static AdminClient create(Properties props) {
        return (AdminClient) Admin.create(props);
    }

    /**
     * 重载方法
     * 使用给定的配置创建一个新的Admin
     * props:Admin的配置
     * 返回KafkaAdminClient实例
     */
    public static AdminClient create(Map<String, Object> conf) {
        return (AdminClient) Admin.create(conf);
    }
}      

实际上会返回一个​

​KafkaAdminClient​

​​实例(​

​KafkaAdminClient​

​​类是​

​AdminClient​

​​抽象类的子类),​

​KafkaAdminClient​

​​类的方法比较多,其中​

​private​

​​方法服务于​

​public​

​方法(提供给用户的服务)。

Kafka:Topic概念与API介绍

​KafkaAdminClient​

​​类提供的​

​public​

​​方法是对​

​Admin​

​接口的实现。

Kafka:Topic概念与API介绍

create

创建一个新的​

​Topic​

​。

package com.kaven.kafka.admin;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class Admin {

    private static final AdminClient adminClient = Admin.getAdminClient();

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        Admin admin = new Admin();
        admin.createTopic();
        Thread.sleep(100000);
    }

    public void createTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CreateTopicsResult topics = adminClient.createTopics(
                Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
        );
        Map<String, KafkaFuture<Void>> values = topics.values();
        values.forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }

    public static AdminClient getAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        return AdminClient.create(properties);
    }
}      

创建​

​AdminClient​

​​(简单使用,配置​

​BOOTSTRAP_SERVERS_CONFIG​

​就可以了):

public static AdminClient getAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        return AdminClient.create(properties);
    }      

创建​

​Topic​

​​(传入一个​

​NewTopic​

​​实例,并且给该​

​NewTopic​

​​实例配置​

​name​

​​、​

​numPartitions​

​​、​

​replicationFactor​

​):

public void createTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CreateTopicsResult topics = adminClient.createTopics(
                Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
        );
        Map<String, KafkaFuture<Void>> values = topics.values();
        values.forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }      

提供的方法大都是异步编程模式的,这些基础知识就不介绍了,输出如下图所示:

Kafka:Topic概念与API介绍

list

获取​

​Topic​

​列表。

public void listTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient.
                listTopics(new ListTopicsOptions().listInternal(true));
        Set<String> names = listTopicsResult.names().get();
        names.forEach(System.out::println);
    }      

​get​

​​方法会等待​

​future​

​完成,然后返回其结果。输出如下图所示:

Kafka:Topic概念与API介绍

通过下面这个配置,可以获取到​

​Kafka​

​​内置的​

​Topic​

​。

new ListTopicsOptions().listInternal(true)      

默认是不会获取到​

​Kafka​

​​内置的​

​Topic​

​。

public void listTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set<String> names = listTopicsResult.names().get();
        names.forEach(System.out::println);
    }      
Kafka:Topic概念与API介绍

delete

删除​

​Topic​

​。

public void deleteTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("java-client4", "java-client2"));
        deleteTopicsResult.topicNameValues().forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }      

输出如下图所示:

Kafka:Topic概念与API介绍

现在再获取​

​Topic​

​​的列表,输出如下图所示(删除的​

​Topic​

​已经不在了):

Kafka:Topic概念与API介绍

describe

获取​

​Topic​

​的描述。

public void describeTopic() {
        Map<String, KafkaFuture<TopicDescription>> values =
                adminClient.describeTopics(Collections.singleton("new-topic-kaven")).values();
        for (String name : values.keySet()) {
            values.get(name).whenComplete((describe, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                System.out.println(describe);
            });
        }
    }      

输出如下图所示:

Kafka:Topic概念与API介绍

输出符合预期,因为创建该​

​Topic​

​的配置为:

new NewTopic("new-topic-kaven", 1, (short) 1)      

config

获取​

​Topic​

​的配置。

public void describeTopicConfig() throws ExecutionException, InterruptedException {
        DescribeConfigsResult describeConfigsResult = adminClient
                .describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven")));
        describeConfigsResult.all().get().forEach(((configResource, config) -> {
            System.out.println(configResource);
            System.out.println(config);
        }));
    }      

输出如下图所示:

Kafka:Topic概念与API介绍

​describeConfigs​

​方法很显然还可以获取其他资源的配置(通过指定资源的类型)。

public enum Type {
        BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
        ...
    }      

alter

增量更新​

​Topic​

​的配置。

public void incrementalAlterConfig() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        
        Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
        alter.put(
                new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
                Collections.singletonList(
                        new AlterConfigOp(
                                new ConfigEntry("compression.type", "gzip"), 
                                AlterConfigOp.OpType.SET
                        )
                )
                
        );
        
        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alter);
        alterConfigsResult.values().forEach(((configResource, voidKafkaFuture) -> {
            voidKafkaFuture.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(configResource);
                latch.countDown();
            });
        }));
        latch.await();
    }      

输出如下图所示:

Kafka:Topic概念与API介绍

很显然​

​incrementalAlterConfigs​

​方法也可以增量更新其他资源的配置(通过指定资源的类型)。

​ConfigResource​

​​定义需要修改配置的资源,​

​Collection<AlterConfigOp>​

​定义该资源具体的配置修改操作。

Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();      

​configEntry​

​​定义资源需要修改的配置条目,​

​operationType​

​定义修改操作的类型。

public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
        this.configEntry = configEntry;
        this.opType =  operationType;
    }      

修改操作的类型。

public enum OpType {
        /**
         * 设置配置条目的值
         */
        SET((byte) 0),
        /**
         * 将配置条目恢复为默认值(可能为空)
         */
        DELETE((byte) 1),
        /**
         * 仅适用于列表类型的配置条目
         * 将指定的值添加到配置条目的当前值
         * 如果尚未设置配置值,则添加到默认值
         */
        APPEND((byte) 2),
        /**
         * 仅适用于列表类型的配置条目
         * 从配置条目的当前值中删除指定的值
         * 删除当前不在配置条目中的值是合法的
         * 从当前配置值中删除所有条目会留下一个空列表,并且不会恢复为条目的默认值
         */
        SUBTRACT((byte) 3);
        ...
    }      

资源的配置条目,包含配置名称、值等。

public class ConfigEntry {

    private final String name;
    private final String value;
    private final ConfigSource source;
    private final boolean isSensitive;
    private final boolean isReadOnly;
    private final List<ConfigSynonym> synonyms;
    private final ConfigType type;
    private final String documentation;
    ...
}      

在获取​

​Topic​

​配置的输出中也可以发现这些配置条目。

Kafka:Topic概念与API介绍

很显然,这里修改名称为​

​new-topic-kaven​

​​的​

​Topic​

​​的​

​compression.type​

​配置条目(压缩类型)。

alter.put(
                new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
                Collections.singletonList(
                        new AlterConfigOp(
                                new ConfigEntry("compression.type", "gzip"), 
                                AlterConfigOp.OpType.SET
                        )
                )
                
        );      

​compression.type​

​​配置条目的默认值为​

​producer​

​​(意味着保留生产者设置的原始压缩编解码器),和上面的图也对应,博主将该配置条目修改成了​

​gzip​

​。

Kafka:Topic概念与API介绍

再来获取该​

​Topic​

​的配置,如下图所示(很显然配置修改成功了):

Kafka:Topic概念与API介绍