要想kafka跑的好,如下幾點要知曉。
Graceful shutdown
建議開啟如下參數:
controlled.shutdown.enable=true
注意:
隻有在broker上承載的分區都具有副本時(副本大于1,且副本存活),controller節點才會成功關閉
Balancing leadership
每當Broker停止或崩潰時,該broker的分區的上司權就轉移到其他副本。
這意味着,預設情況下,當broker重新啟動時,它将隻是所有分區的關注者,這意味着它不會用于用戶端讀寫,這對于整個叢集來說吞吐會受到1/N的降低(N表示叢集節點數)
為了避免這種不平衡,kafka提供了一種優先副本的概念
preferred replicas
.
如果一個分區的副本清單是1、5、9,那麼節點1比節點5或節點9更适合作為leader,因為它位于副本清單的前面。
可以使用如下指令來恢複已恢複副本的上司權:
# 老版本工具
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
# 新版本工具
當然每次在伺服器啟動後執行該操作,可能很無聊,是以可以設定如下參數來自動執行:
auto.leader.rebalance.enable=true
Balancing Replicas Across Racks
機架層面的副本均衡。
機架感覺特性将相同分區的副本分散到不同的機架上,這擴充了Kafka為broker失敗提供的覆寫機架失敗的保證,如果機架上的所有broker同時失敗,就限制了資料丢失的風險。
該特性還可以應用于其他broker分組,如EC2中的可用性區域。
您可以通過向broker配置添加屬性來指定broker屬于特定的機架:
broker.rack=my-rack-id
當建立、修改或重新分發一個主題時,将遵循機架限制,確定副本盡可能多地跨越多個機架(一個分區将跨越最小(#機架,複制因子)不同的機架)。
用于将副本副本配置設定給broker的算法,會確定每個broker的leader數量是恒定的,而不管broker是如何分布在機架上的。這確定了平衡的吞吐量。
注意:
明智的做法是為每個機架配置相同數量的broker
Mirroring data between clusters
我們将在Kafka叢集之間複制資料的過程稱為“鏡像”,以避免與單個叢集中節點之間的複制混淆。
Kafka附帶一個用于在Kafka叢集之間鏡像資料的工具,即
MirrorMaker
,該工具可以從源叢集進行消費,并生産到目标叢集。
常用的場景就是在另外一個資料中心提供副本。
您可以運作許多這樣的鏡像程序來提高吞吐量和容錯能力。
使用mirrormaker進行遷移topic到另外的叢集:
bin/kafka-mirror-maker.sh
--consumer.config consumer.properties
--producer.config producer.properties --whitelist my-topic
需要注意,我們必須使用
--whitelist
參數指定topic,該參數支援java的正規表達式結構,比如
--whitelist 'A|B'
,或者
--whitelist '*'
.
通常在使用kafka-mirror-maker時,建議配合
auto.create.topics.enable=true
使用,可以大批量的進行topic遷移。
Checking consumer position
檢查消費者的位移,有時候了解消費者目前的位置時很有必要的。
kafka有一個工具,它将顯示所有消費者在一個消費者組中的位置,以及他們與日志結束的距離
# 在my-group的消費者上消費my-topic的主題
# 可以檢視整個消費者組的消費情況
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
Managing Consumer Groups
ConsumerGroupCommand工具可以
list, describe, or delete
一個消費組,消費者組可以手動删除,也可以在該組最後送出的偏移量過期時自動删除。
隻有在組中沒有任何活動成員時,手動删除才有效。
如下指令可以列出所有主題的所有消費者組:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
檢視指定消費者組
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
當然可用試用一些額外的參數來檢視更多的消費者資訊:
- –members: 檢視消費者組中活躍的消費者
- –members --verbose: 該參數還可以檢視配置設定給每個成員的分區
- –offsets: 該參數實際上可以被
參數中的内容覆寫掉--describe
- –state: 該參數可以提供組級别的資訊
- –delete: 該參數可以手動删除一個或多個消費者組
- -reset-offsets: 該參數用于重置消費者組的偏移量,此選項在同一時間支援一個消費者組,同時需要使用
指定範圍--all-topics或--topic
# 檢視消費者組成員
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0
# 檢視消費者組成員詳細資訊(分區)
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -
# --state參數
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
localhost:9092 (0) range Stable 4
# 删除消費者組
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.
注意:
--reset-offsets
選項支援如下三個執行選項:
- 顯示要重置的偏移量
- –execute: 執行
程序--reset-offsets
- –export: 以csv格式導出執行結果
--reset-offsets
參數還有如下方案可供選擇:
- –to-datetime: 重置offset到另外一個offset (format:YYYY-MM-DDTHH:mm:SS.sss)
- –to-earliest: 重置offset到最早的offset
- –to-latest: 重置為最新的offset
- –shift-by: 重置offset為n
- –from-file: 重置到csv中定義的offset
- –to-current: 重置offset到目前
- –by-duration: 重置offset為目前時間( Format: ‘PnDTnHnMnS’)
- –to-offset: 重置offset為指定的值
# 設定消費者組的offset為最新
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC PARTITION NEW-OFFSET
topic1 0 0
如果你還是使用老的
high-level
消費者,并且将組的中繼資料存儲在zk中(
offsets.storage=zookeeper
),可以使用
--zookeeper
來代替
bootstrap-server
參數:
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
Expanding your cluster
叢集的擴充.
将伺服器添加到Kafka叢集很容易,隻需為它們配置設定一個惟一的brokerid,并在新伺服器上啟動Kafka.
然而,這些新伺服器不會自動配置設定任何資料分區,是以,除非将分區移動到它們,否則在建立新主題之前,它們不會做任何工作。
是以,通常在向叢集中添加機器時,您會希望将一些現有資料遷移到這些機器上。
遷移資料的過程是手動啟動的,但是完全自動化。
實際上,Kafka将添加新伺服器作為它正在遷移的分區的追随者,并允許它完全複制該分區中的現有資料。
當新伺服器完全複制了該分區的内容并加入同步副本時,現有副本中的一個将删除其分區的資料。
可以使用分區重新配置設定工具在broker之間移動分區。
理想的分區分布應該確定所有broker之間的資料負載和分區大小是一緻的。
分區重新配置設定工具不具備自動研究Kafka叢集中的資料分布并移動分區以獲得均勻的負載分布的能力,是以,管理者必須确定應該移動哪些主題或分區。
分區遷移工具可以運作在三種互斥模式下:
-
: 給定主題清單和broker清單,該工具生成一個候選重新配置設定,将指定主題的所有分區移動到新的broker,該參數僅幫助管理者友善的來生成給定主題和目标broker清單的分區重新配置設定計劃--generate
-
: 該工具根據使用者提供的重新配置設定計劃開始重新配置設定分區(使用--execute
指定生成的遷移配置),---reassignment-json-file
: 該工具将驗證列出的所有分區的重新配置設定狀态,可以是成功完成、失敗或正在進行--verify
1.自動遷移資料到新的伺服器
分區重新配置設定工具可用于将某些主題從目前broker集移到新添加的broker。 這在擴充現有叢集時通常很有用,因為與一次移動一個分區相比,将整個主題移至新的broker集更容易。 用于執行此操作時,使用者應提供應移至新的一組broker的主題清單和新broker的目标清單。 然後,該工具将給定主題清單中的所有分區平均配置設定到新的一組broker中。 在此過程中,主題的複制因子保持不變。 有效地,将輸入主題清單的所有分區的副本從舊的broker集移至新添加的broker。
如下示例(topic:foo1和foo2的全部分區移動到broker 5和6上,遷移完成後,該topic的全部分區将在Broker5和6上):
$ cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
# 準備好json檔案之後,使用分區重新配置設定工具生成一個候選配置設定
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
# 該工具生成一個将移動所有分區的候選配置設定(配置設定到哪些broker上),此時分區移動還沒有開始,它隻告訴您目前的配置設定和建議的新配置設定。
# 應該儲存目前的指派,以防您想要復原到它.
# 使用生成的遷移計劃進行遷移
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
# 最後使用--verify選項驗證遷移狀态
# 注意:使用相同的遷移計劃任務expand-cluster-reassignment.json
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
2.自定義分區配置設定和遷移
分區重配置設定工具還可以用于有選擇地将分區的副本移動到一組特定的broker。
當我們以這種方式使用時,假設使用者知道重新配置設定計劃,并且不需要該工具來生成候選的重新配置設定,即直接使用使用者的配置設定政策進行資料遷移。
示例: 遷移topic
foo1
的partition-0到broker5和6,partition-1到broker2和3
# 自定義遷移計劃
$ cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
# 使用--execute選項執行上述的遷移計劃
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
# --verify來驗證遷移狀态
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Decommissioning brokers
分區重新配置設定工具還不能自動生成用于退役broker的重新配置設定計劃,是以,管理者必須制定一個重新配置設定計劃,将托管在代理上的所有分區的副本遷移到代理的其餘部分。
這可能比較繁瑣,因為重新配置設定需要確定所有副本不會從已退役的代理移動到另一個代理。為了簡化這個過程,我們計劃在将來為退役代理添加工具支援。
Increasing replication factor
增加現有分區的複制因子很容易,隻需在定制的重新配置設定json檔案中指定額外的副本,并使用—execute選項來增加指定分區的複制因子。
示例: 将主題foo的partition-0的副本從1增加到3。在增加複制因子之前,代理5上隻存在分區的副本,作為增加複制因子的一部分,我們将在代理6和代理7上添加更多的副本。
# 自定義遷移配置
$ cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
# 執行
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
# 驗證
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully
# 檢視副本數
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
Limiting Bandwidth Usage during Data Migration
Kafka允許您對複制流量施加限制,設定用于将副本從一台機器移動到另一台機器的帶寬上限。
這在重新平衡叢集、引導新代理或添加或删除代理時非常有用,因為它限制了這些資料密集型操作對使用者的影響
最簡單的方式就是在使用
kafka-reassign-partitions.sh
腳本時,使用限流功能,不過
kafka-configs.sh
腳本也具有該功能。
# 限制在執行重平衡時,遷移速度不能超過50MB/s.
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json —throttle 50000000
# 當然,如果要改變速率的限制,可以重新運作
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
There is an existing assignment running.
The throttle limit was set to 700000000 B/s
# 一旦完成重平衡後,就可以再次驗證
# 需要注意:當沖平衡完成後,使用--verify驗證時需要删除限流,否則會影響正常複制
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file bigger-cluster.json
Status of partition reassignment:
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [mytopic,0] completed successfully
Throttle was removed.
檢視broker的限流配置:
$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
這顯示了應用于複制協定的leader和follower端上的節流。預設情況下,兩邊配置設定相同的節流吞吐量值。
檢視限流的副本:
$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
follower.replication.throttled.replicas=1:101,0:102
Setting quotas
配額設定。
# 為user=user1, client-id=clientA配置自定義配額
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.
# 為user=user1配置配額
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.
# 為client-id=clientA配置配額
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
Updated config for entity: client-id 'clientA'.
# 為user=userA配置預設的配額
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.
# 為user配置預設配額
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
Updated config for entity: default user-principal.
# 為client-id配置預設配額
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
Updated config for entity: default client-id.
# 檢視配額
$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
# 如果不指定entity name ,将顯示所有的entity-type的配額
$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
注意
: 在broker的配置中增加如下配置,會預設為全部的生産者和消費者進行配額限制.
# 生産者和消費者10MB/s
quota.producer.default=10485760
quota.consumer.default=10485760
kafka-operations
