天天看點

【kafka運維】Kafka全網最全最詳細運維指令合集(精品強烈建議收藏!!!)

本文所有指令,部落客均全部操作驗證過,保證準确性; 非複制粘貼拼湊文章; 如果想了解更多工具指令,可在評論區留下評論,部落客會擇期加上;

部落客正在連載 Kafka源碼、Kafka運維、Kafka實踐系列文章 并且相關文章會配套錄制視訊

本文為專欄第一篇歡迎關注石臻臻的雜貨鋪不迷路!!!

以下大部分運維操作,都可以使用 LogI-Kafka-Manager 在平台上可視化操作;

@TOC

1.TopicCommand

1.1.Topic建立

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test

相關可選參數

參數 描述 例子

--bootstrap-server

指定kafka服務
指定連接配接到的kafka服務; 如果有這個參數,則

--zookeeper

可以不需要
--bootstrap-server localhost:9092

--zookeeper

棄用, 通過zk的連接配接方式連接配接到kafka叢集; --zookeeper localhost:2181 或者localhost:2181/kafka

--replication-factor

副本數量,注意不能大于broker數量;如果不提供,則會用叢集中預設配置 --replication-factor 3

--partitions

分區數量,當建立或者修改topic的時候,用這個來指定分區數;如果建立的時候沒有提供參數,則用叢集中預設值; 注意如果是修改的時候,分區比之前小會有問題 --partitions 3

--replica-assignment

副本分區配置設定方式;建立topic的時候可以自己指定副本配置設定情況;

--replica-assignment

BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 這個意思是有三個分區和三個副本,對應配置設定的Broker; 逗号隔開辨別分區;冒号隔開表示副本

--config

<String: name=value>
用來設定topic級别的配置以覆寫預設配置;隻在--create 和--bootstrap-server 同時使用時候生效; 可以配置的參數清單請看文末附件 例如覆寫兩個配置

--config retention.bytes=123455 --config retention.ms=600001

--command-config

<String: command 檔案路徑>
用來配置用戶端Admin Client啟動配置,隻在--bootstrap-server 同時使用時候生效; 例如:設定請求的逾時時間

--command-config config/producer.proterties

; 然後在檔案中配置 request.timeout.ms=300000

1.2.删除Topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test

支援正規表達式比對Topic來進行删除,隻需要将topic 用雙引号包裹起來

例如: 删除以

create_topic_byhand_zk

為開頭的topic;

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*"

.

表示任意比對除換行符 \n 之外的任何單字元。要比對 . ,請使用 . 。

·*·

:比對前面的子表達式零次或多次。要比對 * 字元,請使用 *。

.*

: 任意字元

删除任意Topic (慎用)

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?" 更多的用法請參考正規表達式

1.3.Topic分區擴容

zk方式(不推薦)

>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

kafka版本 >= 2.2 支援下面方式(推薦)

單個Topic擴容

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4

批量擴容 (将所有正規表達式比對到的Topic分區擴容到4個)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4

".*?"

正規表達式的意思是比對所有; 您可按需比對

PS: 當某個Topic的分區少于指定的分區數時候,他會抛出異常;但是不會影響其他Topic正常進行;

相關可選參數

參數 描述 例子

--replica-assignment

副本分區配置設定方式;建立topic的時候可以自己指定副本配置設定情況;

--replica-assignment

BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 這個意思是有三個分區和三個副本,對應配置設定的Broker; 逗号隔開辨別分區;冒号隔開表示副本

PS: 雖然這裡配置的是全部的分區副本配置設定配置,但是正在生效的是新增的分區;

比如: 以前3分區1副本是這樣的

Broker-1 Broker-2 Broker-3 Broker-4
1 2

現在新增一個分區,

--replica-assignment

2,1,3,4 ; 看這個意思好像是把0,1号分區互相換個Broker

Broker-1 Broker-2 Broker-3 Broker-4
1 2 3

