天天看點

叢集管理工具KafkaAdminClient理與示例

歡迎支援筆者新作:《深入了解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公衆号:朱小厮的部落格。

前言

一般情況下,我們都習慣使用Kafka中bin目錄下的腳本工具來管理檢視Kafka,但是有些時候需要将某些管理檢視的功能內建到系統(比如Kafka Manager)中,那麼就需要調用一些API來直接操作Kafka了。在Kafka0.11.0.0版本之前,可以通過kafka-core包(Kafka的服務端代碼,采用Scala編寫)下的AdminClient和AdminUtils來實作部分的叢集管理操作,比如筆者之前在Kafka解析之topic建立(1)和Kafka解析之topic建立(2)兩篇文章中所講解的Topic的建立就用到了AdminUtils類。而在Kafka0.11.0.0版本之後,又多了一個AdminClient,這個是在kafka-client包下的,這是一個抽象類,具體的實作是org.apache.kafka.clients.admin.KafkaAdminClient,這個就是本文所要陳述的重點了。

功能與原理介紹

在Kafka官網中這麼描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準):

建立Topic:createTopics(Collection<NewTopic> newTopics)

删除Topic:deleteTopics(Collection<String> topics)

羅列所有Topic:listTopics()

查詢Topic:describeTopics(Collection<String> topicNames)

查詢叢集資訊:describeCluster()

查詢ACL資訊:describeAcls(AclBindingFilter filter)

建立ACL資訊:createAcls(Collection<AclBinding> acls)

删除ACL資訊:deleteAcls(Collection<AclBindingFilter> filters)

查詢配置資訊:describeConfigs(Collection<ConfigResource> resources)

修改配置資訊:alterConfigs(Map<ConfigResource, Config> configs)

修改副本的日志目錄:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)

查詢節點的日志目錄資訊:describeLogDirs(Collection<Integer> brokers)

查詢副本的日志目錄資訊:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)

增加分區:createPartitions(Map<String, NewPartitions> newPartitions)

其内部原理是使用Kafka自定義的一套二進制協定來實作,詳細可以參見Kafka協定。主要實作步驟:

用戶端根據方法的調用建立相應的協定請求,比如建立Topic的createTopics方法,其内部就是發送CreateTopicRequest請求。

用戶端發送請求至Kafka Broker。

Kafka Broker處理相應的請求并回執,比如與CreateTopicRequest對應的是CreateTopicResponse。

用戶端接收相應的回執并進行解析處理。

和協定有關的請求和回執的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執類的兩個基本父類。

示例

下面就以建立Topic來舉一個簡單的KafkaAdminClient的使用案例,【代碼清單1】:

private static final String NEW_TOPIC = "topic-test2";

private static final String brokerUrl = "localhost:9092";

private static AdminClient adminClient;