spring-kafka 重複消費資料
- spring內建kafuka架構
-
- 版本介紹
- 解決方案
spring內建kafuka架構
最近公司需要對接kafka拉取資料,在使用spring-kafka架構時候,總是無法持續消費,總是出現持續消費,相當糾結。因為也是剛接手任務,故整理了一下遇到的難題,特此整理一下,望對各位同學有些幫助。
版本介紹
項目架構主要是spring mvc 架構版本是5.0.2.RELEASE版本,spring-kafka版本是2.1.0.RELEASE ,kafka-client的版本是1.0.0版本。
在消費資料的時,遇見如下異常錯誤資訊:
了解下來就是當kafka處理完500記錄後去送出offset偏移量的時候,發現會話已經結束了,認為系統處理完成了,需要重新配置設定分區,在分區的時候出現了混亂,導緻系統又要重新去消費已消費過的資料。重複消費資料。
MAX_POLL_INTERVAL_MS_CONFIG屬性意思為kafka消費者在每一輪poll()調用之間的最大延遲,消費者在擷取更多記錄之前可以空閑的時間量的上限。這個參數不能跟 SESSION_TIMEOUT_MS_CONFIG時間數相同 需要比SESSION_TIMEOUT_MS_CONFIG時間要大些。
解決方案
随後根據業務處理的複雜程度以及耗時的長短做出了參數調整,
主要是配置consumerConfig對象,截圖如下:
主要注意的是需要将ENABLE_AUTO_COMMIT_CONFIG 送出改為false,借此轉交給spring-kafka架構幫忙送出
offset偏移量,同時需要将SESSION_TIMEOUT_MS_CONFIG時間長度根據設定MAX_POLL_RECORDS_CONFIG的記錄數來計算一個會話時長。最後進行平衡調整。
目前生産者傳入2個隊列,每個隊列有5個分區,實時去推送資料。而對應消費者需要建立5個及以上的副本去消費資料。
目前隻是針對一個topic隊列做了副本增量。後續會對另一個topic隊列做副本增量。
在經曆了幾點的資料跑試之後,目前發現錄入資料庫的效率非常緩慢,平均處理一條訂單資料耗時在3s左右,實在不能忍受了,最後直接換了一個新庫資料專門錄入此庫,效率非常明顯了,房單資料堆積量已為0達到了實時同步,訂單資料堆積量已剩下40萬左右,可以在一天左右可以将堆積量消耗為0 。 講這些的意思,這次的問題出現應該屬于資料庫達到瓶頸。需要從資料庫吞吐量考慮程式效率問題。