天天看点

Kafka常用命令总结

kafka服务启动

$KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties

创建Topic

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test0–zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

–topic后面的test0是topic的名称

–zookeeper应该和server.properties文件中的zookeeper.connect一样

–config指定当前topic上有效的参数值

–partitions指定topic的partition数量,如果不指定该数量,默认是server.properties文件中的num.partitions配置值

–replication-factor指定每个partition的副本个数,默认1个

列出所有Topic

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

查看Topic的分区和副本

$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test0

删除topic

#删除kafka的topic命令

$KAFKA_HOME/bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test0

#删除zookeeper中该topic相关的目录命令:

rm -r /kafka/config/topics/test0

rm -r /kafka/brokers/topics/test0

查看topic消费的offset(偏移量)

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test0 --time -1

修改topic的partition数量(只能增加不能减少)

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group testgroup --topic test0 --zookeeper 127.0.0.1:2181

启动kafka生产者

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test0

启动kafka消费者

#从头开始

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --from-beginning

#从尾部开始

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --offset latest

#指定分区

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --offset latest --partition 1

#取指定个数

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --offset latest --partition 1 --max-messages 1

查看有哪些消费者Group

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --list

查看Group详情

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --describe

删除Group

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --delete