Topic
事件被组织并持久地存储在
Topic
中,
Topic
类似于文件系统中的文件夹,事件就是该文件夹中的文件。
Kafka
中的
Topic
始终是多生产者和多订阅者:一个
Topic
可以有零个、一个或多个生产者向其写入事件,也可以有零个、一个或多个消费者订阅这些事件。
Topic
中的事件可以根据需要随时读取,与传统的消息中间件不同,事件在使用后不会被删除,相反,可以通过配置来定义
Kafka
中每个
Topic
应该保留事件的时间,超过该事件后旧事件将被丢弃。
Kafka
的性能在数据大小方面实际上是恒定的,因此长时间存储数据是非常好的。
Partition
Topic
是分区的,这意味着一个
Topic
可以分布在多个
Kafka
节点上。数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从多个
Kafka
节点读取和写入数据。将新事件发布到
Topic
时,它实际上会
appended
到
Topic
的一个
Partition
中。具有相同事件
key
的事件将写入同一
Partition
,
Kafka
保证给定
Topic
的
Partition
的任何使用者都将始终以与写入时完全相同的顺序读取该分区的事件。
Replication
为了使数据具有容错性和高可用性,每个
Topic
都可以有多个
Replication
,以便始终有多个
Kafka
节点具有数据副本,以防出现问题。常见的生产设置是
replicationFactor
为
3
,即始终有三份数据副本(包括一份原始数据)。此
Replication
在
Topic
的
Partition
级别执行。
Kafka
在指定数量(通过
replicationFactor
)的服务器上复制每个
Topic
的
Partition
,这允许在集群中的某些服务器发生故障时进行自动故障转移,以便在出现故障时服务仍然可用。
Replication
的单位是
Topic
的
Partition
。在非故障条件下,
Kafka
中的每个
Partition
都有一个
leader
和零个或多个
follower
。
replicationFactor
是复制副本(包括
leader
)的总数。所有读和写操作都将转到
Partition
的
leader
上。通常,有比
Kafka
节点多得多的
Partition
,并且这些
Partition
的
leader
在
Kafka
节点之间均匀分布。
follower
上的数据需要与
leader
的数据同步,所有数据都具有相同的偏移量和顺序(当然,在任何给定时间,
leader
的数据末尾可能有一些尚未复制的数据)。
follower
会像普通
Kafka
消费者一样使用来自
leader
的消息,并将其应用到自己的数据中。如下图所示,三个
Kafka
节点上有两个
Topic
(
Topic 0
和
Topic 1
),
Topic 0
有两个
Partition
并且
replicationFactor
为
3
(红色的
Partition
为
leader
),
Topic 1
有三个
Partition
,
replicationFactor
也为
3
(红色的
Partition
为
leader
)。
API
添加依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
这里使用的
kafka-clients
版本和博主之前部署的
Kafka
版本一致:
- Kafka:部署Kafka
client
操作
Topic
的客户端通过
AdminClient
抽象类来创建,源码如下:
package org.apache.kafka.clients.admin;
import java.util.Map;
import java.util.Properties;
public abstract class AdminClient implements Admin {
/**
* 使用给定的配置创建一个新的Admin
* props:Admin的配置
* 返回KafkaAdminClient实例
*/
public static AdminClient create(Properties props) {
return (AdminClient) Admin.create(props);
}
/**
* 重载方法
* 使用给定的配置创建一个新的Admin
* props:Admin的配置
* 返回KafkaAdminClient实例
*/
public static AdminClient create(Map<String, Object> conf) {
return (AdminClient) Admin.create(conf);
}
}
实际上会返回一个
KafkaAdminClient
实例(
KafkaAdminClient
类是
AdminClient
抽象类的子类),
KafkaAdminClient
类的方法比较多,其中
private
方法服务于
public
方法(提供给用户的服务)。
KafkaAdminClient
类提供的
public
方法是对
Admin
接口的实现。
create
创建一个新的
Topic
。
package com.kaven.kafka.admin;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
public class Admin {
private static final AdminClient adminClient = Admin.getAdminClient();
public static void main(String[] args) throws InterruptedException, ExecutionException {
Admin admin = new Admin();
admin.createTopic();
Thread.sleep(100000);
}
public void createTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
CreateTopicsResult topics = adminClient.createTopics(
Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
);
Map<String, KafkaFuture<Void>> values = topics.values();
values.forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
public static AdminClient getAdminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
return AdminClient.create(properties);
}
}
创建
AdminClient
(简单使用,配置
BOOTSTRAP_SERVERS_CONFIG
就可以了):
public static AdminClient getAdminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
return AdminClient.create(properties);
}
创建
Topic
(传入一个
NewTopic
实例,并且给该
NewTopic
实例配置
name
、
numPartitions
、
replicationFactor
):
public void createTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
CreateTopicsResult topics = adminClient.createTopics(
Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
);
Map<String, KafkaFuture<Void>> values = topics.values();
values.forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
提供的方法大都是异步编程模式的,这些基础知识就不介绍了,输出如下图所示:
list
获取
Topic
列表。
public void listTopics() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = adminClient.
listTopics(new ListTopicsOptions().listInternal(true));
Set<String> names = listTopicsResult.names().get();
names.forEach(System.out::println);
}
get
方法会等待
future
完成,然后返回其结果。输出如下图所示:
通过下面这个配置,可以获取到
Kafka
内置的
Topic
。
new ListTopicsOptions().listInternal(true)
默认是不会获取到
Kafka
内置的
Topic
。
public void listTopics() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<String> names = listTopicsResult.names().get();
names.forEach(System.out::println);
}
delete
删除
Topic
。
public void deleteTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("java-client4", "java-client2"));
deleteTopicsResult.topicNameValues().forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
输出如下图所示:
现在再获取
Topic
的列表,输出如下图所示(删除的
Topic
已经不在了):
describe
获取
Topic
的描述。
public void describeTopic() {
Map<String, KafkaFuture<TopicDescription>> values =
adminClient.describeTopics(Collections.singleton("new-topic-kaven")).values();
for (String name : values.keySet()) {
values.get(name).whenComplete((describe, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
System.out.println(describe);
});
}
}
输出如下图所示:
输出符合预期,因为创建该
Topic
的配置为:
new NewTopic("new-topic-kaven", 1, (short) 1)
config
获取
Topic
的配置。
public void describeTopicConfig() throws ExecutionException, InterruptedException {
DescribeConfigsResult describeConfigsResult = adminClient
.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven")));
describeConfigsResult.all().get().forEach(((configResource, config) -> {
System.out.println(configResource);
System.out.println(config);
}));
}
输出如下图所示:
describeConfigs
方法很显然还可以获取其他资源的配置(通过指定资源的类型)。
public enum Type {
BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
...
}
alter
增量更新
Topic
的配置。
public void incrementalAlterConfig() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
alter.put(
new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
Collections.singletonList(
new AlterConfigOp(
new ConfigEntry("compression.type", "gzip"),
AlterConfigOp.OpType.SET
)
)
);
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alter);
alterConfigsResult.values().forEach(((configResource, voidKafkaFuture) -> {
voidKafkaFuture.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(configResource);
latch.countDown();
});
}));
latch.await();
}
输出如下图所示:
很显然
incrementalAlterConfigs
方法也可以增量更新其他资源的配置(通过指定资源的类型)。
ConfigResource
定义需要修改配置的资源,
Collection<AlterConfigOp>
定义该资源具体的配置修改操作。
Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
configEntry
定义资源需要修改的配置条目,
operationType
定义修改操作的类型。
public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
this.configEntry = configEntry;
this.opType = operationType;
}
修改操作的类型。
public enum OpType {
/**
* 设置配置条目的值
*/
SET((byte) 0),
/**
* 将配置条目恢复为默认值(可能为空)
*/
DELETE((byte) 1),
/**
* 仅适用于列表类型的配置条目
* 将指定的值添加到配置条目的当前值
* 如果尚未设置配置值,则添加到默认值
*/
APPEND((byte) 2),
/**
* 仅适用于列表类型的配置条目
* 从配置条目的当前值中删除指定的值
* 删除当前不在配置条目中的值是合法的
* 从当前配置值中删除所有条目会留下一个空列表,并且不会恢复为条目的默认值
*/
SUBTRACT((byte) 3);
...
}
资源的配置条目,包含配置名称、值等。
public class ConfigEntry {
private final String name;
private final String value;
private final ConfigSource source;
private final boolean isSensitive;
private final boolean isReadOnly;
private final List<ConfigSynonym> synonyms;
private final ConfigType type;
private final String documentation;
...
}
在获取
Topic
配置的输出中也可以发现这些配置条目。
很显然,这里修改名称为
new-topic-kaven
的
Topic
的
compression.type
配置条目(压缩类型)。
alter.put(
new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
Collections.singletonList(
new AlterConfigOp(
new ConfigEntry("compression.type", "gzip"),
AlterConfigOp.OpType.SET
)
)
);
compression.type
配置条目的默认值为
producer
(意味着保留生产者设置的原始压缩编解码器),和上面的图也对应,博主将该配置条目修改成了
gzip
。
再来获取该
Topic
的配置,如下图所示(很显然配置修改成功了):