天天看點

Kafka 用戶端 Consumer 常用配置

分類

  • 消費者組(consume grou)
  • 獨立消費者(standalone consume)

介紹:

一個消費者組可以包含多個消費者,對于主題中的消息同一個消費者組的每個消費者消費一部分。也就是說,所有的消費者消費的消息合在一起才是一個主題的完整消息。這種消費者和消費者組的設計可以讓整體的消費能力具有橫向伸縮性,比如在主題消息量非常大的情況下,單個消費者處理該主題會非常吃力,可以增加更多的消費者,讓它們分擔負載,每個消費者隻處理部分消息,這樣就可以提高整體的消費能力。

對于多個消費者組訂閱同一個主題,每個消費者組之間是互不影響的。如有消費者組A和消費者組B,同時訂閱了一個主題TopicA,在 Kafka 中消費者組A會擷取到 TopicA 中的所有消息,消費者B也會擷取到 TopicA 的所有消息。由此可以知道每個消費者組是互相獨立的,消費者組之間不會互相影響。

Kafka 是同時支援點對點模式的和釋出/訂閱模式兩種模式,這些都是通過消費者和消費者組來實作的:

  • 如果所有的消費者都在同一個消費者組,那麼每個消息隻會被一個消費者處理,這就相當于點對點模式的應用
  • 如果所有的消費者不再同一個消費者組,那麼所有的消息都會被廣播給所有的消費者,這就相當于釋出/訂閱模式
  • 消費者組是一個邏輯上的概念,它将屬于它的消費者歸為一類,每個消費者隻屬于一個消費組。每個消費者組都會有一個固定的名稱

    group.id

    ,消費者在進行消費之前需要指定其所屬消費者組的名稱,這個通過消費者用戶端參數 group.id 來配置。

用戶端位移(offset):consumer定期向kafka叢集發送自己資料消費進度,這一過程稱之位移送出。

消費者組重平衡(consumer group rebalance):隻針對于consumer group 有效,消費者組中各個消費者對于訂閱主題的分區配置設定過程。

配置

KafkaConsume 非線程安全

配置類 作用 demo
org.apache.kafka.clients.consumer.ConsumerConfig#SESSION_TIMEOUT_MS_CONFIG session.timeout.ms 使用Kafka的組管理工具時用于檢測用戶端故障的逾時。客戶機發送周期性的心跳信号以訓示其活動性給代理。如果在此會話逾時過期之前代理未接收到心跳,則代理将從組中删除此用戶端并啟動重新平衡。請注意,值必須在代理配置中通過

group.min.session.timeout.ms

group.max.session.timeout.ms

配置的允許範圍内。
org.apache.kafka.clients.consumer.ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG max.poll.interval.ms 使用使用者組管理時調用poll()之間的最大延遲。這給使用者在擷取更多記錄之前可以空閑的時間量設定了上限。如果在此逾時到期之前未調用poll(),則認為使用者失敗,組将重新平衡,以便将分區重新配置設定給另一個成員對于使用達到此逾時的非空

group.instance.id

的使用者,不會立即重新配置設定分區。相反,使用者将停止發送心跳信号,并且分區将在

session.timeout.ms

過期後重新配置設定。這反映了已關閉的靜态使用者的行為。
org.apache.kafka.clients.consumer.ConsumerConfig#FETCH_MAX_BYTES_CONFIG fetch.max.bytes 伺服器應為擷取請求傳回的最大資料量。記錄由使用者分批擷取,如果擷取的第一個非空分區中的第一個記錄批大于此值,則仍将傳回該記錄批,以確定使用者能夠取得進展。是以,這不是絕對最大值。代理接受的最大記錄批大小是通過

message.max.bytes

(broker 配置)或

max.message.bytes

(topic配置)定義的。請注意,使用者并行執行多個提取。
當業務環境消息資料很大時,必須設定一個很大的值,負責導緻資料無法被處理
org.apache.kafka.clients.CommonClientConfigs#HEARTBEAT_INTERVAL_MS_CONFIG heartbeat.interval.ms 使用Kafka的組管理工具時,從心跳到消費者協調器的預期時間間隔。心跳用于確定消費者會話保持活動狀态,并在新消費者加入或離開組時促進重新平衡該值必須設定為小于session.timeout.ms,但通常不應設定為大于該值的1/3。它可以調整得更低,以控制正常再平衡的預期時間;
org.apache.kafka.clients.CommonClientConfigs#CONNECTIONS_MAX_IDLE_MS_CONFIG connections.max.idle.ms 在此配置指定的毫秒數之後關閉空閑連接配接 kafka定期關閉空閑的socket連接配接,預設9分鐘