1.重要的配置参数
- 存储
- log.dir
- log.dirs
- topic
- auto.create.topics.enable:是否允许自动创建Topic
- unclean.leader.election.enable:是否允许Unclean Leader选举
- auto.leader.rebalance.enable:是否允许定期进行Leader选举
- retention.ms:规定了该Topic消息被保存的时长。默认是7天
2.生产者消息分区机制
- Kafka的消息组织方式实际上是三级结构:主题-分区-消息
- 分区是实现负载均衡以及高吞吐量的关键
- 分区策略:轮训、随机、按消息键保序
3.生产者压缩算法
- 压缩发生场景:producer端、broker端
- broker端发生压缩的场景
- broker端配置了和producer端不一样的压缩算法
- broker端发生了消息格式转化:主要是为了兼容老版本的消费者程序,这个过程中会涉及消息的解压缩和重新压缩
- 压缩算法
- 在吞吐量方面:LZ4 > Snappy > zstd和GZIP;
- 而在压缩比方面,zstd > LZ4 > GZIP > Snappy
4.无消息丢失怎么配置
1、不要使用producer.send(msg),而要使用producer.send(msg, callback)。记住,一定要使用带有回调通知的send方法。
2、设置acks = all。
- acks是Producer的一个参数,代表了你对“已提交”消息的定义。如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
- 0:发出去就算成功
- 1:只要Partition Leader接收到消息而且写入本地磁盘了,就认为成功了,不管他其他的Follower有没有同步过去这条消息
- all:Partition Leader接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都要把消息同步过去,才能认为这条消息是写入成功
- acks=all 就可以代表数据一定不会丢失了吗? 当然不是,如果你的Partition只有一个副本,也就是一个Leader,任何Follower都没有,你认为acks=all有用吗? 当然没用了,因为ISR里就一个Leader,他接收完消息后宕机,也会导致数据丢失。 所以说,这个acks=all,必须跟ISR列表里至少有2个以上的副本配合使用,起码是有一个Leader和一个Follower才可以
3、设置retries为一个较大的值。这里的retries同样是Producer的参数,对应前面提到的Producer自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0的Producer能够自动重试消息发送,避免消息丢失。
4、设置unclean.leader.election.enable = false。这是Broker端的参数,它控制的是哪些Broker有资格竞选分区的Leader。如果一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,必然会造成消息的丢失。故一般都要将该参数设置成false,即不允许这种情况的发生。
5、设置replication.factor >= 3。这也是Broker端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
6、设置min.insync.replicas > 1。这依然是Broker端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于1可以提升消息持久性。在实际环境中千万不要使用默认值1。
7、确保replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1。
8、确保消息消费完成再提交。Consumer端有个参数enable.auto.commit,最好把它设置成false,并采用手动提交位移的方式。就像前面说的,这对于单Consumer多线程处理的场景而言是至关重要的。
5.生成者管理TCP连接
- KafkaProducer实例创建时启动Sender线程,从而创建与bootstrap.servers中所有Broker的TCP连接。
- KafkaProducer实例首次更新元数据信息之后,还会再次创建与集群中所有Broker的TCP连接。
- 如果Producer端发送消息到某台Broker时发现没有与该Broker的TCP连接,那么也会立即创建连接。
- 如果设置Producer端connections.max.idle.ms参数大于0,则步骤1中创建的TCP连接会被自动关闭;如果设置该参数=-1,那么步骤1中创建的TCP连接将无法被关闭,从而成为“僵尸”连接。
6.幂等生产者与事务生产者
- 幂等性Producer只能保证单分区、单会话上的消息幂等性
- 事务型Producer能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型Producer也不惧进程的重启。Producer重启回来后,Kafka依然保证它们发送消息的精确一次处理实际上即使写入失败,Kafka也会把它们写入到底层的日志中,也就是说Consumer还是会看到这些消息
- 事务生产者一文读懂 kafka 的事务机制
- transaction coordinator
- transaction marker
- transaction log
- 两阶段提交在两阶段提交协议的第一阶段,transactional coordinator 更新内存中的事务状态为 “prepare_commit”,并将该状态持久化到 transaction log 中;
- 在两阶段提交协议的第二阶段, coordinator 首先写 transaction marker 标记到目标 topic 的目标 partition,这里的 transaction marker,就是我们上文说的控制消息,控制消息共有两种类型:commit 和 abort,分别用来表征事务已经成功提交或已经被成功终止;
- 在两阶段提交协议的第二阶段, coordinator 在向目标 topic 的目标 partition 写完控制消息后,会更新事务状态为 “commited” 或 “abort”, 并将该状态持久化到 transaction log 中;
7.消费者组到底是什么
- Consumer Group是Kafka提供的可扩展且具有容错性的消费者机制
- Consumer Group下可以有一个或多个Consumer实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
- Group ID是一个字符串,在一个Kafka集群中,它标识唯一的一个Consumer Group。
- Consumer Group下所有实例订阅的主题的单个分区,只能分配给组内的某个Consumer实例消费。这个分区当然也可以被其他的Group消费。
- Rebalance本质上是一种协议,规定了一个Consumer Group下的所有Consumer如何达成一致,来分配订阅Topic的每个分区
- 组成员数发生变更。比如有新的Consumer实例加入组或者离开组,抑或是有Consumer实例崩溃被“踢出”组。
- 订阅主题数发生变更。Consumer Group可以使用正则表达式的方式订阅主题,比如consumer.subscribe(Pattern.compile(“t.*c”))就表明该Group订阅所有以字母t开头、字母c结尾的主题。在Consumer Group的运行过程中,你新创建了一个满足这样条件的主题,那么该Group就会发生Rebalance。
- 订阅主题的分区数发生变更。Kafka当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有Group开启Rebalance。
8.消费者重平衡过程
消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup请求和SyncGroup请求
- 当组内成员加入组时,它会向协调者发送JoinGroup请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的JoinGroup请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。通常情况下,第一个发送JoinGroup请求的成员自动成为领导者。你一定要注意区分这里的领导者和之前我们介绍的领导者副本,它们不是一个概念。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
- 选出领导者之后,协调者会把消费者组订阅信息封装进JoinGroup请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送SyncGroup请求。
- 在这一步中,领导者向协调者发送SyncGroup请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送SyncGroup请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以SyncGroup响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了