天天看點

kafka用戶端操作之Admin API

背景:我使用docker-compose 搭建的kafka服務

​kafka的簡單介紹以及docker-compose部署單主機Kafka叢集​​

Kafka API簡單介紹

kafka除了用于管理和管理任務的指令行工具,Kafka還有5個用于Java和Scala的核心API

他們分别是

  • The Admin API : 用于管理和inspect topics, brokers和其他 Kafka 對象
  • The Producer API: 将事件流釋出(寫入)到一個或多個 Kafka topics
  • The Consumer API: 訂閱(讀取)一個或多個topics并處理它們生成的事件流
  • The Kafka Streams API: 用于實作流處理應用程式和微服務,它提供了更進階别的方法來處理事件流,包括轉換、聚合和連接配接等有狀态操作、視窗化、基于事件時間的處理等等。從一個或多個topics讀取輸入以生成一個或多個topics的輸出,有效地将輸入流轉換為輸出流。
  • The Kafka Connect API:用于建構和運作可重用 的資料導入/導出connectors,這些connectors從外部系統和應用程式消費(讀取)或産生(寫入)事件流,以便它們可以與 Kafka 內建。例如,與 PostgreSQL 等關系資料庫的連接配接器可能會捕獲表的每次更改。但是,在實踐中,您通常不需要實作自己的connectors,因為 Kafka 社群已經提供了數百個即用型connectors。

我使用的wurstmeister/kafka鏡像的kafka是2.8.1版本的,通過docker inspect指令可以檢視

kafka用戶端操作之Admin API

建立項目kafkademo用以測試

我使用IDEA 的Spring Initializr建立項目,引入web依賴。

我使用的spring boot 版本是2.6.3,kafka-clients 是2.8.1,和kafka server是對應的

pom.xml檔案

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.xt</groupId>
    <artifactId>kafkademo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafkademo</name>
    <description>kafkademo</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>      

建立連接配接用戶端

首先設定一個連接配接kafka的用戶端類AdminClient,他是一個抽象類,隻有一個KafkaAdminClient實作,如下代碼所示,他提供了兩種建立方式

//Admin的預設實作。 通過調用AdminClient中的create()方法之一來建立此類的執行個體。 使用者不應直接引用此類。
//這個類是線程安全的。
public abstract class AdminClient implements Admin {

    /**
     * Create a new Admin with the given configuration.
     *
     * @param props The configuration.
     * @return The new KafkaAdminClient.
     */
    public static AdminClient create(Properties props) {
        return (AdminClient) Admin.create(props);
    }

    /**
     * Create a new Admin with the given configuration.
     *
     * @param conf The configuration.
     * @return The new KafkaAdminClient.
     */
    public static AdminClient create(Map<String, Object> conf) {
        return (AdminClient) Admin.create(conf);
    }
}      

通過使用Properties來建立連接配接kafka server的用戶端

//設定AdminClient
public static AdminClient adminClient(){
    //Properties類表示一組持久的屬性。 Properties可以儲存到流中或從流中加載。屬性清單中的每個鍵及其對應的值都是一個字元串。繼承自Hashtable
    Properties properties = new Properties();
    //AdminClientConfig是AdminClient配置類,它還包含用于配置條目名稱的常量。
    properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka伺服器IP:9092");
    properties.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
    //内置管理用戶端的基類 ,是一個抽象類,下面有一個實作 為KafkaAdminClient,此類是保證了線程安全
    AdminClient adminClient = AdminClient.create(properties);
    return adminClient;
}      

建立topic

//建立Topic執行個體
public static void createTopic() {

    AdminClient adminClient = adminClient();
    // 副本因子
    Short rs = 1;
    //建立具有指定副本因子和分區數的新topic。
    NewTopic newTopic = new NewTopic(TOPIC_NAME, 1 , rs);
    //建立一批新主題。
    //此操作不是事務性的,是以它可能對某些主題成功,而對另一些主題則失敗。
    //CreateTopicsResult傳回成功後,所有代理可能需要幾秒鐘才能意識到主題已建立。在此期間, listTopics()和describeTopics(Collection)可能不會傳回有關新主題的資訊。
    CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
    System.out.println("建立topic成功 : "+ topics.toString());
    System.out.println("---------------------------------------------------------------");

}      

擷取Topic清單

