天天看点

Kafka 客户端 Consumer 常用配置

分类

  • 消费者组(consume grou)
  • 独立消费者(standalone consume)

介绍:

一个消费者组可以包含多个消费者,对于主题中的消息同一个消费者组的每个消费者消费一部分。也就是说,所有的消费者消费的消息合在一起才是一个主题的完整消息。这种消费者和消费者组的设计可以让整体的消费能力具有横向伸缩性,比如在主题消息量非常大的情况下,单个消费者处理该主题会非常吃力,可以增加更多的消费者,让它们分担负载,每个消费者只处理部分消息,这样就可以提高整体的消费能力。

对于多个消费者组订阅同一个主题,每个消费者组之间是互不影响的。如有消费者组A和消费者组B,同时订阅了一个主题TopicA,在 Kafka 中消费者组A会获取到 TopicA 中的所有消息,消费者B也会获取到 TopicA 的所有消息。由此可以知道每个消费者组是相互独立的,消费者组之间不会互相影响。

Kafka 是同时支持点对点模式的和发布/订阅模式两种模式,这些都是通过消费者和消费者组来实现的:

  • 如果所有的消费者都在同一个消费者组,那么每个消息只会被一个消费者处理,这就相当于点对点模式的应用
  • 如果所有的消费者不再同一个消费者组,那么所有的消息都会被广播给所有的消费者,这就相当于发布/订阅模式
  • 消费者组是一个逻辑上的概念,它将属于它的消费者归为一类,每个消费者只属于一个消费组。每个消费者组都会有一个固定的名称

    group.id

    ,消费者在进行消费之前需要指定其所属消费者组的名称,这个通过消费者客户端参数 group.id 来配置。

客户端位移(offset):consumer定期向kafka集群发送自己数据消费进度,这一过程称之位移提交。

消费者组重平衡(consumer group rebalance):只针对于consumer group 有效,消费者组中各个消费者对于订阅主题的分区分配过程。

配置

KafkaConsume 非线程安全

配置类 作用 demo
org.apache.kafka.clients.consumer.ConsumerConfig#SESSION_TIMEOUT_MS_CONFIG session.timeout.ms 使用Kafka的组管理工具时用于检测客户端故障的超时。客户机发送周期性的心跳信号以指示其活动性给代理。如果在此会话超时过期之前代理未接收到心跳,则代理将从组中删除此客户端并启动重新平衡。请注意,值必须在代理配置中通过

group.min.session.timeout.ms

group.max.session.timeout.ms

配置的允许范围内。
org.apache.kafka.clients.consumer.ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG max.poll.interval.ms 使用使用者组管理时调用poll()之间的最大延迟。这给使用者在获取更多记录之前可以空闲的时间量设置了上限。如果在此超时到期之前未调用poll(),则认为使用者失败,组将重新平衡,以便将分区重新分配给另一个成员对于使用达到此超时的非空

group.instance.id

的使用者,不会立即重新分配分区。相反,使用者将停止发送心跳信号,并且分区将在

session.timeout.ms

过期后重新分配。这反映了已关闭的静态使用者的行为。
org.apache.kafka.clients.consumer.ConsumerConfig#FETCH_MAX_BYTES_CONFIG fetch.max.bytes 服务器应为获取请求返回的最大数据量。记录由使用者分批获取,如果获取的第一个非空分区中的第一个记录批大于此值,则仍将返回该记录批,以确保使用者能够取得进展。因此,这不是绝对最大值。代理接受的最大记录批大小是通过

message.max.bytes

(broker 配置)或

max.message.bytes

(topic配置)定义的。请注意,使用者并行执行多个提取。
当业务环境消息数据很大时,必须设置一个很大的值,负责导致数据无法被处理
org.apache.kafka.clients.CommonClientConfigs#HEARTBEAT_INTERVAL_MS_CONFIG heartbeat.interval.ms 使用Kafka的组管理工具时,从心跳到消费者协调器的预期时间间隔。心跳用于确保消费者会话保持活动状态,并在新消费者加入或离开组时促进重新平衡该值必须设置为小于session.timeout.ms,但通常不应设置为大于该值的1/3。它可以调整得更低,以控制正常再平衡的预期时间;
org.apache.kafka.clients.CommonClientConfigs#CONNECTIONS_MAX_IDLE_MS_CONFIG connections.max.idle.ms 在此配置指定的毫秒数之后关闭空闲连接 kafka定期关闭空闲的socket连接,默认9分钟