但是實際上不會這樣做,Controller在處理的時候會把前面3個截掉; 隻取新增的分區配置設定方式,原來的還是不會變

Broker-1 Broker-2 Broker-3 Broker-4
1 2 3

1.4.查詢Topic描述

1.查詢單個Topic

sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal

2.批量查詢Topic(正規表達式比對,下面是查詢所有Topic)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal

支援正規表達式比對Topic,隻需要将topic 用雙引号包裹起來

相關可選參數

參數 描述 例子

--bootstrap-server

指定kafka服務
指定連接配接到的kafka服務; 如果有這個參數,則

--zookeeper

可以不需要
--bootstrap-server localhost:9092

--at-min-isr-partitions

查詢的時候省略一些計數和配置資訊

--at-min-isr-partitions

--exclude-internal

排除kafka内部topic,比如

__consumer_offsets-*

--exclude-internal

--topics-with-overrides

僅顯示已覆寫配置的主題,也就是單獨針對Topic設定的配置覆寫預設配置;不展示分區資訊

--topics-with-overrides

5.查詢Topic清單

1.查詢所有Topic清單

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal

2.查詢比對Topic清單(正規表達式)

查詢

test_create_

開頭的所有Topic清單

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"

相關可選參數

參數 描述 例子

--exclude-internal

排除kafka内部topic,比如

__consumer_offsets-*

--exclude-internal

--topic

可以正規表達式進行比對,展示topic名稱

--topic

2.ConfigCommand

Config相關操作; 動态配置可以覆寫預設的靜态配置;

2.1 查詢配置

Topic配置查詢

展示關于Topic的動靜态配置

1.查詢單個Topic配置(隻列舉動态配置)

sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic

或者

sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic

2.查詢所有Topic配置(包括内部Topic)(隻列舉動态配置)

sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics

3.查詢Topic的詳細配置(動态+靜态)

隻需要加上一個參數

--all

其他配置/clients/users/brokers/broker-loggers 的查詢

同理 ;隻需要将

--entity-type

改成對應的類型就行了 (topics/clients/users/brokers/broker-loggers)

查詢kafka版本資訊

sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version

<font color=red>所有可配置的動态配置 請看最後面的 *附件* 部分</font>

2.2 增删改 配置

--alter

--alter

删除配置:

--delete-config

k1=v1,k2=v2

添加/修改配置:

--add-config

k1,k2

選擇類型:

--entity-type

(topics/clients/users/brokers/broker-

loggers)           

複制

類型名稱:

--entity-name

Topic添加/修改動态配置

--add-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999

Topic删除動态配置

--delete-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms

其他配置同理,隻需要類型改下

--entity-type

類型有: (topics/clients/users/brokers/broker- loggers)

<font color=red>哪些配置可以修改 請看最後面的附件:ConfigCommand 的一些可選配置 </font>

3.副本擴縮、分區遷移、跨路徑遷移 kafka-reassign-partitions

請戳 【kafka運維】副本擴縮容、資料遷移、副本重配置設定、副本跨路徑遷移 (如果點不出來,表示文章暫未發表,請耐心等待)

4.Topic的發送kafka-console-producer.sh

4.1 生産無key消息

## 生産者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties           

複制

4.2 生産有key消息

加上屬性

--property parse.key=true

## 生産者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties  --property parse.key=true           

複制

<font color=red>預設消息key與消息value間使用“Tab鍵”進行分隔,是以消息key以及value中切勿使用轉義字元(\t)</font>

可選參數