//擷取Topic清單
public static void topicLists() throws Exception {
    AdminClient adminClient = adminClient();
    // 是否檢視internal選項
    ListTopicsOptions options = new ListTopicsOptions();
    //設定我們是否應該列出内部topic。
    options.listInternal(true);

    //列出叢集中可用的topic。
    ListTopicsResult listTopicsResult = adminClient.listTopics(options);
    //傳回一個topic名稱集合的future(這裡是KafkaFuture)
    Set<String> names = listTopicsResult.names().get();
    //傳回一個KafkaFuture,它産生一個 TopicListing 對象的集合
    Collection<TopicListing> topicListings = listTopicsResult.listings().get();
    //傳回一個KafkaFuture,它産生一個topic名稱到 TopicListing 對象的映射。
    KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
    // 列印names
    names.stream().forEach(System.out::println);
    System.out.println("---------------------------topic清單-------------------------");
    // 列印topicListings
    topicListings.stream().forEach((topicList)->{
        System.out.println(topicList);
    });
    System.out.println("---------------------------topic清單-------------------------");
}      

擷取topic的描述資訊

// 擷取描述topic的資訊
public static void describeTopics() throws Exception {
    AdminClient adminClient = adminClient();
    //描述叢集中的一些topic。
    DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));

    Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();

    Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
    System.out.println("----------------------------topic資訊-----------------------------");
    entries.stream().forEach((entry)->{
        System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue());
    });
    System.out.println("----------------------------topic資訊-----------------------------");
}      

修改配置資訊

//修改Config資訊
public static void alterConfig() throws Exception{
    AdminClient adminClient = adminClient();

    Map<ConfigResource,Collection<AlterConfigOp>> configMaps = new HashMap<>();
    //具有配置的資源的類,需要提供type和名稱  Type是他内部維護的枚舉類,共有四種類型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0)
    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
    //包含名稱、值和操作類型的更改配置條目的類  ,需要注入ConfigEntry,和操作類型,同樣OpType是個枚舉類
    AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
    configMaps.put(configResource,Arrays.asList(alterConfigOp));
    //逐漸更新指定資源的配置
    AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
    alterConfigsResult.all().get();
}      

擷取配置的描述資訊

//擷取描述配置的資訊
public static void describeConfig() throws Exception{
    AdminClient adminClient = adminClient();

    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
    //擷取指定資源的配置
    DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
    Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
    System.out.println("----------------------------配置資訊-----------------------------");
    configResourceConfigMap.entrySet().stream().forEach((entry)->{
        System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue());
    });
    System.out.println("----------------------------配置資訊-----------------------------");
}      

增加topic的partition數量

//增加partition數量
public static void incrPartitions(int partitions) throws Exception{
   AdminClient adminClient = adminClient();
   Map<String, NewPartitions> partitionsMap = new HashMap<>();

   NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
   partitionsMap.put(TOPIC_NAME, newPartitions);
   CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
   createPartitionsResult.all().get();
}      

删除Topic

//删除Topic
public static void delTopics() throws Exception {
   AdminClient adminClient = adminClient();
   //删除一批topic。
   //此操作不是事務性的,是以它可能對某些主題成功,而對另一些主題則失敗。
   //DeleteTopicsResult傳回成功後,所有代理可能需要幾秒鐘才能意識到主題已消失。 在此期間, listTopics()和describeTopics(Collection)可能會繼續傳回有關已删除主題的資訊。
   DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
   deleteTopicsResult.all().get();
}      

完整代碼

public class AdminSample {

    public final static String TOPIC_NAME="xt";

    //設定AdminClient
    public static AdminClient adminClient(){
        //Properties類表示一組持久的屬性。 Properties可以儲存到流中或從流中加載。屬性清單中的每個鍵及其對應的值都是一個字元串。繼承自Hashtable
        Properties properties = new Properties();
        //AdminClientConfig是AdminClient配置類,它還包含用于配置條目名稱的常量。
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
        properties.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        //内置管理用戶端的基類 ,是一個抽象類,下面有一個實作 為KafkaAdminClient,此類是保證了線程安全
        AdminClient adminClient = AdminClient.create(properties);
        return adminClient;
    }


