系統
啟動 Kafka
-daemon
參數可以讓 Kafka 在背景運作。
kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
指定 JMX 端口啟動
JMX 的全稱為 Java Management Extensions。顧名思義,是管理 Java 的一種擴充,通過 JMX 可以友善我們監控 Kafka 的記憶體,線程,CPU 的使用情況,以及生産和消費消息的名額。
JMX_PORT=9999 kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
停止 Kafka
kafka-server-stop.sh
Topic
建立 Topic
kafka-topics.sh --create --bootstrap-server 10.37.62.20:9092 --replication-factor 3 --partitions 3 --topic <topic-name>
列出所有 Topic
kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --list
檢視指定 Topic
kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --describe --topic <topic-name>
删除指定 Topic
kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --delete --topic <topic-name>
擴充 Topic 的 Partition 數量
artition 數量隻能擴大不能縮小。
kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --topic app --alter --partitions 30
擴充 topic 每個 partition 的副本數量
replication factor 可以擴大也可以縮小,最多不能超過 broker 數量。先建立一個檔案名為 increace-factor.json,這裡要擴充的是 mysql-audit-log 這個 topic 的 partition 到 15 個:0,1,2 為 broker id。
{"version":1,
"partitions":[
{"topic":"mysql-audit-log","partition":0,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":1,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":2,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":3,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":4,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":5,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":6,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":7,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":8,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":9,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":10,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":11,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":12,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":13,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":14,"replicas":[0,1,2]}
]}
kafka-reassign-partitions.sh --zookeeper 10.37.62.20:2181 --reassignment-json-file increace-factor.json --execute
檢視 Topic 資料大小
#方法一
kafka-log-dirs.sh \
--bootstrap-server 192.168.1.87:9092 \
--topic-list mytopic \
--describe \
| grep -oP '(?<=size":)\d+' \
| awk '{ sum += $1 } END { print sum }'
#傳回結果,機關 Byte
648
#方法二,需要安裝 jq
kafka-log-dirs.sh \
--bootstrap-server 192.168.1.87:9092 \
--topic-list mytopic \
--describe \
| grep '^{' \
| jq '[ ..|.size? | numbers ] | add'
#傳回結果,機關 Byte
648
消費者組 Consumer Group
列出所有的 Consumer Group
kafka-consumer-groups.sh --bootstrap-server 10.37.62.20:9092 --list
檢視指定 Consumer Group 詳情
- GROUP:消費者 group
- TOPIC:話題 id
- PARTITION:分區 id
- CURRENT-OFFSET:目前已消費的條數
- LOG-END-OFFSET:總條數
- LAG:未消費的條數
- CONSUMER-ID:消費者 id
- HOST:消費者 ip 位址
- CLIENT-ID:用戶端 id
#這裡檢視的是 logstash_mysql 這個消費者 group 的消費情況
kafka-consumer-groups.sh --bootstrap-server 10.37.62.20:9092 --describe --group logstash_mysql
#傳回結果
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
logstash_mysql mysql-audit-log 11 1312115 1312857 742 logstash-5-0545a8a7-f7bd-430c-b619-7a2b206addd2 /10.37.62.24 logstash-5
logstash_mysql mysql-audit-log 1 1312593 1313345 752 logstash-0-d86bd51a-d010-45de-aa6f-f6da8542b779 /10.37.62.23 logstash-0
logstash_mysql mysql-audit-log 2 1309548 1310317 769 logstash-1-496340ea-935d-444d-a184-51d42e225054 /10.37.62.24 logstash-1
logstash_mysql mysql-audit-log 12 1313083 1313194 111 logstash-6-806b20cb-a9af-49c1-b37d-ccb33a646ab2 /10.37.62.24 logstash-6
logstash_mysql mysql-audit-log 6 1310984 1311192 208 logstash-13-8d474bf6-e8d0-4b8a-b319-cf5e2e6cc078 /10.37.62.24 logstash-13
logstash_mysql mysql-audit-log 9 1312998 1313768 770 logstash-3-29863fb0-6708-4fb1-9e28-bd81c30ce8ef /10.37.62.24 logstash-3
logstash_mysql mysql-audit-log 4 1315150 1315276 126 logstash-11-6d66a188-85b7-476b-bd89-5423ef48cd01 /10.37.62.24 logstash-11
logstash_mysql mysql-audit-log 0 22770935522 22770935650 128 logstash-0-7be475d6-a49e-4ff9-bf83-6b83f6067306 /10.37.62.24 logstash-0
logstash_mysql mysql-audit-log 8 1309956 1310103 147 logstash-2-3c313c6f-8c98-4333-8bad-2f9696457d7d /10.37.62.24 logstash-2
logstash_mysql mysql-audit-log 13 1314659 1314775 116 logstash-7-e98fd14e-e7f6-45e5-8ccf-2442058f0bc9 /10.37.62.24 logstash-7
logstash_mysql mysql-audit-log 14 1313145 1313250 105 logstash-8-2c3345a8-f8f1-4f08-a18e-333dff2f0d65 /10.37.62.24 logstash-8
logstash_mysql mysql-audit-log 5 1314037 1314297 260 logstash-12-ce018227-9e59-4137-a23f-5ccc0c7d4f6a /10.37.62.24 logstash-12
logstash_mysql mysql-audit-log 10 1312883 1312962 79 logstash-4-9eb84ae4-3351-4083-9b1f-288910a6c3b8 /10.37.62.24 logstash-4
logstash_mysql mysql-audit-log 7 1312476 1313200 724 logstash-14-680c982e-5cf3-406b-810a-4d5c96b5bdee /10.37.62.24 logstash-14
logstash_mysql mysql-audit-log 3 1313227 1313328 101 logstash-10-e212dc18-a2bb-42d9-9d0b-095a93841efc /10.37.62.24 logstash-10
删除指定 Consumer Group
kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --delete --topic pgw-nginx
消息
生産消息
普通生産消息
kafka-console-producer.sh --broker-list 11.8.36.125:9092 --topic mytopic
>this is my topic
生産消息指定 Key
key.separator=,
指定以逗号作為 key 和 value 的分隔符。
kafka-console-producer.sh --broker-list kafka1:9092 --topic cr7-topic --property parse.key=true --property key.separator=,
>mykey,{"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}
消費消息
從頭開始消費
從頭開始消費是可以消費到之前的消息的,通過
--from-beginning
指定:
kafka-console-consumer.sh --bootstrap-server 11.8.36.125:9092 --topic mytopic --from-beginning
this is my topic
從尾部開始消費
--offset latest
指定從尾部開始消費,另外還需要指定 partition,可以指定多個:
kafka-console-consumer.sh --bootstrap-server 11.8.36.125:9092 --topic mytopic --offset latest --partition 0 1 2
消費指定條數的消息
--max-messages
指定取的個數:
kafka-console-consumer.sh --bootstrap-server 11.8.36.125:9092 --topic mytopic --offset latest --partition 0 1 2 --max-messages 2
bobo
1111
Processed a total of 2 messages
指定消費組進行消費
--consumer-property group.id=<消費者組名>
執行消費者組進行消費:
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test_partition --consumer-property group.id=test_group --from-beginning
檢視消息具體内容
kafka-dump-log.sh --files cr7-topic-0/00000000000000000000.log -deep-iteration --print-data-log
#輸出結果
| offset: 1080 CreateTime: 1615020877664 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 1 payload: {"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}
| offset: 1081 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 5 payload: {"orderAmount":1000,"orderId":5,"productId":105,"productNum":5}
| offset: 1082 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 7 payload: {"orderAmount":1000,"orderId":7,"productId":107,"productNum":7}
| offset: 1083 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 8 payload: {"orderAmount":1000,"orderId":8,"productId":108,"productNum":8}
| offset: 1084 CreateTime: 1615020877677 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 11 payload: {"orderAmount":1000,"orderId":11,"productId":111,"productNum":11}
| offset: 1085 CreateTime: 1615020877677 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 15 payload: {"orderAmount":1000,"orderId":15,"productId":115,"productNum":15}
| offset: 1086 CreateTime: 1615020877678 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 17 payload: {"orderAmount":1000,"orderId":17,"productId":117,"productNum":17}
| offset: 1087 CreateTime: 1615020877678 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 21 payload: {"orderAmount":1000,"orderId":21,"productId":121,"productNum":21}
檢視 Topic 中目前消息總數
Kafka 自帶的指令沒有直接提供這樣的功能,要使用 Kafka 提供的工具類 GetOffsetShell 來計算給定 Topic 每個分區目前最早位移和最新位移,內插補點就是每個分區的目前的消息總數,将該 Topic 所有分區的消息總數累加就能得到該 Topic 總的消息數。
首先查詢 Topic 中每個分區 offset 的最小值(起始位置),使用
--time -2
參數。一個分區的起始位置并不是每時每刻都為 0 ,因為日志清理的動作會清理舊的資料,是以分區的起始位置會自然而然地增加。
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 -topic test-topic --time -2
#前面是分區号,後面是 offset
test-topic:0:0
test-topic:1:0
然後使用
--time -1
參數查詢 Topic 各個分區的 offset 的最大值。
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --time -1 --topic test-topic
#輸出結果
test-topic:0:5500000
test-topic:1:5500000
對于本例來說,test-topic 中目前總的消息數為 (5500000 - 0) + (5500000 - 0),等于 1100 萬條。如果隻是要擷取 Topic 中總的消息數(包括已經從 Kafka 删除的消息),那麼隻需要将 Topic 中每個 Partition 的 Offset 累加即可。
Offset
重置消費者 Offset
#檢視消費者組消費情況
#目前的 0 分區 CURRENT-OFFSET 是 4,2 分區 CURRENT-OFFSET 是 6
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group my-consumer-group
#傳回結果
Consumer group 'my-consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-consumer-group transaction-topic-msg 2 6 6 0 - - -
my-consumer-group transaction-topic-msg 1 0 0 0 - - -
my-consumer-group transaction-topic-msg 0 4 4 0 - - - -
#重置消費者組 offset 為 3,重置是所有分區一起重置
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group my-consumer-group --reset-offsets --execute --to-offset 3 --topic transaction-topic-msg
#傳回結果
[2021-06-25 10:44:51,848] WARN New offset (3) is higher than latest offset for topic partition transaction-topic-msg-1. Value will be set to 0 (kafka.admin.ConsumerGroupCommand$)
GROUP TOPIC PARTITION NEW-OFFSET
my-consumer-group transaction-topic-msg 0 3
my-consumer-group transaction-topic-msg 1 0
my-consumer-group transaction-topic-msg 2 3
#可以看到 0 分區和 2 分區的 CURRENT-OFFSET 都變為 3 了
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group my-consumer-group
#傳回結果
Consumer group 'my-consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-consumer-group transaction-topic-msg 2 3 6 3 - - -
my-consumer-group transaction-topic-msg 1 0 0 0 - - -
my-consumer-group transaction-topic-msg 0 3 4 1 - - -
#可以重新消費到之前的資料
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic transaction-topic-msg --group my-consumer-group
#傳回結果
message-111111
message-333333
性能測試
生産者性能測試
- --num-records 10000000: 向指定主題發送了 1 千萬條消息。
- --record-size 1024: 每條消息的大小為 1024KB。
- --throughput -1: 不限制吞吐量。
- --producer-props: 指定生産者參數。
-
- acks=-1: 這要求 ISR 清單裡跟 leader 保持同步的那些 follower 都要把消息同步過去,才能認為這條消息是寫入成功。
- linger.ms=2000: batch.size 和 linger.ms 是對 kafka producer 性能影響比較大的兩個參數。batch.size 是 producer 批量發送的基本機關,預設是 16384Bytes,即 16kB;lingger.ms 是 sender 線程在檢查 batch 是否 ready 時候,判斷有沒有過期的參數,預設大小是 0ms。
- compression.type=lz4: 使用 lz4 壓縮算法。
[root@kafka1 ~]# kafka-producer-perf-test.sh --topic test_producer_perf --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka1:9092 acks=-1 linger.ms=2000 compression.type=lz4
#輸出結果
705600 records sent, 141063.6 records/sec (137.76 MB/sec), 54.8 ms avg latency, 557.0 ms max latency.
1204178 records sent, 240739.3 records/sec (235.10 MB/sec), 44.1 ms avg latency, 402.0 ms max latency.
1370938 records sent, 274187.6 records/sec (267.76 MB/sec), 27.9 ms avg latency, 311.0 ms max latency.
1464605 records sent, 292628.4 records/sec (285.77 MB/sec), 19.2 ms avg latency, 139.0 ms max latency.
1477239 records sent, 295447.8 records/sec (288.52 MB/sec), 31.8 ms avg latency, 290.0 ms max latency.
1446682 records sent, 289336.4 records/sec (282.56 MB/sec), 26.4 ms avg latency, 281.0 ms max latency.
1555098 records sent, 311019.6 records/sec (303.73 MB/sec), 37.6 ms avg latency, 344.0 ms max latency.
10000000 records sent, 263894.020162 records/sec (257.71 MB/sec), 32.60 ms avg latency, 557.00 ms max latency, 12 ms 50th, 140 ms 95th, 262 ms 99th, 396 ms 99.9th.
我們應該關心延時的機率分布情況,僅僅知道一個平均值是沒有意義的。這就是這裡計算分位數的原因。通常我們關注到 99th 分位就可以了。比如在上面的輸出中,99th 值是 262 ms,這表明測試生産者生産的消息中,有 99% 消息的延時都在 262 ms 以内。你完全可以把這個資料當作這個生産者對外承諾的 SLA。
消費者性能測試
[root@kafka1 ~]# kafka-consumer-perf-test.sh --broker-list kafka1:9092 --messages 10000000 --topic test_producer_perf
#輸出結果
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-03-09 10:34:18:447, 2021-03-09 10:34:33:948, 9765.6250, 629.9997, 10000000, 645119.6697, 1615257259068, -1615257243567, -0.0000, -0.0062
雖然輸出格式有所差别,但該腳本也會列印出消費者的吞吐量資料。比如本例中的 629.9997MB/s。有點令人遺憾的是,它沒有計算不同分位數下的分布情況。是以,在實際使用過程中,這個腳本的使用率要比生産者性能測試腳本的使用率低。
修改動态參數
檢視支援的動态參數
如果你想要知道動态 Broker 參數都有哪些,一種方式是在 Kafka 官網中檢視 Broker 端參數清單,另一種方式是直接運作無參數的 kafka-configs 腳本,該腳本的說明文檔會告訴你目前動态 Broker 參數都有哪些。
[root@kafka1 ~]# kafka-configs.sh
This tool helps to manipulate and describe entity config for a topic, client, user or broker
Option Description
------ -----------
--add-config <String> Key Value pairs of configs to add.
Square brackets can be used to group
values which contain commas: 'k1=v1,
k2=[v1,v2,v2],k3=v3'. The following
#Topic 動态參數
is a list of valid configurations:
For entity-type 'topics':
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
#Broker 動态參數
For entity-type 'brokers':
advertised.listeners
background.threads
compression.type
follower.replication.throttled.rate
leader.replication.throttled.rate
listener.security.protocol.map
listeners
log.cleaner.backoff.ms
log.cleaner.dedupe.buffer.size
log.cleaner.delete.retention.ms
log.cleaner.io.buffer.load.factor
log.cleaner.io.buffer.size
log.cleaner.io.max.bytes.per.second
log.cleaner.max.compaction.lag.ms
log.cleaner.min.cleanable.ratio
log.cleaner.min.compaction.lag.ms
log.cleaner.threads
log.cleanup.policy
log.flush.interval.messages
log.flush.interval.ms
log.index.interval.bytes
log.index.size.max.bytes
log.message.downconversion.enable
log.message.timestamp.difference.max.
ms
log.message.timestamp.type
log.preallocate
log.retention.bytes
log.retention.ms
log.roll.jitter.ms
log.roll.ms
log.segment.bytes
log.segment.delete.delay.ms
max.connection.creation.rate
max.connections
max.connections.per.ip
max.connections.per.ip.overrides
message.max.bytes
metric.reporters
min.insync.replicas
num.io.threads
num.network.threads
num.recovery.threads.per.data.dir
num.replica.fetchers
principal.builder.class
replica.alter.log.dirs.io.max.bytes.
per.second
sasl.enabled.mechanisms
sasl.jaas.config
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal.to.local.rules
sasl.kerberos.service.name
sasl.kerberos.ticket.renew.jitter
sasl.kerberos.ticket.renew.window.
factor
sasl.login.refresh.buffer.seconds
sasl.login.refresh.min.period.seconds
sasl.login.refresh.window.factor
sasl.login.refresh.window.jitter
sasl.mechanism.inter.broker.protocol
ssl.cipher.suites
ssl.client.auth
ssl.enabled.protocols
ssl.endpoint.identification.algorithm
ssl.engine.factory.class
ssl.key.password
ssl.keymanager.algorithm
ssl.keystore.certificate.chain
ssl.keystore.key
ssl.keystore.location
ssl.keystore.password
ssl.keystore.type
ssl.protocol
ssl.provider
ssl.secure.random.implementation
ssl.trustmanager.algorithm
ssl.truststore.certificates
ssl.truststore.location
ssl.truststore.password
ssl.truststore.type
unclean.leader.election.enable
For entity-type 'users':
SCRAM-SHA-256
SCRAM-SHA-512
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
For entity-type 'clients':
consumer_byte_rate
controller_mutation_rate
producer_byte_rate
request_percentage
Entity types 'users' and 'clients' may
be specified together to update
config for clients of a specific
user.
修改 Broker 動态參數
修改動态參數無需重新開機 Broker,動态 Broker 參數的使用場景非常廣泛,通常包括但不限于以下幾種:
- 動态調整 Broker 端各種線程池大小,實時應對突發流量。
- 動态調整 Broker 端連接配接資訊或安全配置資訊。
- 動态更新 SSL Keystore 有效期。
- 動态調整 Broker 端 Compact 操作性能。
- 實時變更 JMX 名額收集器 (JMX Metrics Reporter)。
Kafka Broker Config 的參數有以下 3 種類型:
- read-only:被标記為 read-only 的參數和原來的參數行為一樣,隻有重新開機 Broker,才能令修改生效。
- per-broker:被标記為 per-broker 的參數屬于動态參數,修改它之後,隻會在對應的 Broker 上生效。
- cluster-wide:被标記為 cluster-wide 的參數也屬于動态參數,修改它之後,會在整個叢集範圍内生效,也就是說,對所有 Broker 都生效。你也可以為具體的 Broker 修改 cluster-wide 參數。
在叢集層面設定全局值,即設定 cluster-wide 範圍值,将
unclean.leader.election.enable
參數在叢集層面設定為 true。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \
--entity-type brokers --entity-default --alter \
--add-config unclean.leader.election.enable=true
#傳回結果
Completed updating default config for brokers in the cluster.
如果要設定 cluster-wide 範圍的動态參數,需要顯式指定 entity-default。現在,我們使用下面的指令來檢視一下剛才的配置是否成功。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \
--entity-type brokers --entity-default --describe
#傳回結果
Default configs for brokers in the cluster are:
unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}
在 Zookeeper 上檢視 /config/brokers/ 節點可以檢視 cluster-wide 的動态參數設定。
[zk: (CONNECTED) ] > get /config/brokers/<default>
{"version":1,"config":{"unclean.leader.election.enable":"true"}}
cZxid = 17179869570
ctime = 1631246402937
mZxid = 17179869570
mtime = 1631246402937
pZxid = 17179869570
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0
dataLength = 64
numChildren = 0
設定 per-broker 範圍參數。我們還是以
unclean.leader.election.enable
參數為例,我現在為 ID 為 1 的 Broker 設定一個不同的值。指令如下:
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
#傳回結果
Completed updating config for broker 1.
我們使用下列指令檢視 Broker ID 為 1 的節點動态參數,可以看到
DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false
,表示我們剛才對 per-broker 參數的調整生效了。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 --entity-type brokers --entity-name 1 --describe
#傳回結果
Dynamic configs for broker 1 are:
unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, STATIC_BROKER_CONFIG:unclean.leader.election.enable=false, DEFAULT_CONFIG:unclean.leader.election.enable=false}
在 Zookeeper 上檢視 /config/brokers/1 節點可以檢視 Broker ID 為 1 的節點的動态參數設定。
[zk: (CONNECTED) ] > get /config/brokers/1
{"version":1,"config":{"unclean.leader.election.enable":"false"}}
cZxid = 17179869574
ctime = 1631246495120
mZxid = 17179869574
mtime = 1631246495120
pZxid = 17179869574
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0
dataLength = 65
numChildren = 0[zk: (CONNECTED) ] > get /config/brokers/<default>[zk: (CONNECTED) ] > get /config/brokers/1
删除 cluster-wide 範圍動态參數。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \
--entity-type brokers --entity-default --alter \
--delete-config unclean.leader.election.enable
#傳回結果
Completed updating default config for brokers in the cluster.
删除 per-broker 範圍參數。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \
--entity-type brokers --entity-name 1 --alter \
--delete-config unclean.leader.election.enable
#傳回結果
Completed updating config for broker 1.
修改 Topic 動态參數
設定 Topic test-topic 的
retention.ms
為 10000。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \
--entity-type topics --entity-name test-topic --alter \
--add-config retention.ms=10000
檢視設定的 Topic 動态參數。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \
--entity-type topics --entity-name test-topic --describe
#傳回結果
Dynamic configs for topic test-topic are:
retention.ms=10000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=10000}
在 Zookeeper 上可以檢視 /config/topics/ 來檢視 Topic 動态參數。
[zk: (CONNECTED) ] > get /config/topics/test-topic
{"version":1,"config":{"retention.ms":"10000"}}
cZxid = 17179869460
ctime = 1631245744105
mZxid = 17179869619
mtime = 1631250116481
pZxid = 17179869460
cversion = 0
dataVersion = 10
aclVersion = 0
ephemeralOwner = 0
dataLength = 47
numChildren = 0[zk: (CONNECTED) ] > get /config/topics/test-topic
删除 Topic 動态參數。
kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \
--entity-type topics --entity-name test-topic --alter \
--delete-config retention.ms
Kafka 叢集一鍵啟動/停止腳本
環境變量設定:
#/etc/profile 檔案
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
一鍵啟動/停止腳本,檢視狀态需要安裝 jps 工具。
#! /bin/bash
# 填寫 Kafka Broker 節點位址
hosts=(kafka1 kafka2 kafka3)
# 列印啟動分布式腳本資訊
mill=`date "+%N"`
tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"`
echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation.
# 執行分布式開啟指令
function start()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" &
sleep 1
done
}
# 執行分布式關閉指令
function stop()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" &
sleep 1
done
}
# 檢視 Kafka Broker 節點狀态
function status()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" &
sleep 1
done
}
# 判斷輸入的 Kafka 指令參數是否有效
case "$1" in
start)
start
;;
stop)
stop
;;
status)
status
;;
*)
echo "Usage: $0 {start|stop|status}"
RETVAL=1
esac
參考資料
- Kafka 動态配置了解下?(https://time.geekbang.org/column/article/113504)
- 常見工具腳本大彙總(https://time.geekbang.org/column/article/116111)
- Kafka 并不難學!