天天看點

Kafka常用指令總結

kafka服務啟動

$KAFKA_HOME/bin/kafka-server-start.sh -daemon config/server.properties

建立Topic

$KAFKA_HOME/bin/kafka-topics.sh --create --topic test0–zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

–topic後面的test0是topic的名稱

–zookeeper應該和server.properties檔案中的zookeeper.connect一樣

–config指定目前topic上有效的參數值

–partitions指定topic的partition數量,如果不指定該數量,預設是server.properties檔案中的num.partitions配置值

–replication-factor指定每個partition的副本個數,預設1個

列出所有Topic

$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

檢視Topic的分區和副本

$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test0

删除topic

#删除kafka的topic指令

$KAFKA_HOME/bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test0

#删除zookeeper中該topic相關的目錄指令:

rm -r /kafka/config/topics/test0

rm -r /kafka/brokers/topics/test0

檢視topic消費的offset(偏移量)

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test0 --time -1

修改topic的partition數量(隻能增加不能減少)

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group testgroup --topic test0 --zookeeper 127.0.0.1:2181

啟動kafka生産者

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test0

啟動kafka消費者

#從頭開始

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --from-beginning

#從尾部開始

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --offset latest

#指定分區

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --offset latest --partition 1

#取指定個數

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test0 --offset latest --partition 1 --max-messages 1

檢視有哪些消費者Group

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --list

檢視Group詳情

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --describe

删除Group

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --delete