天天看點

Kafka 常用工具腳本總結

系統

啟動 Kafka

-daemon

參數可以讓 Kafka 在背景運作。

kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties      

指定 JMX 端口啟動

JMX 的全稱為 Java Management Extensions。顧名思義,是管理 Java 的一種擴充,通過 JMX 可以友善我們監控 Kafka 的記憶體,線程,CPU 的使用情況,以及生産和消費消息的名額。

JMX_PORT=9999 kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties      

停止 Kafka

kafka-server-stop.sh      

Topic

建立 Topic

kafka-topics.sh --create  --bootstrap-server 10.37.62.20:9092 --replication-factor 3 --partitions 3 --topic <topic-name>      

列出所有 Topic

kafka-topics.sh  --bootstrap-server 10.37.62.20:9092 --list      

檢視指定 Topic

kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --describe --topic <topic-name>      

删除指定 Topic

kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --delete --topic <topic-name>      

擴充 Topic 的 Partition 數量

artition 數量隻能擴大不能縮小。

kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --topic app --alter --partitions 30      

擴充 topic 每個 partition 的副本數量

replication factor 可以擴大也可以縮小,最多不能超過 broker 數量。先建立一個檔案名為 increace-factor.json,這裡要擴充的是 mysql-audit-log 這個 topic 的 partition 到 15 個:0,1,2 為 broker id。

