背景:我使用docker-compose 搭建的kafka服務
kafka的簡單介紹以及docker-compose部署單主機Kafka叢集
Kafka API簡單介紹
kafka除了用于管理和管理任務的指令行工具,Kafka還有5個用于Java和Scala的核心API
他們分别是
- The Admin API : 用于管理和inspect topics, brokers和其他 Kafka 對象
- The Producer API: 将事件流釋出(寫入)到一個或多個 Kafka topics
- The Consumer API: 訂閱(讀取)一個或多個topics并處理它們生成的事件流
- The Kafka Streams API: 用于實作流處理應用程式和微服務,它提供了更進階别的方法來處理事件流,包括轉換、聚合和連接配接等有狀态操作、視窗化、基于事件時間的處理等等。從一個或多個topics讀取輸入以生成一個或多個topics的輸出,有效地将輸入流轉換為輸出流。
- The Kafka Connect API:用于建構和運作可重用 的資料導入/導出connectors,這些connectors從外部系統和應用程式消費(讀取)或産生(寫入)事件流,以便它們可以與 Kafka 內建。例如,與 PostgreSQL 等關系資料庫的連接配接器可能會捕獲表的每次更改。但是,在實踐中,您通常不需要實作自己的connectors,因為 Kafka 社群已經提供了數百個即用型connectors。
我使用的wurstmeister/kafka鏡像的kafka是2.8.1版本的,通過docker inspect指令可以檢視
建立項目kafkademo用以測試
我使用IDEA 的Spring Initializr建立項目,引入web依賴。
我使用的spring boot 版本是2.6.3,kafka-clients 是2.8.1,和kafka server是對應的
pom.xml檔案
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.xt</groupId>
<artifactId>kafkademo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafkademo</name>
<description>kafkademo</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
建立連接配接用戶端
首先設定一個連接配接kafka的用戶端類AdminClient,他是一個抽象類,隻有一個KafkaAdminClient實作,如下代碼所示,他提供了兩種建立方式
//Admin的預設實作。 通過調用AdminClient中的create()方法之一來建立此類的執行個體。 使用者不應直接引用此類。
//這個類是線程安全的。
public abstract class AdminClient implements Admin {
/**
* Create a new Admin with the given configuration.
*
* @param props The configuration.
* @return The new KafkaAdminClient.
*/
public static AdminClient create(Properties props) {
return (AdminClient) Admin.create(props);
}
/**
* Create a new Admin with the given configuration.
*
* @param conf The configuration.
* @return The new KafkaAdminClient.
*/
public static AdminClient create(Map<String, Object> conf) {
return (AdminClient) Admin.create(conf);
}
}
通過使用Properties來建立連接配接kafka server的用戶端
//設定AdminClient
public static AdminClient adminClient(){
//Properties類表示一組持久的屬性。 Properties可以儲存到流中或從流中加載。屬性清單中的每個鍵及其對應的值都是一個字元串。繼承自Hashtable
Properties properties = new Properties();
//AdminClientConfig是AdminClient配置類,它還包含用于配置條目名稱的常量。
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka伺服器IP:9092");
properties.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
//内置管理用戶端的基類 ,是一個抽象類,下面有一個實作 為KafkaAdminClient,此類是保證了線程安全
AdminClient adminClient = AdminClient.create(properties);
return adminClient;
}
建立topic
//建立Topic執行個體
public static void createTopic() {
AdminClient adminClient = adminClient();
// 副本因子
Short rs = 1;
//建立具有指定副本因子和分區數的新topic。
NewTopic newTopic = new NewTopic(TOPIC_NAME, 1 , rs);
//建立一批新主題。
//此操作不是事務性的,是以它可能對某些主題成功,而對另一些主題則失敗。
//CreateTopicsResult傳回成功後,所有代理可能需要幾秒鐘才能意識到主題已建立。在此期間, listTopics()和describeTopics(Collection)可能不會傳回有關新主題的資訊。
CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
System.out.println("建立topic成功 : "+ topics.toString());
System.out.println("---------------------------------------------------------------");
}
擷取Topic清單
//擷取Topic清單
public static void topicLists() throws Exception {
AdminClient adminClient = adminClient();
// 是否檢視internal選項
ListTopicsOptions options = new ListTopicsOptions();
//設定我們是否應該列出内部topic。
options.listInternal(true);
//列出叢集中可用的topic。
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
//傳回一個topic名稱集合的future(這裡是KafkaFuture)
Set<String> names = listTopicsResult.names().get();
//傳回一個KafkaFuture,它産生一個 TopicListing 對象的集合
Collection<TopicListing> topicListings = listTopicsResult.listings().get();
//傳回一個KafkaFuture,它産生一個topic名稱到 TopicListing 對象的映射。
KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
// 列印names
names.stream().forEach(System.out::println);
System.out.println("---------------------------topic清單-------------------------");
// 列印topicListings
topicListings.stream().forEach((topicList)->{
System.out.println(topicList);
});
System.out.println("---------------------------topic清單-------------------------");
}
擷取topic的描述資訊
// 擷取描述topic的資訊
public static void describeTopics() throws Exception {
AdminClient adminClient = adminClient();
//描述叢集中的一些topic。
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
System.out.println("----------------------------topic資訊-----------------------------");
entries.stream().forEach((entry)->{
System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue());
});
System.out.println("----------------------------topic資訊-----------------------------");
}
修改配置資訊
//修改Config資訊
public static void alterConfig() throws Exception{
AdminClient adminClient = adminClient();
Map<ConfigResource,Collection<AlterConfigOp>> configMaps = new HashMap<>();
//具有配置的資源的類,需要提供type和名稱 Type是他内部維護的枚舉類,共有四種類型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0)
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
//包含名稱、值和操作類型的更改配置條目的類 ,需要注入ConfigEntry,和操作類型,同樣OpType是個枚舉類
AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
configMaps.put(configResource,Arrays.asList(alterConfigOp));
//逐漸更新指定資源的配置
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
alterConfigsResult.all().get();
}
擷取配置的描述資訊
//擷取描述配置的資訊
public static void describeConfig() throws Exception{
AdminClient adminClient = adminClient();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
//擷取指定資源的配置
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
System.out.println("----------------------------配置資訊-----------------------------");
configResourceConfigMap.entrySet().stream().forEach((entry)->{
System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue());
});
System.out.println("----------------------------配置資訊-----------------------------");
}
增加topic的partition數量
//增加partition數量
public static void incrPartitions(int partitions) throws Exception{
AdminClient adminClient = adminClient();
Map<String, NewPartitions> partitionsMap = new HashMap<>();
NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
partitionsMap.put(TOPIC_NAME, newPartitions);
CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
createPartitionsResult.all().get();
}
删除Topic
//删除Topic
public static void delTopics() throws Exception {
AdminClient adminClient = adminClient();
//删除一批topic。
//此操作不是事務性的,是以它可能對某些主題成功,而對另一些主題則失敗。
//DeleteTopicsResult傳回成功後,所有代理可能需要幾秒鐘才能意識到主題已消失。 在此期間, listTopics()和describeTopics(Collection)可能會繼續傳回有關已删除主題的資訊。
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
deleteTopicsResult.all().get();
}
完整代碼
public class AdminSample {
public final static String TOPIC_NAME="xt";
//設定AdminClient
public static AdminClient adminClient(){
//Properties類表示一組持久的屬性。 Properties可以儲存到流中或從流中加載。屬性清單中的每個鍵及其對應的值都是一個字元串。繼承自Hashtable
Properties properties = new Properties();
//AdminClientConfig是AdminClient配置類,它還包含用于配置條目名稱的常量。
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
properties.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
//内置管理用戶端的基類 ,是一個抽象類,下面有一個實作 為KafkaAdminClient,此類是保證了線程安全
AdminClient adminClient = AdminClient.create(properties);
return adminClient;
}
public static void main(String[] args) throws Exception {
AdminClient adminClient = AdminSample.adminClient();
System.out.println("adminClient : "+ adminClient);
// 建立Topic執行個體
createTopic();
// 擷取Topic清單
topicLists();
// 描述Topic
describeTopics();
// 修改Config
alterConfig();
// 查詢Config
describeConfig();
// 增加partition數量
incrPartitions(2);
// 删除Topic執行個體
delTopics();
}
//建立Topic執行個體
public static void createTopic() {
AdminClient adminClient = adminClient();
// 副本因子
Short rs = 1;
//建立具有指定副本因子和分區數的新topic。
NewTopic newTopic = new NewTopic(TOPIC_NAME, 1 , rs);
//建立一批新主題。
//此操作不是事務性的,是以它可能對某些主題成功,而對另一些主題則失敗。
//CreateTopicsResult傳回成功後,所有代理可能需要幾秒鐘才能意識到主題已建立。在此期間, listTopics()和describeTopics(Collection)可能不會傳回有關新主題的資訊。
CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
System.out.println("建立topic成功 : "+ topics.toString());
System.out.println("---------------------------------------------------------------");
}
//擷取Topic清單
public static void topicLists() throws Exception {
AdminClient adminClient = adminClient();
// 是否檢視internal選項
ListTopicsOptions options = new ListTopicsOptions();
//設定我們是否應該列出内部topic。
options.listInternal(true);
//列出叢集中可用的topic。
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
//傳回一個topic名稱集合的future(這裡是KafkaFuture)
Set<String> names = listTopicsResult.names().get();
//傳回一個KafkaFuture,它産生一個 TopicListing 對象的集合
Collection<TopicListing> topicListings = listTopicsResult.listings().get();
//傳回一個KafkaFuture,它産生一個topic名稱到 TopicListing 對象的映射。
KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
// 列印names
names.stream().forEach(System.out::println);
System.out.println("---------------------------topic清單-------------------------");
// 列印topicListings
topicListings.stream().forEach((topicList)->{
System.out.println(topicList);
});
System.out.println("---------------------------topic清單-------------------------");
}
// 擷取描述topic的資訊
public static void describeTopics() throws Exception {
AdminClient adminClient = adminClient();
//描述叢集中的一些topic。
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
System.out.println("----------------------------topic資訊-----------------------------");
entries.stream().forEach((entry)->{
System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue());
});
System.out.println("----------------------------topic資訊-----------------------------");
}
//修改Config資訊
public static void alterConfig() throws Exception{
AdminClient adminClient = adminClient();
Map<ConfigResource,Collection<AlterConfigOp>> configMaps = new HashMap<>();
//具有配置的資源的類,需要提供type和名稱 Type是他内部維護的枚舉類,共有四種類型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0)
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
//包含名稱、值和操作類型的更改配置條目的類 ,需要注入ConfigEntry,和操作類型,同樣OpType是個枚舉類
AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
configMaps.put(configResource,Arrays.asList(alterConfigOp));
//逐漸更新指定資源的配置
AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
alterConfigsResult.all().get();
}
//擷取描述配置的資訊
public static void describeConfig() throws Exception{
AdminClient adminClient = adminClient();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
//擷取指定資源的配置
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
System.out.println("----------------------------配置資訊-----------------------------");
configResourceConfigMap.entrySet().stream().forEach((entry)->{
System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue());
});
System.out.println("----------------------------配置資訊-----------------------------");
}
//增加partition數量
public static void incrPartitions(int partitions) throws Exception{
AdminClient adminClient = adminClient();
Map<String, NewPartitions> partitionsMap = new HashMap<>();
NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
partitionsMap.put(TOPIC_NAME, newPartitions);
CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
createPartitionsResult.all().get();
}
//删除Topic
public static void delTopics() throws Exception {
AdminClient adminClient = adminClient();
//删除一批topic。
//此操作不是事務性的,是以它可能對某些主題成功,而對另一些主題則失敗。
//DeleteTopicsResult傳回成功後,所有代理可能需要幾秒鐘才能意識到主題已消失。 在此期間, listTopics()和describeTopics(Collection)可能會繼續傳回有關已删除主題的資訊。
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
deleteTopicsResult.all().get();
}
}
References:
- https://kafka.apache.org/intro