    public static void main(String[] args) throws Exception {
        AdminClient adminClient = AdminSample.adminClient();

        System.out.println("adminClient : "+ adminClient);
        // 建立Topic執行個體
        createTopic();
        // 擷取Topic清單
        topicLists();
        // 描述Topic
        describeTopics();
        // 修改Config
        alterConfig();
        // 查詢Config
        describeConfig();
        // 增加partition數量
        incrPartitions(2);
        // 删除Topic執行個體
        delTopics();
    }


     //建立Topic執行個體
    public static void createTopic() {

        AdminClient adminClient = adminClient();
        // 副本因子
        Short rs = 1;
        //建立具有指定副本因子和分區數的新topic。
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1 , rs);
        //建立一批新主題。
        //此操作不是事務性的,是以它可能對某些主題成功,而對另一些主題則失敗。
        //CreateTopicsResult傳回成功後,所有代理可能需要幾秒鐘才能意識到主題已建立。在此期間, listTopics()和describeTopics(Collection)可能不會傳回有關新主題的資訊。
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
        System.out.println("建立topic成功 : "+ topics.toString());
        System.out.println("---------------------------------------------------------------");

    }


    //擷取Topic清單
    public static void topicLists() throws Exception {
        AdminClient adminClient = adminClient();
        // 是否檢視internal選項
        ListTopicsOptions options = new ListTopicsOptions();
        //設定我們是否應該列出内部topic。
        options.listInternal(true);

        //列出叢集中可用的topic。
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        //傳回一個topic名稱集合的future(這裡是KafkaFuture)
        Set<String> names = listTopicsResult.names().get();
        //傳回一個KafkaFuture,它産生一個 TopicListing 對象的集合
        Collection<TopicListing> topicListings = listTopicsResult.listings().get();
        //傳回一個KafkaFuture,它産生一個topic名稱到 TopicListing 對象的映射。
        KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
        // 列印names
        names.stream().forEach(System.out::println);
        System.out.println("---------------------------topic清單-------------------------");
        // 列印topicListings
        topicListings.stream().forEach((topicList)->{
            System.out.println(topicList);
        });
        System.out.println("---------------------------topic清單-------------------------");
    }

    // 擷取描述topic的資訊
    public static void describeTopics() throws Exception {
        AdminClient adminClient = adminClient();
        //描述叢集中的一些topic。
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));

        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();

        Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
        System.out.println("----------------------------topic資訊-----------------------------");
        entries.stream().forEach((entry)->{
            System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue());
        });
        System.out.println("----------------------------topic資訊-----------------------------");
    }

    //修改Config資訊
    public static void alterConfig() throws Exception{
        AdminClient adminClient = adminClient();

        Map<ConfigResource,Collection<AlterConfigOp>> configMaps = new HashMap<>();
        //具有配置的資源的類,需要提供type和名稱  Type是他内部維護的枚舉類,共有四種類型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0)
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        //包含名稱、值和操作類型的更改配置條目的類  ,需要注入ConfigEntry,和操作類型,同樣OpType是個枚舉類
        AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
        configMaps.put(configResource,Arrays.asList(alterConfigOp));
        //逐漸更新指定資源的配置
        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
        alterConfigsResult.all().get();
    }

    //擷取描述配置的資訊
    public static void describeConfig() throws Exception{
        AdminClient adminClient = adminClient();

        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        //擷取指定資源的配置
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
        Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
        System.out.println("----------------------------配置資訊-----------------------------");
        configResourceConfigMap.entrySet().stream().forEach((entry)->{
            System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue());
        });
        System.out.println("----------------------------配置資訊-----------------------------");
    }


    //增加partition數量
    public static void incrPartitions(int partitions) throws Exception{
        AdminClient adminClient = adminClient();
        Map<String, NewPartitions> partitionsMap = new HashMap<>();

        NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
        partitionsMap.put(TOPIC_NAME, newPartitions);
        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
        createPartitionsResult.all().get();
        
    }


    //删除Topic
    public static void delTopics() throws Exception {
        AdminClient adminClient = adminClient();
        //删除一批topic。
        //此操作不是事務性的,是以它可能對某些主題成功,而對另一些主題則失敗。
        //DeleteTopicsResult傳回成功後,所有代理可能需要幾秒鐘才能意識到主題已消失。 在此期間, listTopics()和describeTopics(Collection)可能會繼續傳回有關已删除主題的資訊。
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
        deleteTopicsResult.all().get();
    }


}      

References:

  • ​​https://kafka.apache.org/intro​​

繼續閱讀