天天看點

Kafka:Topic概念與API介紹

Topic

事件被組織并持久地存儲在​

​Topic​

​​中,​

​Topic​

​​類似于檔案系統中的檔案夾,事件就是該檔案夾中的檔案。​

​Kafka​

​​中的​

​Topic​

​​始終是多生産者和多訂閱者:一個​

​Topic​

​​可以有零個、一個或多個生産者向其寫入事件,也可以有零個、一個或多個消費者訂閱這些事件。​

​Topic​

​​中的事件可以根據需要随時讀取,與傳統的消息中間件不同,事件在使用後不會被删除,相反,可以通過配置來定義​

​Kafka​

​​中每個​

​Topic​

​​應該保留事件的時間,超過該事件後舊事件将被丢棄。​

​Kafka​

​的性能在資料大小方面實際上是恒定的,是以長時間存儲資料是非常好的。

Partition

​Topic​

​​是分區的,這意味着一個​

​Topic​

​​可以分布在多個​

​Kafka​

​​節點上。資料的這種分布式放置對于可伸縮性非常重要,因為它允許用戶端應用程式同時從多個​

​Kafka​

​​節點讀取和寫入資料。将新事件釋出到​

​Topic​

​​時,它實際上會​

​appended​

​​到​

​Topic​

​​的一個​

​Partition​

​​中。具有相同僚件​

​key​

​​的事件将寫入同一​

​Partition​

​​,​

​Kafka​

​​保證給定​

​Topic​

​​的​

​Partition​

​的任何使用者都将始終以與寫入時完全相同的順序讀取該分區的事件。

Replication

為了使資料具有容錯性和高可用性,每個​

​Topic​

​​都可以有多個​

​Replication​

​​,以便始終有多個​

​Kafka​

​​節點具有資料副本,以防出現問題。常見的生産設定是​

​replicationFactor​

​​為​

​3​

​​,即始終有三份資料副本(包括一份原始資料)。此​

​Replication​

​​在​

​Topic​

​​的​

​Partition​

​級别執行。

​Kafka​

​​在指定數量(通過​

​replicationFactor​

​​)的伺服器上複制每個​

​Topic​

​​的​

​Partition​

​​,這允許在叢集中的某些伺服器發生故障時進行自動故障轉移,以便在出現故障時服務仍然可用。​

​Replication​

​​的機關是​

​Topic​

​​的​

​Partition​

​​。在非故障條件下,​

​Kafka​

​​中的每個​

​Partition​

​​都有一個​

​leader​

​​和零個或多個​

​follower​

​​。​

​replicationFactor​

​​是複制副本(包括​

​leader​

​​)的總數。所有讀和寫操作都将轉到​

​Partition​

​​的​

​leader​

​​上。通常,有比​

​Kafka​

​​節點多得多的​

​Partition​

​​,并且這些​

​Partition​

​​的​

​leader​

​​在​

​Kafka​

​​節點之間均勻分布。​

​follower​

​​上的資料需要與​

​leader​

​​的資料同步,所有資料都具有相同的偏移量和順序(當然,在任何給定時間,​

​leader​

​​的資料末尾可能有一些尚未複制的資料)。​

​follower​

​​會像普通​

​Kafka​

​​消費者一樣使用來自​

​leader​

​​的消息,并将其應用到自己的資料中。如下圖所示,三個​

​Kafka​

​​節點上有兩個​

​Topic​

​​(​

​Topic 0​

​​和​

​Topic 1​

​​),​

​Topic 0​

​​有兩個​

​Partition​

​​并且​

​replicationFactor​

​​為​

​3​

​​(紅色的​

​Partition​

​​為​

​leader​

​​),​

​Topic 1​

​​有三個​

​Partition​

​​,​

​replicationFactor​

​​也為​

​3​

​​(紅色的​

​Partition​

​​為​

​leader​

​)。

Kafka:Topic概念與API介紹

API

添加依賴:

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>      

這裡使用的​

​kafka-clients​

​​版本和部落客之前部署的​

​Kafka​

​版本一緻:

  • ​​Kafka:部署Kafka​​

client

操作​

​Topic​

​​的用戶端通過​

​AdminClient​

​抽象類來建立,源碼如下:

