要想kafka跑的好,如下几点要知晓。
Graceful shutdown
建议开启如下参数:
controlled.shutdown.enable=true
注意:
只有在broker上承载的分区都具有副本时(副本大于1,且副本存活),controller节点才会成功关闭
Balancing leadership
每当Broker停止或崩溃时,该broker的分区的领导权就转移到其他副本。
这意味着,默认情况下,当broker重新启动时,它将只是所有分区的关注者,这意味着它不会用于客户端读写,这对于整个集群来说吞吐会受到1/N的降低(N表示集群节点数)
为了避免这种不平衡,kafka提供了一种优先副本的概念
preferred replicas
.
如果一个分区的副本列表是1、5、9,那么节点1比节点5或节点9更适合作为leader,因为它位于副本列表的前面。
可以使用如下命令来恢复已恢复副本的领导权:
# 老版本工具
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
# 新版本工具
当然每次在服务器启动后执行该操作,可能很无聊,因此可以设置如下参数来自动执行:
auto.leader.rebalance.enable=true
Balancing Replicas Across Racks
机架层面的副本均衡。
机架感知特性将相同分区的副本分散到不同的机架上,这扩展了Kafka为broker失败提供的覆盖机架失败的保证,如果机架上的所有broker同时失败,就限制了数据丢失的风险。
该特性还可以应用于其他broker分组,如EC2中的可用性区域。
您可以通过向broker配置添加属性来指定broker属于特定的机架:
broker.rack=my-rack-id
当创建、修改或重新分发一个主题时,将遵循机架约束,确保副本尽可能多地跨越多个机架(一个分区将跨越最小(#机架,复制因子)不同的机架)。
用于将副本副本分配给broker的算法,会确保每个broker的leader数量是恒定的,而不管broker是如何分布在机架上的。这确保了平衡的吞吐量。
注意:
明智的做法是为每个机架配置相同数量的broker
Mirroring data between clusters
我们将在Kafka集群之间复制数据的过程称为“镜像”,以避免与单个集群中节点之间的复制混淆。
Kafka附带一个用于在Kafka集群之间镜像数据的工具,即
MirrorMaker
,该工具可以从源集群进行消费,并生产到目标集群。
常用的场景就是在另外一个数据中心提供副本。
您可以运行许多这样的镜像进程来提高吞吐量和容错能力。
使用mirrormaker进行迁移topic到另外的集群:
bin/kafka-mirror-maker.sh
--consumer.config consumer.properties
--producer.config producer.properties --whitelist my-topic
需要注意,我们必须使用
--whitelist
参数指定topic,该参数支持java的正则表达式结构,比如
--whitelist 'A|B'
,或者
--whitelist '*'
.
通常在使用kafka-mirror-maker时,建议配合
auto.create.topics.enable=true
使用,可以大批量的进行topic迁移。
Checking consumer position
检查消费者的位移,有时候了解消费者当前的位置时很有必要的。
kafka有一个工具,它将显示所有消费者在一个消费者组中的位置,以及他们与日志结束的距离
# 在my-group的消费者上消费my-topic的主题
# 可以查看整个消费者组的消费情况
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
Managing Consumer Groups
ConsumerGroupCommand工具可以
list, describe, or delete
一个消费组,消费者组可以手动删除,也可以在该组最后提交的偏移量过期时自动删除。
只有在组中没有任何活动成员时,手动删除才有效。
如下命令可以列出所有主题的所有消费者组:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
查看指定消费者组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
当然可用试用一些额外的参数来查看更多的消费者信息:
- –members: 查看消费者组中活跃的消费者
- –members --verbose: 该参数还可以查看分配给每个成员的分区
- –offsets: 该参数实际上可以被
参数中的内容覆盖掉--describe
- –state: 该参数可以提供组级别的信息
- –delete: 该参数可以手动删除一个或多个消费者组
- -reset-offsets: 该参数用于重置消费者组的偏移量,此选项在同一时间支持一个消费者组,同时需要使用
指定范围--all-topics或--topic
# 查看消费者组成员
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0
# 查看消费者组成员详细信息(分区)
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -
# --state参数
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
localhost:9092 (0) range Stable 4
# 删除消费者组
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.
注意:
--reset-offsets
选项支持如下三个执行选项:
- 显示要重置的偏移量
- –execute: 执行
进程--reset-offsets
- –export: 以csv格式导出执行结果
--reset-offsets
参数还有如下方案可供选择:
- –to-datetime: 重置offset到另外一个offset (format:YYYY-MM-DDTHH:mm:SS.sss)
- –to-earliest: 重置offset到最早的offset
- –to-latest: 重置为最新的offset
- –shift-by: 重置offset为n
- –from-file: 重置到csv中定义的offset
- –to-current: 重置offset到当前
- –by-duration: 重置offset为当前时间( Format: ‘PnDTnHnMnS’)
- –to-offset: 重置offset为指定的值
# 设置消费者组的offset为最新
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC PARTITION NEW-OFFSET
topic1 0 0
如果你还是使用老的
high-level
消费者,并且将组的元数据存储在zk中(
offsets.storage=zookeeper
),可以使用
--zookeeper
来代替
bootstrap-server
参数:
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
Expanding your cluster
集群的扩展.
将服务器添加到Kafka集群很容易,只需为它们分配一个惟一的brokerid,并在新服务器上启动Kafka.
然而,这些新服务器不会自动分配任何数据分区,因此,除非将分区移动到它们,否则在创建新主题之前,它们不会做任何工作。
因此,通常在向集群中添加机器时,您会希望将一些现有数据迁移到这些机器上。
迁移数据的过程是手动启动的,但是完全自动化。
实际上,Kafka将添加新服务器作为它正在迁移的分区的追随者,并允许它完全复制该分区中的现有数据。
当新服务器完全复制了该分区的内容并加入同步副本时,现有副本中的一个将删除其分区的数据。
可以使用分区重新分配工具在broker之间移动分区。
理想的分区分布应该确保所有broker之间的数据负载和分区大小是一致的。
分区重新分配工具不具备自动研究Kafka集群中的数据分布并移动分区以获得均匀的负载分布的能力,因此,管理员必须确定应该移动哪些主题或分区。
分区迁移工具可以运行在三种互斥模式下:
-
: 给定主题列表和broker列表,该工具生成一个候选重新分配,将指定主题的所有分区移动到新的broker,该参数仅帮助管理员方便的来生成给定主题和目标broker列表的分区重新分配计划--generate
-
: 该工具根据用户提供的重新分配计划开始重新分配分区(使用--execute
指定生成的迁移配置),---reassignment-json-file
: 该工具将验证列出的所有分区的重新分配状态,可以是成功完成、失败或正在进行--verify
1.自动迁移数据到新的服务器
分区重新分配工具可用于将某些主题从当前broker集移到新添加的broker。 这在扩展现有集群时通常很有用,因为与一次移动一个分区相比,将整个主题移至新的broker集更容易。 用于执行此操作时,用户应提供应移至新的一组broker的主题列表和新broker的目标列表。 然后,该工具将给定主题列表中的所有分区平均分配到新的一组broker中。 在此过程中,主题的复制因子保持不变。 有效地,将输入主题列表的所有分区的副本从旧的broker集移至新添加的broker。
如下示例(topic:foo1和foo2的全部分区移动到broker 5和6上,迁移完成后,该topic的全部分区将在Broker5和6上):
$ cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
# 准备好json文件之后,使用分区重新分配工具生成一个候选分配
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
# 该工具生成一个将移动所有分区的候选分配(分配到哪些broker上),此时分区移动还没有开始,它只告诉您当前的分配和建议的新分配。
# 应该保存当前的赋值,以防您想要回滚到它.
# 使用生成的迁移计划进行迁移
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
# 最后使用--verify选项验证迁移状态
# 注意:使用相同的迁移计划任务expand-cluster-reassignment.json
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
2.自定义分区分配和迁移
分区重分配工具还可以用于有选择地将分区的副本移动到一组特定的broker。
当我们以这种方式使用时,假设用户知道重新分配计划,并且不需要该工具来生成候选的重新分配,即直接使用用户的分配策略进行数据迁移。
示例: 迁移topic
foo1
的partition-0到broker5和6,partition-1到broker2和3
# 自定义迁移计划
$ cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
# 使用--execute选项执行上述的迁移计划
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
# --verify来验证迁移状态
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Decommissioning brokers
分区重新分配工具还不能自动生成用于退役broker的重新分配计划,因此,管理员必须制定一个重新分配计划,将托管在代理上的所有分区的副本迁移到代理的其余部分。
这可能比较繁琐,因为重新分配需要确保所有副本不会从已退役的代理移动到另一个代理。为了简化这个过程,我们计划在将来为退役代理添加工具支持。
Increasing replication factor
增加现有分区的复制因子很容易,只需在定制的重新分配json文件中指定额外的副本,并使用—execute选项来增加指定分区的复制因子。
示例: 将主题foo的partition-0的副本从1增加到3。在增加复制因子之前,代理5上只存在分区的副本,作为增加复制因子的一部分,我们将在代理6和代理7上添加更多的副本。
# 自定义迁移配置
$ cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
# 执行
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
# 验证
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition [foo,0] completed successfully
# 查看副本数
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic foo --describe
Topic:foo PartitionCount:1 ReplicationFactor:3 Configs:
Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
Limiting Bandwidth Usage during Data Migration
Kafka允许您对复制流量施加限制,设置用于将副本从一台机器移动到另一台机器的带宽上限。
这在重新平衡集群、引导新代理或添加或删除代理时非常有用,因为它限制了这些数据密集型操作对用户的影响
最简单的方式就是在使用
kafka-reassign-partitions.sh
脚本时,使用限流功能,不过
kafka-configs.sh
脚本也具有该功能。
# 限制在执行重平衡时,迁移速度不能超过50MB/s.
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json —throttle 50000000
# 当然,如果要改变速率的限制,可以重新运行
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
There is an existing assignment running.
The throttle limit was set to 700000000 B/s
# 一旦完成重平衡后,就可以再次验证
# 需要注意:当冲平衡完成后,使用--verify验证时需要删除限流,否则会影响正常复制
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file bigger-cluster.json
Status of partition reassignment:
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [mytopic,0] completed successfully
Throttle was removed.
查看broker的限流配置:
$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
这显示了应用于复制协议的leader和follower端上的节流。默认情况下,两边分配相同的节流吞吐量值。
查看限流的副本:
$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
follower.replication.throttled.replicas=1:101,0:102
Setting quotas
配额设置。
# 为user=user1, client-id=clientA配置自定义配额
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.
# 为user=user1配置配额
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.
# 为client-id=clientA配置配额
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
Updated config for entity: client-id 'clientA'.
# 为user=userA配置默认的配额
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.
# 为user配置默认配额
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-default
Updated config for entity: default user-principal.
# 为client-id配置默认配额
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-default
Updated config for entity: default client-id.
# 查看配额
$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
# 如果不指定entity name ,将显示所有的entity-type的配额
$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200
注意
: 在broker的配置中增加如下配置,会默认为全部的生产者和消费者进行配额限制.
# 生产者和消费者10MB/s
quota.producer.default=10485760
quota.consumer.default=10485760
kafka-operations