參數 值類型 說明 有效值
--bootstrap-server String 要連接配接的伺服器必需(除非指定--broker-list) 如:host1:prot1,host2:prot2
--topic String (必需)接收消息的主題名稱
--batch-size Integer 單個批進行中發送的消息數 200(預設值)
--compression-codec String 壓縮編解碼器 none、gzip(預設值)snappy、lz4、zstd
--max-block-ms Long 在發送請求期間,生産者将阻止的最長時間 60000(預設值)
--max-memory-bytes Long 生産者用來緩沖等待發送到伺服器的總記憶體 33554432(預設值)
--max-partition-memory-bytes Long 為分區配置設定的緩沖區大小 16384
--message-send-max-retries Integer 最大的重試發送次數 3
--metadata-expiry-ms Long 強制更新中繼資料的時間門檻值(ms) 300000
--producer-property String 将自定義屬性傳遞給生成器的機制 如:key=value
--producer.config String 生産者配置屬性檔案--producer-property優先于此配置 配置檔案完整路徑
--property String 自定義消息讀取器 parse.key=true/false key.separator=<key.separator>ignore.error=true/false
--request-required-acks String 生産者請求的确認方式 0、1(預設值)、all
--request-timeout-ms Integer 生産者請求的确認逾時時間 1500(預設值)
--retry-backoff-ms Integer 生産者重試前,重新整理中繼資料的等待時間門檻值 100(預設值)
--socket-buffer-size Integer TCP接收緩沖大小 102400(預設值)
--timeout Integer 消息排隊異步等待處理的時間門檻值 1000(預設值)
--sync 同步發送消息
--version 顯示 Kafka 版本 不配合其他參數時,顯示為本地Kafka版本
--help 列印幫助資訊

5. Topic的消費kafka-console-consumer.sh

1. 新用戶端從頭消費

--from-beginning

(注意這裡是新用戶端,如果之前已經消費過了是不會從頭消費的)

下面沒有指定用戶端名稱,是以每次執行都是新用戶端都會從頭消費

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2. 正規表達式比對topic進行消費

--whitelist

消費所有的topic

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*'

消費所有的topic,并且還從頭消費

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist '.*' --from-beginning

3.顯示key進行消費

--property print.key=true

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

4. 指定分區消費

--partition

指定起始偏移量消費

--offset

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5. 給用戶端命名

--group

注意給用戶端命名之後,如果之前有過消費,那麼

--from-beginning

就不會再從頭消費了

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

6. 添加用戶端屬性

--consumer-property

這個參數也可以給用戶端添加屬性,但是注意 不能多個地方配置同一個屬性,他們是互斥的;比如在下面的基礎上還加上屬性

--group test-group

那肯定不行

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

--consumer-property group.id=test-consumer-group

7. 添加用戶端屬性

--consumer.config

--consumer-property

一樣的性質,都是添加用戶端的屬性,不過這裡是指定一個檔案,把屬性寫在檔案裡面,

--consumer-property

的優先級大于

--consumer.config

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
參數 描述 例子

--group

指定消費者所屬組的ID

--topic

被消費的topic

--partition

指定分區 ;除非指定

–offset

,否則從分區結束(latest)開始消費

--partition 0

--offset

執行消費的起始offset位置 ;預設值: latest; /latest /earliest /偏移量

--offset

10

--whitelist

正規表達式比對topic;

--topic

就不用指定了; 比對到的所有topic都會消費; 當然用了這個參數,

--partition

--offset

等就不能使用了

--consumer-property

将使用者定義的屬性以key=value的形式傳遞給使用者

--consumer-property

group.id=test-consumer-group

--consumer.config

消費者配置屬性檔案請注意,

consumer-property

優先于此配置

--consumer.config

config/consumer.properties

--property

初始化消息格式化程式的屬性 print.timestamp=true,false 、print.key=true,false 、print.value=true,false 、key.separator=<key.separator> 、line.separator=<line.separator>、key.deserializer=<key.deserializer>、value.deserializer=<value.deserializer>

--from-beginning

從存在的最早消息開始,而不是從最新消息開始,注意如果配置了用戶端名稱并且之前消費過,那就不會從頭消費了

--max-messages

消費的最大資料量,若不指定,則持續消費下去

--max-messages

100

--skip-message-on-error

如果處理消息時出錯,請跳過它而不是暫停

--isolation-level

