Topic
事件被組織并持久地存儲在
Topic
中,
Topic
類似于檔案系統中的檔案夾,事件就是該檔案夾中的檔案。
Kafka
中的
Topic
始終是多生産者和多訂閱者:一個
Topic
可以有零個、一個或多個生産者向其寫入事件,也可以有零個、一個或多個消費者訂閱這些事件。
Topic
中的事件可以根據需要随時讀取,與傳統的消息中間件不同,事件在使用後不會被删除,相反,可以通過配置來定義
Kafka
中每個
Topic
應該保留事件的時間,超過該事件後舊事件将被丢棄。
Kafka
的性能在資料大小方面實際上是恒定的,是以長時間存儲資料是非常好的。
Partition
Topic
是分區的,這意味着一個
Topic
可以分布在多個
Kafka
節點上。資料的這種分布式放置對于可伸縮性非常重要,因為它允許用戶端應用程式同時從多個
Kafka
節點讀取和寫入資料。将新事件釋出到
Topic
時,它實際上會
appended
到
Topic
的一個
Partition
中。具有相同僚件
key
的事件将寫入同一
Partition
,
Kafka
保證給定
Topic
的
Partition
的任何使用者都将始終以與寫入時完全相同的順序讀取該分區的事件。
Replication
為了使資料具有容錯性和高可用性,每個
Topic
都可以有多個
Replication
,以便始終有多個
Kafka
節點具有資料副本,以防出現問題。常見的生産設定是
replicationFactor
為
3
,即始終有三份資料副本(包括一份原始資料)。此
Replication
在
Topic
的
Partition
級别執行。
Kafka
在指定數量(通過
replicationFactor
)的伺服器上複制每個
Topic
的
Partition
,這允許在叢集中的某些伺服器發生故障時進行自動故障轉移,以便在出現故障時服務仍然可用。
Replication
的機關是
Topic
的
Partition
。在非故障條件下,
Kafka
中的每個
Partition
都有一個
leader
和零個或多個
follower
。
replicationFactor
是複制副本(包括
leader
)的總數。所有讀和寫操作都将轉到
Partition
的
leader
上。通常,有比
Kafka
節點多得多的
Partition
,并且這些
Partition
的
leader
在
Kafka
節點之間均勻分布。
follower
上的資料需要與
leader
的資料同步,所有資料都具有相同的偏移量和順序(當然,在任何給定時間,
leader
的資料末尾可能有一些尚未複制的資料)。
follower
會像普通
Kafka
消費者一樣使用來自
leader
的消息,并将其應用到自己的資料中。如下圖所示,三個
Kafka
節點上有兩個
Topic
(
Topic 0
和
Topic 1
),
Topic 0
有兩個
Partition
并且
replicationFactor
為
3
(紅色的
Partition
為
leader
),
Topic 1
有三個
Partition
,
replicationFactor
也為
3
(紅色的
Partition
為
leader
)。
API
添加依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
這裡使用的
kafka-clients
版本和部落客之前部署的
Kafka
版本一緻:
- Kafka:部署Kafka
client
操作
Topic
的用戶端通過
AdminClient
抽象類來建立,源碼如下:
package org.apache.kafka.clients.admin;
import java.util.Map;
import java.util.Properties;
public abstract class AdminClient implements Admin {
/**
* 使用給定的配置建立一個新的Admin
* props:Admin的配置
* 傳回KafkaAdminClient執行個體
*/
public static AdminClient create(Properties props) {
return (AdminClient) Admin.create(props);
}
/**
* 重載方法
* 使用給定的配置建立一個新的Admin
* props:Admin的配置
* 傳回KafkaAdminClient執行個體
*/
public static AdminClient create(Map<String, Object> conf) {
return (AdminClient) Admin.create(conf);
}
}
實際上會傳回一個
KafkaAdminClient
執行個體(
KafkaAdminClient
類是
AdminClient
抽象類的子類),
KafkaAdminClient
類的方法比較多,其中
private
方法服務于
public
方法(提供給使用者的服務)。
KafkaAdminClient
類提供的
public
方法是對
Admin
接口的實作。
create
建立一個新的
Topic
。
package com.kaven.kafka.admin;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
public class Admin {
private static final AdminClient adminClient = Admin.getAdminClient();
public static void main(String[] args) throws InterruptedException, ExecutionException {
Admin admin = new Admin();
admin.createTopic();
Thread.sleep(100000);
}
public void createTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
CreateTopicsResult topics = adminClient.createTopics(
Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
);
Map<String, KafkaFuture<Void>> values = topics.values();
values.forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
public static AdminClient getAdminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
return AdminClient.create(properties);
}
}
建立
AdminClient
(簡單使用,配置
BOOTSTRAP_SERVERS_CONFIG
就可以了):
public static AdminClient getAdminClient() {
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
return AdminClient.create(properties);
}
建立
Topic
(傳入一個
NewTopic
執行個體,并且給該
NewTopic
執行個體配置
name
、
numPartitions
、
replicationFactor
):
public void createTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
CreateTopicsResult topics = adminClient.createTopics(
Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
);
Map<String, KafkaFuture<Void>> values = topics.values();
values.forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
提供的方法大都是異步程式設計模式的,這些基礎知識就不介紹了,輸出如下圖所示:
list
擷取
Topic
清單。
public void listTopics() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = adminClient.
listTopics(new ListTopicsOptions().listInternal(true));
Set<String> names = listTopicsResult.names().get();
names.forEach(System.out::println);
}
get
方法會等待
future
完成,然後傳回其結果。輸出如下圖所示:
通過下面這個配置,可以擷取到
Kafka
内置的
Topic
。
new ListTopicsOptions().listInternal(true)
預設是不會擷取到
Kafka
内置的
Topic
。
public void listTopics() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<String> names = listTopicsResult.names().get();
names.forEach(System.out::println);
}
delete
删除
Topic
。
public void deleteTopic() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("java-client4", "java-client2"));
deleteTopicsResult.topicNameValues().forEach((name, future) -> {
future.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
latch.countDown();
});
});
latch.await();
}
輸出如下圖所示:
現在再擷取
Topic
的清單,輸出如下圖所示(删除的
Topic
已經不在了):
describe
擷取
Topic
的描述。
public void describeTopic() {
Map<String, KafkaFuture<TopicDescription>> values =
adminClient.describeTopics(Collections.singleton("new-topic-kaven")).values();
for (String name : values.keySet()) {
values.get(name).whenComplete((describe, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(name);
System.out.println(describe);
});
}
}
輸出如下圖所示:
輸出符合預期,因為建立該
Topic
的配置為:
new NewTopic("new-topic-kaven", 1, (short) 1)
config
擷取
Topic
的配置。
public void describeTopicConfig() throws ExecutionException, InterruptedException {
DescribeConfigsResult describeConfigsResult = adminClient
.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven")));
describeConfigsResult.all().get().forEach(((configResource, config) -> {
System.out.println(configResource);
System.out.println(config);
}));
}
輸出如下圖所示:
describeConfigs
方法很顯然還可以擷取其他資源的配置(通過指定資源的類型)。
public enum Type {
BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
...
}
alter
增量更新
Topic
的配置。
public void incrementalAlterConfig() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
alter.put(
new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
Collections.singletonList(
new AlterConfigOp(
new ConfigEntry("compression.type", "gzip"),
AlterConfigOp.OpType.SET
)
)
);
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alter);
alterConfigsResult.values().forEach(((configResource, voidKafkaFuture) -> {
voidKafkaFuture.whenComplete((a, throwable) -> {
if(throwable != null) {
System.out.println(throwable.getMessage());
}
System.out.println(configResource);
latch.countDown();
});
}));
latch.await();
}
輸出如下圖所示:
很顯然
incrementalAlterConfigs
方法也可以增量更新其他資源的配置(通過指定資源的類型)。
ConfigResource
定義需要修改配置的資源,
Collection<AlterConfigOp>
定義該資源具體的配置修改操作。
Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
configEntry
定義資源需要修改的配置條目,
operationType
定義修改操作的類型。
public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
this.configEntry = configEntry;
this.opType = operationType;
}
修改操作的類型。
public enum OpType {
/**
* 設定配置條目的值
*/
SET((byte) 0),
/**
* 将配置條目恢複為預設值(可能為空)
*/
DELETE((byte) 1),
/**
* 僅适用于清單類型的配置條目
* 将指定的值添加到配置條目的目前值
* 如果尚未設定配置值,則添加到預設值
*/
APPEND((byte) 2),
/**
* 僅适用于清單類型的配置條目
* 從配置條目的目前值中删除指定的值
* 删除目前不在配置條目中的值是合法的
* 從目前配置值中删除所有條目會留下一個空清單,并且不會恢複為條目的預設值
*/
SUBTRACT((byte) 3);
...
}
資源的配置條目,包含配置名稱、值等。
public class ConfigEntry {
private final String name;
private final String value;
private final ConfigSource source;
private final boolean isSensitive;
private final boolean isReadOnly;
private final List<ConfigSynonym> synonyms;
private final ConfigType type;
private final String documentation;
...
}
在擷取
Topic
配置的輸出中也可以發現這些配置條目。
很顯然,這裡修改名稱為
new-topic-kaven
的
Topic
的
compression.type
配置條目(壓縮類型)。
alter.put(
new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
Collections.singletonList(
new AlterConfigOp(
new ConfigEntry("compression.type", "gzip"),
AlterConfigOp.OpType.SET
)
)
);
compression.type
配置條目的預設值為
producer
(意味着保留生産者設定的原始壓縮編解碼器),和上面的圖也對應,部落客将該配置條目修改成了
gzip
。
再來擷取該
Topic
的配置,如下圖所示(很顯然配置修改成功了):