{"version":1,
"partitions":[
{"topic":"mysql-audit-log","partition":0,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":1,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":2,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":3,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":4,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":5,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":6,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":7,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":8,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":9,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":10,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":11,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":12,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":13,"replicas":[0,1,2]},
{"topic":"mysql-audit-log","partition":14,"replicas":[0,1,2]}
]}      
kafka-reassign-partitions.sh --zookeeper 10.37.62.20:2181 --reassignment-json-file  increace-factor.json --execute      

檢視 Topic 資料大小

#方法一
kafka-log-dirs.sh \
  --bootstrap-server 192.168.1.87:9092 \
  --topic-list mytopic \
  --describe \
  | grep -oP '(?<=size":)\d+'  \
  | awk '{ sum += $1 } END { print sum }'
  
#傳回結果,機關 Byte
648
#方法二,需要安裝 jq
kafka-log-dirs.sh \
    --bootstrap-server 192.168.1.87:9092 \
    --topic-list mytopic \
    --describe \
  | grep '^{' \
  | jq '[ ..|.size? | numbers ] | add'
#傳回結果,機關 Byte
648      

消費者組 Consumer Group

列出所有的 Consumer Group

kafka-consumer-groups.sh --bootstrap-server 10.37.62.20:9092 --list      

檢視指定 Consumer Group 詳情

  • GROUP:消費者 group
  • TOPIC:話題 id
  • PARTITION:分區 id
  • CURRENT-OFFSET:目前已消費的條數
  • LOG-END-OFFSET:總條數
  • LAG:未消費的條數
  • CONSUMER-ID:消費者 id
  • HOST:消費者 ip 位址
  • CLIENT-ID:用戶端 id
#這裡檢視的是 logstash_mysql 這個消費者 group 的消費情況
kafka-consumer-groups.sh --bootstrap-server 10.37.62.20:9092 --describe --group logstash_mysql
#傳回結果
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                      HOST            CLIENT-ID
logstash_mysql  mysql-audit-log 11         1312115         1312857         742             logstash-5-0545a8a7-f7bd-430c-b619-7a2b206addd2  /10.37.62.24    logstash-5
logstash_mysql  mysql-audit-log 1          1312593         1313345         752             logstash-0-d86bd51a-d010-45de-aa6f-f6da8542b779  /10.37.62.23    logstash-0
logstash_mysql  mysql-audit-log 2          1309548         1310317         769             logstash-1-496340ea-935d-444d-a184-51d42e225054  /10.37.62.24    logstash-1
logstash_mysql  mysql-audit-log 12         1313083         1313194         111             logstash-6-806b20cb-a9af-49c1-b37d-ccb33a646ab2  /10.37.62.24    logstash-6
logstash_mysql  mysql-audit-log 6          1310984         1311192         208             logstash-13-8d474bf6-e8d0-4b8a-b319-cf5e2e6cc078 /10.37.62.24    logstash-13
logstash_mysql  mysql-audit-log 9          1312998         1313768         770             logstash-3-29863fb0-6708-4fb1-9e28-bd81c30ce8ef  /10.37.62.24    logstash-3
logstash_mysql  mysql-audit-log 4          1315150         1315276         126             logstash-11-6d66a188-85b7-476b-bd89-5423ef48cd01 /10.37.62.24    logstash-11
logstash_mysql  mysql-audit-log 0          22770935522     22770935650     128             logstash-0-7be475d6-a49e-4ff9-bf83-6b83f6067306  /10.37.62.24    logstash-0
logstash_mysql  mysql-audit-log 8          1309956         1310103         147             logstash-2-3c313c6f-8c98-4333-8bad-2f9696457d7d  /10.37.62.24    logstash-2
logstash_mysql  mysql-audit-log 13         1314659         1314775         116             logstash-7-e98fd14e-e7f6-45e5-8ccf-2442058f0bc9  /10.37.62.24    logstash-7
logstash_mysql  mysql-audit-log 14         1313145         1313250         105             logstash-8-2c3345a8-f8f1-4f08-a18e-333dff2f0d65  /10.37.62.24    logstash-8
logstash_mysql  mysql-audit-log 5          1314037         1314297         260             logstash-12-ce018227-9e59-4137-a23f-5ccc0c7d4f6a /10.37.62.24    logstash-12
logstash_mysql  mysql-audit-log 10         1312883         1312962         79              logstash-4-9eb84ae4-3351-4083-9b1f-288910a6c3b8  /10.37.62.24    logstash-4
logstash_mysql  mysql-audit-log 7          1312476         1313200         724             logstash-14-680c982e-5cf3-406b-810a-4d5c96b5bdee /10.37.62.24    logstash-14
logstash_mysql  mysql-audit-log 3          1313227         1313328         101             logstash-10-e212dc18-a2bb-42d9-9d0b-095a93841efc /10.37.62.24    logstash-10      

删除指定 Consumer Group

kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --delete --topic pgw-nginx      

消息

生産消息

普通生産消息

kafka-console-producer.sh --broker-list 11.8.36.125:9092 --topic mytopic
>this is my topic      

生産消息指定 Key

key.separator=,

指定以逗号作為 key 和 value 的分隔符。

kafka-console-producer.sh --broker-list kafka1:9092 --topic cr7-topic --property parse.key=true --property key.separator=,
>mykey,{"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}      

消費消息

從頭開始消費

從頭開始消費是可以消費到之前的消息的,通過

--from-beginning

指定:

kafka-console-consumer.sh --bootstrap-server 11.8.36.125:9092 --topic mytopic --from-beginning
this is my topic      

從尾部開始消費

--offset latest

指定從尾部開始消費,另外還需要指定 partition,可以指定多個:

kafka-console-consumer.sh --bootstrap-server 11.8.36.125:9092 --topic mytopic  --offset latest  --partition 0 1 2      

消費指定條數的消息

--max-messages

指定取的個數:

kafka-console-consumer.sh --bootstrap-server 11.8.36.125:9092 --topic mytopic  --offset latest  --partition 0 1 2 --max-messages 2
bobo
1111
Processed a total of 2 messages      

指定消費組進行消費

--consumer-property group.id=<消費者組名>

執行消費者組進行消費:

kafka-console-consumer.sh --bootstrap-server  kafka1:9092 --topic test_partition --consumer-property  group.id=test_group --from-beginning      

檢視消息具體内容

kafka-dump-log.sh --files cr7-topic-0/00000000000000000000.log  -deep-iteration --print-data-log
 
 #輸出結果
| offset: 1080 CreateTime: 1615020877664 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 1 payload: {"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}
| offset: 1081 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 5 payload: {"orderAmount":1000,"orderId":5,"productId":105,"productNum":5}
| offset: 1082 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 7 payload: {"orderAmount":1000,"orderId":7,"productId":107,"productNum":7}
| offset: 1083 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 8 payload: {"orderAmount":1000,"orderId":8,"productId":108,"productNum":8}
| offset: 1084 CreateTime: 1615020877677 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 11 payload: {"orderAmount":1000,"orderId":11,"productId":111,"productNum":11}
| offset: 1085 CreateTime: 1615020877677 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 15 payload: {"orderAmount":1000,"orderId":15,"productId":115,"productNum":15}
| offset: 1086 CreateTime: 1615020877678 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 17 payload: {"orderAmount":1000,"orderId":17,"productId":117,"productNum":17}
| offset: 1087 CreateTime: 1615020877678 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 21 payload: {"orderAmount":1000,"orderId":21,"productId":121,"productNum":21}      

檢視 Topic 中目前消息總數

Kafka 自帶的指令沒有直接提供這樣的功能,要使用 Kafka 提供的工具類 GetOffsetShell 來計算給定 Topic 每個分區目前最早位移和最新位移,內插補點就是每個分區的目前的消息總數,将該 Topic 所有分區的消息總數累加就能得到該 Topic 總的消息數。

首先查詢 Topic 中每個分區 offset 的最小值(起始位置),使用

--time -2

參數。一個分區的起始位置并不是每時每刻都為 0 ,因為日志清理的動作會清理舊的資料,是以分區的起始位置會自然而然地增加。

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 -topic test-topic  --time -2
#前面是分區号,後面是 offset
test-topic:0:0
test-topic:1:0      

然後使用

--time -1

參數查詢 Topic 各個分區的 offset 的最大值。

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --time -1 --topic test-topic
#輸出結果
test-topic:0:5500000
test-topic:1:5500000      

對于本例來說,test-topic 中目前總的消息數為 (5500000 - 0) + (5500000 - 0),等于 1100 萬條。如果隻是要擷取 Topic 中總的消息數(包括已經從 Kafka 删除的消息),那麼隻需要将 Topic 中每個 Partition 的 Offset 累加即可。

Offset

重置消費者 Offset

#檢視消費者組消費情況
#目前的 0 分區 CURRENT-OFFSET 是 4,2 分區 CURRENT-OFFSET 是 6
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group my-consumer-group
#傳回結果
Consumer group 'my-consumer-group' has no active members.
GROUP             TOPIC                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-consumer-group transaction-topic-msg 2          6               6               0               -               -               -
my-consumer-group transaction-topic-msg 1          0               0               0               -               -               -
my-consumer-group transaction-topic-msg 0          4               4               0               -               -               -         -
#重置消費者組 offset 為 3,重置是所有分區一起重置
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group my-consumer-group --reset-offsets --execute --to-offset 3 --topic transaction-topic-msg
#傳回結果
[2021-06-25 10:44:51,848] WARN New offset (3) is higher than latest offset for topic partition transaction-topic-msg-1. Value will be set to 0 (kafka.admin.ConsumerGroupCommand$)
GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
my-consumer-group              transaction-topic-msg          0          3              
my-consumer-group              transaction-topic-msg          1          0              
my-consumer-group              transaction-topic-msg          2          3              
#可以看到 0 分區和 2 分區的 CURRENT-OFFSET 都變為 3 了
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group my-consumer-group
#傳回結果
Consumer group 'my-consumer-group' has no active members.
GROUP             TOPIC                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-consumer-group transaction-topic-msg 2          3               6               3               -               -               -
my-consumer-group transaction-topic-msg 1          0               0               0               -               -               -
my-consumer-group transaction-topic-msg 0          3               4               1               -               -               -
#可以重新消費到之前的資料
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic transaction-topic-msg  --group my-consumer-group 
#傳回結果
message-111111
message-333333      

性能測試

生産者性能測試

  • --num-records 10000000: 向指定主題發送了 1 千萬條消息。
  • --record-size 1024: 每條消息的大小為 1024KB。
  • --throughput -1: 不限制吞吐量。
  • --producer-props: 指定生産者參數。
    • acks=-1: 這要求 ISR 清單裡跟 leader 保持同步的那些 follower 都要把消息同步過去,才能認為這條消息是寫入成功。
    • linger.ms=2000: batch.size 和 linger.ms 是對 kafka producer 性能影響比較大的兩個參數。batch.size 是 producer 批量發送的基本機關,預設是 16384Bytes,即 16kB;lingger.ms 是 sender 線程在檢查 batch 是否 ready 時候,判斷有沒有過期的參數,預設大小是 0ms。
    • compression.type=lz4: 使用 lz4 壓縮算法。
[root@kafka1 ~]# kafka-producer-perf-test.sh --topic test_producer_perf --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka1:9092 acks=-1 linger.ms=2000 compression.type=lz4
#輸出結果
705600 records sent, 141063.6 records/sec (137.76 MB/sec), 54.8 ms avg latency, 557.0 ms max latency.
1204178 records sent, 240739.3 records/sec (235.10 MB/sec), 44.1 ms avg latency, 402.0 ms max latency.
1370938 records sent, 274187.6 records/sec (267.76 MB/sec), 27.9 ms avg latency, 311.0 ms max latency.
1464605 records sent, 292628.4 records/sec (285.77 MB/sec), 19.2 ms avg latency, 139.0 ms max latency.
1477239 records sent, 295447.8 records/sec (288.52 MB/sec), 31.8 ms avg latency, 290.0 ms max latency.
1446682 records sent, 289336.4 records/sec (282.56 MB/sec), 26.4 ms avg latency, 281.0 ms max latency.
1555098 records sent, 311019.6 records/sec (303.73 MB/sec), 37.6 ms avg latency, 344.0 ms max latency.
10000000 records sent, 263894.020162 records/sec (257.71 MB/sec), 32.60 ms avg latency, 557.00 ms max latency, 12 ms 50th, 140 ms 95th, 262 ms 99th, 396 ms 99.9th.      

我們應該關心延時的機率分布情況,僅僅知道一個平均值是沒有意義的。這就是這裡計算分位數的原因。通常我們關注到 99th 分位就可以了。比如在上面的輸出中,99th 值是 262 ms,這表明測試生産者生産的消息中,有 99% 消息的延時都在 262 ms 以内。你完全可以把這個資料當作這個生産者對外承諾的 SLA。

消費者性能測試

[root@kafka1 ~]# kafka-consumer-perf-test.sh --broker-list kafka1:9092 --messages 10000000 --topic test_producer_perf
#輸出結果
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2021-03-09 10:34:18:447, 2021-03-09 10:34:33:948, 9765.6250, 629.9997, 10000000, 645119.6697, 1615257259068, -1615257243567, -0.0000, -0.0062      

雖然輸出格式有所差别,但該腳本也會列印出消費者的吞吐量資料。比如本例中的 629.9997MB/s。有點令人遺憾的是,它沒有計算不同分位數下的分布情況。是以,在實際使用過程中,這個腳本的使用率要比生産者性能測試腳本的使用率低。

修改動态參數

檢視支援的動态參數

如果你想要知道動态 Broker 參數都有哪些,一種方式是在 Kafka 官網中檢視 Broker 端參數清單,另一種方式是直接運作無參數的 kafka-configs 腳本,該腳本的說明文檔會告訴你目前動态 Broker 參數都有哪些。

[root@kafka1 ~]# kafka-configs.sh 
This tool helps to manipulate and describe entity config for a topic, client, user or broker
Option                                 Description                            
------                                 -----------                            
--add-config <String>                  Key Value pairs of configs to add.     
                                         Square brackets can be used to group 
                                         values which contain commas: 'k1=v1, 
                                         k2=[v1,v2,v2],k3=v3'. The following  
                                         #Topic 動态參數
                                         is a list of valid configurations:   
                                         For entity-type 'topics':            
                                        cleanup.policy                        
                                        compression.type                      
                                        delete.retention.ms                   
                                        file.delete.delay.ms                  
                                        flush.messages                        
                                        flush.ms                              
                                        follower.replication.throttled.       
                                         replicas                             
                                        index.interval.bytes                  
                                        leader.replication.throttled.replicas 
                                        max.compaction.lag.ms                 
                                        max.message.bytes                     
                                        message.downconversion.enable         
                                        message.format.version                
                                        message.timestamp.difference.max.ms   
                                        message.timestamp.type                
                                        min.cleanable.dirty.ratio             
                                        min.compaction.lag.ms                 
                                        min.insync.replicas                   
                                        preallocate                           
                                        retention.bytes                       
                                        retention.ms                          
                                        segment.bytes                         
                                        segment.index.bytes                   
                                        segment.jitter.ms                     
                                        segment.ms                            
                                        unclean.leader.election.enable        
                                        #Broker 動态參數
                                       For entity-type 'brokers':             
                                        advertised.listeners                  
                                        background.threads                    
                                        compression.type                      
                                        follower.replication.throttled.rate   
                                        leader.replication.throttled.rate     
                                        listener.security.protocol.map        
                                        listeners                             
                                        log.cleaner.backoff.ms                
                                        log.cleaner.dedupe.buffer.size        
                                        log.cleaner.delete.retention.ms       
                                        log.cleaner.io.buffer.load.factor     
                                        log.cleaner.io.buffer.size            
                                        log.cleaner.io.max.bytes.per.second   
                                        log.cleaner.max.compaction.lag.ms     
                                        log.cleaner.min.cleanable.ratio       
                                        log.cleaner.min.compaction.lag.ms     
                                        log.cleaner.threads                   
                                        log.cleanup.policy                    
                                        log.flush.interval.messages           
                                        log.flush.interval.ms                 
                                        log.index.interval.bytes              
                                        log.index.size.max.bytes              
                                        log.message.downconversion.enable     
                                        log.message.timestamp.difference.max. 
                                         ms                                   
                                        log.message.timestamp.type            
                                        log.preallocate                       
                                        log.retention.bytes                   
                                        log.retention.ms                      
                                        log.roll.jitter.ms                    
                                        log.roll.ms                           
                                        log.segment.bytes                     
                                        log.segment.delete.delay.ms           
                                        max.connection.creation.rate          
                                        max.connections                       
                                        max.connections.per.ip                
                                        max.connections.per.ip.overrides      
                                        message.max.bytes                     
                                        metric.reporters                      
                                        min.insync.replicas                   
                                        num.io.threads                        
                                        num.network.threads                   
                                        num.recovery.threads.per.data.dir     
                                        num.replica.fetchers                  
                                        principal.builder.class               
                                        replica.alter.log.dirs.io.max.bytes.  
                                         per.second                           
                                        sasl.enabled.mechanisms               
                                        sasl.jaas.config                      
                                        sasl.kerberos.kinit.cmd               
                                        sasl.kerberos.min.time.before.relogin 
                                        sasl.kerberos.principal.to.local.rules
                                        sasl.kerberos.service.name            
                                        sasl.kerberos.ticket.renew.jitter     
                                        sasl.kerberos.ticket.renew.window.    
                                         factor                               
                                        sasl.login.refresh.buffer.seconds     
                                        sasl.login.refresh.min.period.seconds 
                                        sasl.login.refresh.window.factor      
                                        sasl.login.refresh.window.jitter      
                                        sasl.mechanism.inter.broker.protocol  
                                        ssl.cipher.suites                     
                                        ssl.client.auth                       
                                        ssl.enabled.protocols                 
                                        ssl.endpoint.identification.algorithm 
                                        ssl.engine.factory.class              
                                        ssl.key.password                      
                                        ssl.keymanager.algorithm              
                                        ssl.keystore.certificate.chain        
                                        ssl.keystore.key                      
                                        ssl.keystore.location                 
                                        ssl.keystore.password                 
                                        ssl.keystore.type                     
                                        ssl.protocol                          
                                        ssl.provider                          
                                        ssl.secure.random.implementation      
                                        ssl.trustmanager.algorithm            
                                        ssl.truststore.certificates           
                                        ssl.truststore.location               
                                        ssl.truststore.password               
                                        ssl.truststore.type                   
                                        unclean.leader.election.enable        
                                       For entity-type 'users':               
                                        SCRAM-SHA-256                         
                                        SCRAM-SHA-512                         
                                        consumer_byte_rate                    
                                        controller_mutation_rate              
                                        producer_byte_rate                    
                                        request_percentage                    
                                       For entity-type 'clients':             
                                        consumer_byte_rate                    
                                        controller_mutation_rate              
                                        producer_byte_rate                    
                                        request_percentage                    
                                       Entity types 'users' and 'clients' may 
                                         be specified together to update      
                                         config for clients of a specific     
                                         user.      

修改 Broker 動态參數

修改動态參數無需重新開機 Broker,動态 Broker 參數的使用場景非常廣泛,通常包括但不限于以下幾種:

  • 動态調整 Broker 端各種線程池大小,實時應對突發流量。
  • 動态調整 Broker 端連接配接資訊或安全配置資訊。
  • 動态更新 SSL Keystore 有效期。
  • 動态調整 Broker 端 Compact 操作性能。
  • 實時變更 JMX 名額收集器 (JMX Metrics Reporter)。

Kafka Broker Config 的參數有以下 3 種類型:

  • read-only:被标記為 read-only 的參數和原來的參數行為一樣,隻有重新開機 Broker,才能令修改生效。
  • per-broker:被标記為 per-broker 的參數屬于動态參數,修改它之後,隻會在對應的 Broker 上生效。
  • cluster-wide:被标記為 cluster-wide 的參數也屬于動态參數,修改它之後,會在整個叢集範圍内生效,也就是說,對所有 Broker 都生效。你也可以為具體的 Broker 修改 cluster-wide 參數。

在叢集層面設定全局值,即設定 cluster-wide 範圍值,将

unclean.leader.election.enable

參數在叢集層面設定為 true。

kafka-configs.sh --bootstrap-server  10.37.249.58:9092 \
--entity-type brokers --entity-default --alter \
--add-config unclean.leader.election.enable=true
#傳回結果
Completed updating default config for brokers in the cluster.      

如果要設定 cluster-wide 範圍的動态參數,需要顯式指定 entity-default。現在,我們使用下面的指令來檢視一下剛才的配置是否成功。

kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \
--entity-type brokers --entity-default --describe
#傳回結果
Default configs for brokers in the cluster are:
  unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}      

在 Zookeeper 上檢視 /config/brokers/ 節點可以檢視 cluster-wide 的動态參數設定。

[zk: (CONNECTED) ] > get /config/brokers/<default>
{"version":1,"config":{"unclean.leader.election.enable":"true"}}
cZxid = 17179869570
ctime = 1631246402937
mZxid = 17179869570
mtime = 1631246402937
pZxid = 17179869570
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0
dataLength = 64
numChildren = 0      

設定 per-broker 範圍參數。我們還是以

unclean.leader.election.enable

參數為例,我現在為 ID 為 1 的 Broker 設定一個不同的值。指令如下:

kafka-configs.sh --bootstrap-server  10.37.249.58:9092 --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
#傳回結果
Completed updating config for broker 1.      

我們使用下列指令檢視 Broker ID 為 1 的節點動态參數,可以看到

DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false

,表示我們剛才對 per-broker 參數的調整生效了。

kafka-configs.sh --bootstrap-server 10.37.249.58:9092 --entity-type brokers --entity-name 1 --describe
#傳回結果
Dynamic configs for broker 1 are:
  unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, STATIC_BROKER_CONFIG:unclean.leader.election.enable=false, DEFAULT_CONFIG:unclean.leader.election.enable=false}      

在 Zookeeper 上檢視 /config/brokers/1 節點可以檢視 Broker ID 為 1 的節點的動态參數設定。

[zk: (CONNECTED) ] > get /config/brokers/1
{"version":1,"config":{"unclean.leader.election.enable":"false"}}
cZxid = 17179869574
ctime = 1631246495120
mZxid = 17179869574
mtime = 1631246495120
pZxid = 17179869574
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0
dataLength = 65
numChildren = 0[zk: (CONNECTED) ] > get /config/brokers/<default>[zk: (CONNECTED) ] > get /config/brokers/1      

删除 cluster-wide 範圍動态參數。

kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \
--entity-type brokers --entity-default --alter \
--delete-config unclean.leader.election.enable
#傳回結果
Completed updating default config for brokers in the cluster.      

删除 per-broker 範圍參數。

kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \
--entity-type brokers --entity-name 1 --alter \
--delete-config unclean.leader.election.enable
#傳回結果
Completed updating config for broker 1.      

修改 Topic 動态參數

設定 Topic test-topic 的

retention.ms

為 10000。

kafka-configs.sh --bootstrap-server  10.37.249.58:9092 \
--entity-type topics --entity-name test-topic --alter \
--add-config retention.ms=10000      

檢視設定的 Topic 動态參數。

kafka-configs.sh --bootstrap-server  10.37.249.58:9092 \
--entity-type topics --entity-name test-topic --describe
#傳回結果
Dynamic configs for topic test-topic are:
  retention.ms=10000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=10000}      