設定為read_committed以過濾掉未送出的事務性消息,設定為read_uncommitted以讀取所有消息,預設值:read_uncommitted

--formatter

kafka.tools.DefaultMessageFormatter、kafka.tools.LoggingMessageFormatter、kafka.tools.NoOpMessageFormatter、kafka.tools.ChecksumMessageFormatter

6.kafka-leader-election Leader重新選舉

6.1 指定Topic指定分區用重新

PREFERRED:優先副本政策

進行Leader重選舉

> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0           

複制

6.2 所有Topic所有分區用重新

PREFERRED:優先副本政策

進行Leader重選舉

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred  --all-topic-partitions           

複制

6.3 設定配置檔案批量指定topic和分區進行Leader重選舉

先配置leader-election.json檔案

{
  "partitions": [
    {
      "topic": "test_create_topic4",
      "partition": 1
    },
    {
      "topic": "test_create_topic4",
      "partition": 2
    }
  ]
}           

複制

sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred  --path-to-json-file config/leader-election.json
            

複制

相關可選參數

參數 描述 例子

--bootstrap-server

指定kafka服務
指定連接配接到的kafka服務 --bootstrap-server localhost:9092

--topic

指定Topic,此參數跟

--all-topic-partitions

path-to-json-file

三者互斥

--partition

指定分區,跟

--topic

搭配使用

--election-type

兩個選舉政策(

PREFERRED:

優先副本選舉,如果第一個副本不線上的話會失敗;

UNCLEAN

: 政策)

--all-topic-partitions

所有topic所有分區執行Leader重選舉; 此參數跟

--topic

path-to-json-file

三者互斥

--path-to-json-file

配置檔案批量選舉,此參數跟

--topic

all-topic-partitions

三者互斥

7. 持續批量推送消息kafka-verifiable-producer.sh

單次發送100條消息

--max-messages 100

一共要推送多少條,預設為-1,-1表示一直推送到程序關閉位置

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092

--max-messages 100

每秒發送最大吞吐量不超過消息

--throughput 100

推送消息時的吞吐量,機關messages/sec。預設為-1,表示沒有限制

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092

--throughput 100

發送的消息體帶字首

--value-prefix

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092

--value-prefix 666

注意

--value-prefix 666

必須是整數,發送的消息體的格式是加上一個 點号

.

例如:

666.

其他參數:

--producer.config CONFIG_FILE

指定producer的配置檔案

--acks ACKS

每次推送消息的ack值,預設是-1

8. 持續批量拉取消息kafka-verifiable-consumer

持續消費

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

單次最大消費10條消息

--max-messages 10

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

--max-messages 10

相關可選參數

參數 描述 例子

--bootstrap-server

指定kafka服務
指定連接配接到的kafka服務; --bootstrap-server localhost:9092

--topic

指定消費的topic

--group-id

消費者id;不指定的話每次都是新的組id

group-instance-id

消費組執行個體ID,唯一值

--max-messages

單次最大消費的消息數量

--enable-autocommit

是否開啟offset自動送出;預設為false

--reset-policy

當以前沒有消費記錄時,選擇要拉取offset的政策,可以是

earliest

,

latest

,

none

。預設是earliest

--assignment-strategy

consumer配置設定分區政策,預設是

org.apache.kafka.clients.consumer.RangeAssignor

--consumer.config

指定consumer的配置檔案

9.生産者壓力測試kafka-producer-perf-test.sh

1. 發送1024條消息

--num-records 100

并且每條消息大小為1KB

--record-size 1024

最大吞吐量每秒10000條

--throughput 100

sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

你可以通過LogIKM檢視分區是否增加了對應的資料大小

【kafka運維】Kafka全網最全最詳細運維指令合集(精品強烈建議收藏!!!)

在這裡插入圖檔描述

從LogIKM 可以看到發送了1024條消息; 并且總資料量=1M; 1024條*1024byte = 1M;

2. 用指定消息檔案

--payload-file