package org.apache.kafka.clients.admin;

import java.util.Map;
import java.util.Properties;

public abstract class AdminClient implements Admin {

    /**
     * 使用給定的配置建立一個新的Admin
     * props:Admin的配置
     * 傳回KafkaAdminClient執行個體
     */
    public static AdminClient create(Properties props) {
        return (AdminClient) Admin.create(props);
    }

    /**
     * 重載方法
     * 使用給定的配置建立一個新的Admin
     * props:Admin的配置
     * 傳回KafkaAdminClient執行個體
     */
    public static AdminClient create(Map<String, Object> conf) {
        return (AdminClient) Admin.create(conf);
    }
}      

實際上會傳回一個​

​KafkaAdminClient​

​​執行個體(​

​KafkaAdminClient​

​​類是​

​AdminClient​

​​抽象類的子類),​

​KafkaAdminClient​

​​類的方法比較多,其中​

​private​

​​方法服務于​

​public​

​方法(提供給使用者的服務)。

Kafka:Topic概念與API介紹

​KafkaAdminClient​

​​類提供的​

​public​

​​方法是對​

​Admin​

​接口的實作。

Kafka:Topic概念與API介紹

create

建立一個新的​

​Topic​

​。

package com.kaven.kafka.admin;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class Admin {

    private static final AdminClient adminClient = Admin.getAdminClient();

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        Admin admin = new Admin();
        admin.createTopic();
        Thread.sleep(100000);
    }

    public void createTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CreateTopicsResult topics = adminClient.createTopics(
                Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
        );
        Map<String, KafkaFuture<Void>> values = topics.values();
        values.forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }

    public static AdminClient getAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        return AdminClient.create(properties);
    }
}      

建立​

​AdminClient​

​​(簡單使用,配置​

​BOOTSTRAP_SERVERS_CONFIG​

​就可以了):

public static AdminClient getAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.240:9092");
        return AdminClient.create(properties);
    }      

建立​

​Topic​

​​(傳入一個​

​NewTopic​

​​執行個體,并且給該​

​NewTopic​

​​執行個體配置​

​name​

​​、​

​numPartitions​

​​、​

​replicationFactor​

​):

public void createTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        CreateTopicsResult topics = adminClient.createTopics(
                Collections.singleton(new NewTopic("new-topic-kaven", 1, (short) 1))
        );
        Map<String, KafkaFuture<Void>> values = topics.values();
        values.forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }      

提供的方法大都是異步程式設計模式的,這些基礎知識就不介紹了,輸出如下圖所示:

Kafka:Topic概念與API介紹

list

擷取​

​Topic​

​清單。

public void listTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient.
                listTopics(new ListTopicsOptions().listInternal(true));
        Set<String> names = listTopicsResult.names().get();
        names.forEach(System.out::println);
    }      

​get​

​​方法會等待​

​future​

​完成,然後傳回其結果。輸出如下圖所示:

Kafka:Topic概念與API介紹

通過下面這個配置,可以擷取到​

​Kafka​

​​内置的​

​Topic​

​。

new ListTopicsOptions().listInternal(true)      

預設是不會擷取到​

​Kafka​

​​内置的​

​Topic​

​。

public void listTopics() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        Set<String> names = listTopicsResult.names().get();
        names.forEach(System.out::println);
    }      
Kafka:Topic概念與API介紹

delete

删除​

​Topic​

​。

public void deleteTopic() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("java-client4", "java-client2"));
        deleteTopicsResult.topicNameValues().forEach((name, future) -> {
            future.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                latch.countDown();
            });
        });
        latch.await();
    }      

輸出如下圖所示:

Kafka:Topic概念與API介紹

現在再擷取​

​Topic​

​​的清單,輸出如下圖所示(删除的​

​Topic​

​已經不在了):

Kafka:Topic概念與API介紹

describe

擷取​

​Topic​

​的描述。

public void describeTopic() {
        Map<String, KafkaFuture<TopicDescription>> values =
                adminClient.describeTopics(Collections.singleton("new-topic-kaven")).values();
        for (String name : values.keySet()) {
            values.get(name).whenComplete((describe, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(name);
                System.out.println(describe);
            });
        }
    }      

輸出如下圖所示:

Kafka:Topic概念與API介紹

輸出符合預期,因為建立該​

​Topic​

​的配置為:

new NewTopic("new-topic-kaven", 1, (short) 1)      

config

擷取​

​Topic​

​的配置。

public void describeTopicConfig() throws ExecutionException, InterruptedException {
        DescribeConfigsResult describeConfigsResult = adminClient
                .describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven")));
        describeConfigsResult.all().get().forEach(((configResource, config) -> {
            System.out.println(configResource);
            System.out.println(config);
        }));
    }      

輸出如下圖所示:

Kafka:Topic概念與API介紹

​describeConfigs​

​方法很顯然還可以擷取其他資源的配置(通過指定資源的類型)。

public enum Type {
        BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
        ...
    }      

alter

增量更新​

​Topic​

​的配置。

public void incrementalAlterConfig() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        
        Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();
        alter.put(
                new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
                Collections.singletonList(
                        new AlterConfigOp(
                                new ConfigEntry("compression.type", "gzip"), 
                                AlterConfigOp.OpType.SET
                        )
                )
                
        );
        
        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alter);
        alterConfigsResult.values().forEach(((configResource, voidKafkaFuture) -> {
            voidKafkaFuture.whenComplete((a, throwable) -> {
                if(throwable != null) {
                    System.out.println(throwable.getMessage());
                }
                System.out.println(configResource);
                latch.countDown();
            });
        }));
        latch.await();
    }      

輸出如下圖所示:

Kafka:Topic概念與API介紹

很顯然​

​incrementalAlterConfigs​

​方法也可以增量更新其他資源的配置(通過指定資源的類型)。

​ConfigResource​

​​定義需要修改配置的資源,​

​Collection<AlterConfigOp>​

​定義該資源具體的配置修改操作。

Map<ConfigResource, Collection<AlterConfigOp>> alter = new HashMap<>();      

​configEntry​

​​定義資源需要修改的配置條目,​

​operationType​

​定義修改操作的類型。

public AlterConfigOp(ConfigEntry configEntry, OpType operationType) {
        this.configEntry = configEntry;
        this.opType =  operationType;
    }      

修改操作的類型。

public enum OpType {
        /**
         * 設定配置條目的值
         */
        SET((byte) 0),
        /**
         * 将配置條目恢複為預設值(可能為空)
         */
        DELETE((byte) 1),
        /**
         * 僅适用于清單類型的配置條目
         * 将指定的值添加到配置條目的目前值
         * 如果尚未設定配置值,則添加到預設值
         */
        APPEND((byte) 2),
        /**
         * 僅适用于清單類型的配置條目
         * 從配置條目的目前值中删除指定的值
         * 删除目前不在配置條目中的值是合法的
         * 從目前配置值中删除所有條目會留下一個空清單,并且不會恢複為條目的預設值
         */
        SUBTRACT((byte) 3);
        ...
    }      

資源的配置條目,包含配置名稱、值等。

public class ConfigEntry {

    private final String name;
    private final String value;
    private final ConfigSource source;
    private final boolean isSensitive;
    private final boolean isReadOnly;
    private final List<ConfigSynonym> synonyms;
    private final ConfigType type;
    private final String documentation;
    ...
}      

在擷取​

​Topic​

​配置的輸出中也可以發現這些配置條目。

Kafka:Topic概念與API介紹

很顯然,這裡修改名稱為​

​new-topic-kaven​

​​的​

​Topic​

​​的​

​compression.type​

​配置條目(壓縮類型)。

alter.put(
                new ConfigResource(ConfigResource.Type.TOPIC, "new-topic-kaven"),
                Collections.singletonList(
                        new AlterConfigOp(
                                new ConfigEntry("compression.type", "gzip"), 
                                AlterConfigOp.OpType.SET
                        )
                )
                
        );      

​compression.type​

​​配置條目的預設值為​

​producer​

​​(意味着保留生産者設定的原始壓縮編解碼器),和上面的圖也對應,部落客将該配置條目修改成了​

​gzip​

​。

Kafka:Topic概念與API介紹

再來擷取該​

​Topic​

​的配置,如下圖所示(很顯然配置修改成功了):

Kafka:Topic概念與API介紹