在 Zookeeper 上可以檢視 /config/topics/ 來檢視 Topic 動态參數。

[zk: (CONNECTED) ] > get /config/topics/test-topic
{"version":1,"config":{"retention.ms":"10000"}}
cZxid = 17179869460
ctime = 1631245744105
mZxid = 17179869619
mtime = 1631250116481
pZxid = 17179869460
cversion = 0
dataVersion = 10
aclVersion = 0
ephemeralOwner = 0
dataLength = 47
numChildren = 0[zk: (CONNECTED) ] > get /config/topics/test-topic      

删除 Topic 動态參數。

kafka-configs.sh --bootstrap-server  10.37.249.58:9092 \
--entity-type topics --entity-name test-topic --alter \
--delete-config retention.ms      

Kafka 叢集一鍵啟動/停止腳本

環境變量設定:

#/etc/profile 檔案
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin      

一鍵啟動/停止腳本,檢視狀态需要安裝 jps 工具。

#! /bin/bash
# 填寫 Kafka Broker 節點位址
hosts=(kafka1 kafka2 kafka3)
# 列印啟動分布式腳本資訊
mill=`date "+%N"`
tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"`
echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation.
# 執行分布式開啟指令
function start()
{
        for i in ${hosts[@]}
                do
                        smill=`date "+%N"`
                        stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
                        ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" &
                        sleep 1
                done
}
# 執行分布式關閉指令
function stop()
{
        for i in ${hosts[@]}
                do
                        smill=`date "+%N"`
                        stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
                        ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" &
                        sleep 1
                done
}
# 檢視 Kafka Broker 節點狀态
function status()
{
        for i in ${hosts[@]}
                do
                        smill=`date "+%N"`
                        stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
                        ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" &
                        sleep 1
                done
}
# 判斷輸入的 Kafka 指令參數是否有效
case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    status)
        status
        ;;
    *)
        echo "Usage: $0 {start|stop|status}"
        RETVAL=1
esac      

參考資料

  • Kafka 動态配置了解下?(https://time.geekbang.org/column/article/113504)
  • 常見工具腳本大彙總(https://time.geekbang.org/column/article/116111)
  • Kafka 并不難學!