發送100條消息最大吞吐量每秒100條

--throughput 100

  1. 先配置好消息檔案

    batchmessage.txt

    【kafka運維】Kafka全網最全最詳細運維指令合集(精品強烈建議收藏!!!)
    在這裡插入圖檔描述
  2. 然後執行指令

    發送的消息會從

    batchmessage.txt

    裡面随機選擇; 注意這裡我們沒有用參數

    --payload-delimeter

    指定分隔符,預設分隔符是\n換行;
> bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100  --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt
>           

複制

  1. 驗證消息,可以通過 LogIKM 檢視發送的消息
![在這裡插入圖檔描述](https://img-blog.csdnimg.cn/20210624175212664.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3UwMTA2MzQwNjY=,size_16,color_FFFFFF,t_70)           

複制

相關可選參數

參數 描述 例子

--topic

指定消費的topic

--num-records

發送多少條消息

--throughput

每秒消息最大吞吐量

--producer-props

生産者配置, k1=v1,k2=v2

--producer-props

bootstrap.servers= localhost:9092,client.id=test_client

--producer.config

生産者配置檔案

--producer.config

config/producer.propeties

--print-metrics

在test結束的時候列印監控資訊,預設false

--print-metrics

true

--transactional-id

指定事務 ID,測試并發事務的性能時需要,隻有在 --transaction-duration-ms > 0 時生效,預設值為 performance-producer-default-transactional-id

--transaction-duration-ms

指定事務持續的最長時間,超過這段時間後就會調用 commitTransaction 來送出事務,隻有指定了 > 0 的值才會開啟事務,預設值為 0

--record-size

一條消息的大小byte; 和 --payload-file 兩個中必須指定一個,但不能同時指定

--payload-file

指定消息的來源檔案,隻支援 UTF-8 編碼的文本檔案,檔案的消息分隔符通過

--payload-delimeter

指定,預設是用換行\nl來分割的,和 --record-size 兩個中必須指定一個,但不能同時指定 ; 如果提供的消息

--payload-delimeter

如果通過

--payload-file

指定了從檔案中擷取消息内容,那麼這個參數的意義是指定檔案的消息分隔符,預設值為 \n,即檔案的每一行視為一條消息;如果未指定

--payload-file

則此參數不生效;發送消息的時候是随機送檔案裡面選擇消息發送的;

10.消費者壓力測試kafka-consumer-perf-test.sh

消費100條消息

--messages 100

sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100

相關可選參數

參數 描述 例子

--bootstrap-server

--consumer.config

消費者配置檔案

--date-format

結果列印出來的時間格式化 預設:yyyy-MM-dd HH:mm:ss:SSS

--fetch-size

單次請求擷取資料的大小 預設1048576

--topic

指定消費的topic

--from-latest

--group

消費組ID

--hide-header

如果設定了,則不列印header資訊

--messages

需要消費的數量

--num-fetch-threads

feth 資料的線程數 預設:1

--print-metrics

結束的時候列印監控資料

--show-detailed-stats

--threads

消費線程數; 預設 10

11.删除指定分區的消息kafka-delete-records.sh

删除指定topic的某個分區的消息删除至offset為1024

先配置json檔案

offset-json-file.json

{"partitions":
[{"topic": "test1", "partition": 0,
  "offset": 1024}],
  "version":1
}           

複制

在執行指令

sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json

驗證 通過 LogIKM 檢視發送的消息

【kafka運維】Kafka全網最全最詳細運維指令合集(精品強烈建議收藏!!!)

在這裡插入圖檔描述

從這裡可以看出來,配置

"offset": 1024

的意思是從最開始的地方删除消息到 1024的offset; 是從最前面開始删除的

12. 檢視Broker磁盤資訊

查詢指定topic磁盤資訊

--topic-list topic1,topic2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2

查詢指定Broker磁盤資訊

--broker-list 0 broker1,broker2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0

例如我一個3分區3副本的Topic的查出來的資訊

logDir

Broker中配置的

log.dir

{
	"version": 1,
	"brokers": [{
		"broker": 0,
		"logDirs": [{
			"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0",
			"error": null,
			"partitions": [{
				"partition": "test2-1",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-0",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-2",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}]
		}]
	}, {
		"broker": 1,
		"logDirs": [{
			"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1",
			"error": null,
			"partitions": [{
				"partition": "test2-1",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-0",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-2",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}]
		}]
	}, {
		"broker": 2,
		"logDirs": [{
			"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2",
			"error": null,
			"partitions": [{
				"partition": "test2-1",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-0",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}, {
				"partition": "test2-2",
				"size": 0,
				"offsetLag": 0,
				"isFuture": false
			}]
		}]
	}, {
		"broker": 3,
		"logDirs": [{
			"logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3",
			"error": null,
			"partitions": []
		}]
	}]
}           

複制

如果你覺得通過指令查詢磁盤資訊比較麻煩,你也可以通過 LogIKM 檢視

【kafka運維】Kafka全網最全最詳細運維指令合集(精品強烈建議收藏!!!)

在這裡插入圖檔描述

12. 消費者組管理 kafka-consumer-groups.sh

1. 檢視消費者清單

--list

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list

【kafka運維】Kafka全網最全最詳細運維指令合集(精品強烈建議收藏!!!)
在這裡插入圖檔描述

先調用

MetadataRequest

拿到所有線上Broker清單

再給每個Broker發送

ListGroupsRequest

請求擷取 消費者組資料

2. 檢視消費者組詳情

--describe

DescribeGroupsRequest

檢視消費組詳情

--group

--all-groups

檢視指定消費組詳情

--group

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group

undefined

檢視所有消費組詳情

--all-groups

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups

檢視該消費組 消費的所有Topic、及所在分區、最新消費offset、Log最新資料offset、Lag還未消費數量、消費者ID等等資訊
【kafka運維】Kafka全網最全最詳細運維指令合集(精品強烈建議收藏!!!)

在這裡插入圖檔描述

查詢消費者成員資訊

--members

所有消費組成員資訊

sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090

指定消費組成員資訊

sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090

【kafka運維】Kafka全網最全最詳細運維指令合集(精品強烈建議收藏!!!)

在這裡插入圖檔描述

查詢消費者狀态資訊

--state

所有消費組狀态資訊

sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090

指定消費組狀态資訊

sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090

【kafka運維】Kafka全網最全最詳細運維指令合集(精品強烈建議收藏!!!)
在這裡插入圖檔描述

3. 删除消費者組

--delete

DeleteGroupsRequest

删除消費組--delete

删除指定消費組

--group

sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090

删除所有消費組

--all-groups

sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090

PS: 想要删除消費組前提是這個消費組的所有用戶端都停止消費/不線上才能夠成功删除;否則會報下面異常

Error: Deletion of some consumer groups failed:
* Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.           

複制

4. 重置消費組的偏移量

--reset-offsets

<font color=red>能夠執行成功的一個前提是 消費組這會是不可用狀态;</font>

下面的示例使用的參數是:

--dry-run

;這個參數表示預執行,會列印出來将要處理的結果;

等你想真正執行的時候請換成參數

--excute

;

下面示例 重置模式都是

--to-earliest

重置到最早的;

請根據需要參考下面 相關重置Offset的模式 換成其他模式;

重置指定消費組的偏移量

--group

重置指定消費組的所有Topic的偏移量

--all-topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic

重置指定消費組的指定Topic的偏移量

--topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2

重置所有消費組的偏移量

--all-group

重置所有消費組的所有Topic的偏移量

--all-topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic

重置所有消費組中指定Topic的偏移量

--topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets

後面需要接重置的模式

相關重置Offset的模式

參數 描述 例子

--to-earliest

:
重置offset到最開始的那條offset(找到還未被删除最早的那個offset)

--to-current

:
直接重置offset到目前的offset,也就是LOE

--to-latest

重置到最後一個offset

--to-datetime

:
重置到指定時間的offset;格式為:

YYYY-MM-DDTHH:mm:SS.sss

;

--to-datetime "2021-6-26T00:00:00.000"

--to-offset

重置到指定的offset,但是通常情況下,比對到多個分區,這裡是将比對到的所有分區都重置到這一個值; 如果 1.目标最大offset<

--to-offset

, 這個時候重置為目标最大offset;2.目标最小offset>

--to-offset

,則重置為最小; 3.否則的話才會重置為

--to-offset

的目标值; 一般不用這個

--to-offset 3465

在這裡插入圖檔描述

--shift-by

按照偏移量增加或者減少多少個offset;正的為往前增加;負的往後退;當然這裡也是比對所有的;

--shift-by 100

--shift-by -100

--from-file

根據CVS文檔來重置; 這裡下面單獨講解

--from-file

着重講解一下

上面其他的一些模式重置的都是比對到的所有分區; 不能夠每個分區重置到不同的offset;不過

--from-file

可以讓我們更靈活一點;
  1. 先配置cvs文檔

    格式為: Topic:分區号: 重置目标偏移量```cvs

    test2,0,100

    test2,1,200

    test2,2,300

    ```

  2. 執行指令

    sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv

5. 删除偏移量

delete-offsets

<font color=red>能夠執行成功的一個前提是 消費組這會是不可用狀态;</font>

偏移量被删除了之後,Consumer Group下次啟動的時候,會從頭消費;

sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2

相關可選參數

參數 描述 例子

--bootstrap-server

指定連接配接到的kafka服務; --bootstrap-server localhost:9092

--list

列出所有消費組名稱

--list

--describe

查詢消費者描述資訊

--describe

--group

指定消費組

--all-groups

指定所有消費組

--members

查詢消費組的成員資訊

--state

查詢消費者的狀态資訊

--offsets

在查詢消費組描述資訊的時候,這個參數會列出消息的偏移量資訊; 預設就會有這個參數的;

dry-run

重置偏移量的時候,使用這個參數可以讓你預先看到重置情況,這個時候還沒有真正的執行,真正執行換成

--excute

;預設為

dry-run

--excute

真正的執行重置偏移量的操作;

--to-earliest

将offset重置到最早

to-latest

将offset重置到最近

附件

ConfigCommand 的一些可選配置

Topic相關可選配置

key value 示例
cleanup.policy 清理政策
compression.type 壓縮類型(通常建議在produce端控制)
delete.retention.ms 壓縮日志的保留時間
file.delete.delay.ms
flush.messages 持久化message限制
flush.ms 持久化頻率
follower.replication.throttled.replicas flowwer副本限流 格式:分區号:副本follower号,分區号:副本follower号 0:1,1:1
index.interval.bytes
leader.replication.throttled.replicas leader副本限流 格式:分區号:副本Leader号 0:0
max.compaction.lag.ms
max.message.bytes 最大的batch的message大小
message.downconversion.enable message是否向下相容
message.format.version message格式版本
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas 最小的ISR
preallocate
retention.bytes 日志保留大小(通常按照時間限制)
retention.ms 日志保留時間
segment.bytes segment的大小限制
segment.index.bytes
segment.jitter.ms
segment.ms segment的切割時間
unclean.leader.election.enable 是否允許非同步副本選主

Broker相關可選配置

key value 示例
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.message.downconversion.enable
log.message.timestamp.difference.max.ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
replica.alter.log.dirs.io.max.bytes.per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
unclean.leader.election.enable

Users相關可選配置

key value 示例
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate 針對消費者user進行限流
producer_byte_rate 針對生産者進行限流
request_percentage 請求百分比

clients相關可選配置

key value 示例
consumer_byte_rate
producer_byte_rate
request_percentage

以上大部分運維操作,都可以使用 LogI-Kafka-Manager 在平台上可視化操作;