主題
主題的目的是将消息進行歸類。
主題建立
通過shell建立主題
建立成功以後會在log.dir或者log.dirs參數所配置的目錄下建立相應的主題分區,
- 如果隻有一個節點,副本因子隻能為1。副本因子的個數必須小于或者等于Borker個數
- 各個Broker上建立的檔案夾個數為分區數與副本因子的乘積。
通過TopicCommand建立主題
import kafka.admin.TopicCommand;
public class TopicCommandDemo {
public static void main(String[] args) {
createTopic();
}
public static void createTopic(){
String[] options = new String[]{"--zookeeper",
"ip:port/myKafka",
"--create",
"--topic",
"topicNameByCommand",
"--replication-factor",
"1",
"--partitions",
"1"
};
TopicCommand.main(options);
}
}
主題其他操作
# 列出所有的主題名
[[email protected] bin]# ./kafka-topics.sh --zookeeper localhost:2181/myKafka --list
# 列出某個主題的詳情
[[email protected] bin]# ./kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topicName
# 列出多個主題的詳情
[[email protected] bin]# ./kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topicName,topicNameByCommand
[[email protected] bin]# ./kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topicName --partitions 5
# 分區隻支援增加不支援減少
KafkaAdminClient操作
// 檢視topic資訊
public static void describeTopic() throws ExecutionException, InterruptedException {
String broker = "ip:port";
String topicName = "topicName";
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,broker);
AdminClient client = AdminClient.create(properties);
DescribeTopicsResult topicsResult = client.describeTopics(Collections.singleton(topicName));
Map<String, TopicDescription> descriptionMap = topicsResult.all().get();
descriptionMap.forEach((k,v)->{
System.err.println("key:"+k+",value:"+v);
});
client.close();
}
// 建立topic
public static void createTopic() throws ExecutionException, InterruptedException {
String broker = "47.97.73.187:9092";
String topicName = "topicNameByClient";
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,broker);
AdminClient client = AdminClient.create(properties);
CreateTopicsResult topics = client.createTopics(Collections.singleton(new NewTopic(topicName, 2, (short) 1)));
topics.all().get();
client.close();
}
// ...