天天看點

Kafka-主題主題

主題

主題的目的是将消息進行歸類。

主題建立

通過shell建立主題

建立成功以後會在log.dir或者log.dirs參數所配置的目錄下建立相應的主題分區,
  • 如果隻有一個節點,副本因子隻能為1。副本因子的個數必須小于或者等于Borker個數
  • 各個Broker上建立的檔案夾個數為分區數與副本因子的乘積。

通過TopicCommand建立主題

import kafka.admin.TopicCommand;

public class TopicCommandDemo {
    public static void main(String[] args) {
        createTopic();
    }

    public static void createTopic(){
        String[] options = new String[]{"--zookeeper",
                "ip:port/myKafka",
                "--create",
                "--topic",
                "topicNameByCommand",
                "--replication-factor",
                "1",
                "--partitions",
                "1"
        };
        TopicCommand.main(options);

    }
}
           

主題其他操作

  • 檢視主題
# 列出所有的主題名
[[email protected] bin]# ./kafka-topics.sh --zookeeper localhost:2181/myKafka --list
# 列出某個主題的詳情
[[email protected] bin]# ./kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topicName
# 列出多個主題的詳情
[[email protected] bin]# ./kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topicName,topicNameByCommand
           
  • 修改主題
[[email protected] bin]# ./kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topicName --partitions 5
# 分區隻支援增加不支援減少
           
  • 删除主題

KafkaAdminClient操作

// 檢視topic資訊
public static void describeTopic() throws ExecutionException, InterruptedException {
    String broker = "ip:port";
    String topicName = "topicName";
    Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,broker);
    AdminClient client = AdminClient.create(properties);
    DescribeTopicsResult topicsResult = client.describeTopics(Collections.singleton(topicName));
    Map<String, TopicDescription> descriptionMap = topicsResult.all().get();
    descriptionMap.forEach((k,v)->{
        System.err.println("key:"+k+",value:"+v);
    });
    client.close();
}
// 建立topic
public static void createTopic() throws ExecutionException, InterruptedException {
    String broker = "47.97.73.187:9092";
    String topicName = "topicNameByClient";
    Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,broker);
    AdminClient client = AdminClient.create(properties);
    CreateTopicsResult topics = client.createTopics(Collections.singleton(new NewTopic(topicName, 2, (short) 1)));
    topics.all().get();
    client.close();
}
// ...