
The problem of duplicate consumption caused by Kafka timeouts

Today I saw a problem that a member of our chat group ran into:

On the surface, the problem was that a timeout caused the auto offset commit (autoCommit) to fail, which in turn led to duplicate consumption.

The error was:

-- :: [kudu--C-] WARN o.a.k.c.consumer.internals.ConsumerCoordinator - Auto offset commit failed for group sm: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
           

I remembered reading before that a session.timeout set too short can cause the commit to fail. The reason is that Kafka uses heartbeats to decide whether a session has timed out, and the (older) client sent heartbeats synchronously from the polling thread, so when a single poll -> process -> commit cycle runs too long, exactly this situation occurs.
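A minimal sketch (not the group member's actual code) of the pattern that produces this log: auto-commit enabled, a plain poll loop, and per-record processing slow enough that the time between two poll() calls exceeds the timeout. The broker address, topic name, and sleep duration are made up for illustration.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlowPollLoop {
  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // illustrative address
    props.put("group.id", "sm");                         // group name taken from the log above
    props.put("enable.auto.commit", "true");             // offsets are committed from within poll()
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("demo-topic"));  // hypothetical topic

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        // Stand-in for slow processing: if the whole batch takes longer than the timeout
        // (session.timeout.ms before KIP-62, max.poll.interval.ms after), the group
        // rebalances, the next auto-commit fails, and the same records are delivered
        // again -> duplicate consumption.
        Thread.sleep(2_000);
      }
    }
  }
}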

I checked the Kafka wiki, and this situation is indeed documented there; it was considered a bug:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61333789

Adding a KafkaConsumer#heartbeat()/ping() method

One alternative that has been discussed is adding a heartbeat() API which sends a heartbeat and returns a flag (or throws an exception) to indicate that a rebalance is needed. This might change the typical poll() loop to something like the following:

while (running) {
  ConsumerRecords<K, V> records = consumer.poll();
  for (ConsumerRecord<K, V> record : records){
    process(record);
    if (!consumer.heartbeat())
      break;
  }
  consumer.commitSync();
}
           
The problem with this approach is making it work in a reasonable way with offset commits. In the above example, breaking from the loop and committing before all messages have been processed will cause message loss. We can fix this by maintaining the offset map to commit explicitly, but it’s unclear how to make it work with an auto-commit policy. In general, we felt that this added unneeded complexity to the API and that the same effect could be achieved in a safer way by setting max.poll.records to 1 and using the original poll() loop structure.
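A rough sketch of that safer workaround, reusing the same placeholders (consumer, process, running, K, V) as the wiki snippet above; max.poll.records is a real consumer config, everything else is illustrative.

// Return at most one record per poll(), so heartbeats and commits
// happen after every single record instead of after a whole batch.
props.put("max.poll.records", "1");

while (running) {
  ConsumerRecords<K, V> records = consumer.poll(100);   // at most one record
  for (ConsumerRecord<K, V> record : records) {
    process(record);
  }
  consumer.commitSync();   // commit right after the single record is processed
}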

Then I looked at the Kafka upgrade notes:

Notable changes in 0.10.1.0

The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so we have changed its default value to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.

According to this, the heartbeat has already been moved to a background thread, and I checked that the group member was indeed using the newer Kafka client.
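Put as consumer properties, the 0.10.1.0 behaviour described above looks roughly like this (the values are the defaults quoted in the upgrade notes; the request.timeout.ms figure of 305000 is my reading of "just above 5 minutes"):

Properties props = new Properties();
// Heartbeats now come from a background thread; this only bounds heartbeat liveness.
props.put("session.timeout.ms", "10000");        // default lowered to 10 s
// Maximum allowed time between two poll() calls before the consumer leaves the group.
props.put("max.poll.interval.ms", "300000");     // default: 5 minutes
// Must always be larger than max.poll.interval.ms (per the upgrade notes).
props.put("request.timeout.ms", "305000");
// Upper bound on the number of records returned by one poll().
props.put("max.poll.records", "500");            // default changed to 500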

Then I read an explanation on Stack Overflow that made everything click:

https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10-0

Before KIP-62, there was only session.timeout.ms (i.e., Kafka 0.10.0 and earlier). max.poll.interval.ms was introduced via KIP-62 (part of Kafka 0.10.1).

KIP-62 decouples heartbeats from calls to poll() via a background heartbeat thread. This allows for a longer processing time (i.e., time between two consecutive poll() calls) than the heartbeat interval.

Assume processing a message takes 1 minute. If heartbeat and poll are coupled (i.e., before KIP-62), you would need to set session.timeout.ms larger than 1 minute to prevent the consumer from timing out. However, if the consumer dies, it also takes longer than 1 minute to detect the failed consumer.

KIP-62 decouples polling and heartbeating, allowing heartbeats to be sent between two consecutive polls. Now there are two threads running, the heartbeat thread and the processing thread, and thus KIP-62 introduced a timeout for each: session.timeout.ms is for the heartbeat thread, while max.poll.interval.ms is for the processing thread.

Assume you set session.timeout.ms=30000; the consumer's heartbeat thread must then send a heartbeat to the broker before this time expires. On the other hand, if processing a single message takes 1 minute, you can set max.poll.interval.ms to larger than one minute to give the processing thread more time per message.

If the processing thread dies, it takes max.poll.interval.ms to detect this. However, if the whole consumer dies (and a dying processing thread most likely crashes the whole consumer including the heartbeat thread), it takes only session.timeout.ms to detect it.

The idea is to allow quick detection of a failing consumer even if processing itself takes quite long.
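Plugging the answer's numbers into consumer properties: the heartbeat thread is bounded by session.timeout.ms, while max.poll.interval.ms has to cover a whole poll() batch, not just one record. So with roughly one minute per record you either shrink the batch or enlarge the interval. This is a sketch, not tuning advice; the figures just extend the answer's example to a batch.

// Liveness of the whole consumer: a crash is detected within 30 s.
props.put("session.timeout.ms", "30000");
// Processing budget: ~60 s per record and 5 records per poll() means a batch
// can take ~5 minutes, so allow a bit more than that between two poll() calls.
props.put("max.poll.records", "5");
props.put("max.poll.interval.ms", "360000");   // 6 min > 5 * 60 s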

This explains why the max.poll.interval.ms property was added. Think about it from the design side: under the original design, what happens if we simply increase session.timeout? The broker then takes a long time to notice that a client has gone away. So max.poll.interval.ms was introduced, and session.timeout can stay at a normal value. We can use max.poll.interval.ms to bound the time of a single poll -> process cycle; reducing max.poll.records also works, just as the error message suggests.
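As a sanity check, it can help to measure how long each poll -> process cycle actually takes and compare it against max.poll.interval.ms. The following is a sketch with hypothetical process() and running placeholders, not code from the original discussion:

long maxPollIntervalMs = 300_000;   // keep in sync with the consumer's max.poll.interval.ms
while (running) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  long start = System.currentTimeMillis();
  for (ConsumerRecord<String, String> record : records) {
    process(record);                // hypothetical per-record handler
  }
  long elapsed = System.currentTimeMillis() - start;
  // If one batch gets close to max.poll.interval.ms, the next poll() arrives too late,
  // the group rebalances, and the auto-commit failure from the log comes back.
  if (elapsed > maxPollIntervalMs * 0.8) {
    System.err.printf("Batch took %d ms; lower max.poll.records or raise max.poll.interval.ms%n", elapsed);
  }
}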