kafka 重新開機consumer 重複消費問題
原文連結:https://blog.csdn.net/z1941563559/java/article/details/88753938
問題描述:kafka的某些topic在消費完畢後一段時間,重新開機唯一消費者,offset會重置為最小offset重新消費,一直導緻kafka消費的重複消費問題。
問題産生原因:是offset資訊過期導緻的。我一直以為消費者保持線上,最新位移資訊是不會過期的。但即使消費者線上,位移資訊也會如約過期。配置的資料保留時間log.retention.hours=168小時比位移保留時間offsets.retention.minutes=1440即24小時要長。offset資訊過期後,重新開機消費者。由于找不到offset資訊,會根據配置auto.offset.reset=earliest從最小位移開始消費,導緻之前已經消費的資料再次被消費。
解決方案:
原文 :https://issues.apache.org/jira/browse/KAFKA-3806
調整log.retention.hours和offsets.retention.minutes的預設值
在特殊情況下,将log.retention.hours(168小時= 7天)和offsets.retention.minutes(1440分鐘= 1天)的預設值結合使用可能很危險。偏移保留應始終大于對數保留。
我們已經觀察到以下情況和問題:
- 生産者更新已于兩天前禁止了該主題的資料生産,該主題未删除。
- 消費者使用了所有資料并正确配置設定了對Kafka的偏移量。
- 消費者不再對該主題進行抵消送出,因為沒有更多傳入資料,也沒有任何可确認的内容。(我們已禁用自動送出功能,我不确定啟用自動送出的行為方式。)
- 一天後:Kafka根據offsets.retention.minutes清除了太舊的偏移量。
- 兩天後:長期運作的使用者在更新後重新啟動,由于該主題已被offsets.retention.minutes删除,是以未找到該主題的任何已送出偏移量,是以從一開始就開始使用它。
- 由于log.retention.hours較長,消息仍在Kafka中,大約5天的消息被再次讀取。
解決此問題的已知解決方法:
- 顯式配置log.retention.hours和offsets.retention.minutes,不要使用預設值。
提案:
- 将offsets.retention.minutes的預設值延長到至少比log.retention.hours大兩倍。
- 在Kafka啟動期間檢查這些值,如果offsets.retention.minutes小于log.retention.hours,則記錄警告。
- 在遷移指南中添加一條注釋,以了解ZooKeeper和Kafka中存儲偏移量之間的差別(http://kafka.apache.org/documentation.html#upgrade)。
修改:
預設參數 offsets.retention.minutes & log.retention.minutes 的預設值問題。
預設參數前者是7天,後者是24小時。會導緻資料雖然儲存但offset失效導緻用戶端資料重複消費的問題。
0.10.0.0官方的參數說明:http://kafka.apache.org/0100/documentation.html#log
offsets.retention.minutes
Log retention window in minutes for offsets topic
Kafka Server端儲存的offset的過期時間。預設值1440(1440分鐘也就是24小時),應該調整為與log.retention.hours一緻,即10080。
log.retention.hours & log.retention.minutes
這兩個參數都是用來設定删除日志的,無論哪個屬性已經溢出,都會進行檔案的删除。
log.retention.hours:
The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property
參數int類型,預設值:168 (168小時也就是7天)。
log.retention.minutes:
The number of minutes to keep a log file before deleting it (in minutes), secondary to log.retention.ms property. If not set, the value in log.retention.hours is used
參數int類型,預設值:null。