線上kafka消息堆積,所有consumer全部掉線,到底怎麼回事?
最近處理了一次線上故障,具體故障表現就是kafka某個topic消息堆積,這個topic的相關consumer全部掉線。
整體排查過程和事後的複盤都很有意思,并且結合本次故障,對kafka使用的最佳實踐有了更深刻的了解。
好了,一起來回顧下這次線上故障吧,最佳實踐總結放在最後,千萬不要錯過。
1、現象
- 線上kafka消息突然開始堆積
- 消費者應用回報沒有收到消息(沒有處理消息的日志)
- kafka的consumer group上看沒有消費者注冊
- 消費者應用和kafka叢集最近一周内沒有代碼、配置相關變更
2、排查過程
服務端、用戶端都沒有特别的異常日志,kafka其他topic的生産和消費都是正常,是以基本可以判斷是用戶端消費存在問題。
是以我們重點放在用戶端排查上。
1)arthas線上修改日志等級,輸出debug
由于用戶端并沒有明顯異常日志,是以隻能通過arthas修改應用日志等級,來尋找線索。
果然有比較重要的發現:
2022-10-25 17:36:17,774 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] - [Consumer clientId=consumer-1, groupId=xxxx] Disabling heartbeat thread
2022-10-25 17:36:17,773 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] - [Consumer clientId=consumer-1, groupId=xxxx] Sending LeaveGroup request to coordinator xxxxxx (id: 2147483644 rack: null)
看起來是kafka-client自己主動發送消息給kafka叢集,進行自我驅逐了。是以consumer都掉線了。
2)arthas檢視相關線程狀态變量
用arthas vmtool指令進一步看下kafka-client相關線程的狀态。
可以看到 HeartbeatThread線程狀态是WAITING,Cordinator狀态是UNJOINED。
此時,結合源碼看,大概推斷是由于消費時間過長,導緻用戶端自我驅逐了。
于是立刻嘗試修改max.poll.records,減少一批拉取的消息數量,同時增大max.poll.interval.ms參數,避免由于拉取間隔時間過長導緻自我驅逐。
參數修改上線後,發現consumer确實不掉線了,但是消費一段時間後,還是就停止消費了。
3、最終原因
相關同學去檢視了消費邏輯,發現了業務代碼中的死循環,确認了最終原因。
消息内容中的一個字段有新的值,觸發了消費者消費邏輯的死循環,導緻後續消息無法消費。
消費阻塞導緻消費者自我驅逐,partition重新reblance,所有消費者逐個自我驅逐。
這裡核心涉及到kafka的消費者和kafka之間的保活機制,可以簡單了解一下。
kafka-client會有一個獨立線程HeartbeatThread跟kafka叢集進行定時心跳,這個線程跟lisenter無關,完全獨立。
根據debug日志顯示的“Sending LeaveGroup request”資訊,我們可以很容易定位到自我驅逐的邏輯。
HeartbeatThread線程在發送心跳前,會比較一下目前時間跟上次poll時間,一旦大于max.poll.interval.ms 參數,就會發起自我驅逐了。
4、進一步思考
雖然最後原因找到了,但是回顧下整個排查過程,其實并不順利,主要有兩點:
- kafka-client對某個消息消費逾時能否有明确異常?而不是隻看到自我驅逐和rebalance
- 有沒有辦法通過什麼手段發現 消費死循環?
4.1 kafka-client對某個消息消費逾時能否有明确異常?
4.1.1 kafka似乎沒有類似機制
我們對消費邏輯進行斷點,可以很容易看到整個調用鍊路。
對消費者來說,主要采用一個線程池來處理每個kafkaListener,一個listener就是一個獨立線程。
這個線程會同步處理 poll消息,然後動态代理回調使用者自定義的消息消費邏輯,也就是我們在@KafkaListener中寫的業務。
是以,從這裡可以知道兩件事情。
第一點,如果業務消費邏輯很慢或者卡住了,會影響poll。
第二點,這裡沒有看到直接設定消費逾時的參數,其實也不太好做。
因為這裡做了逾時中斷,那麼poll也會被中斷,是在同一個線程中。是以要麼poll和消費邏輯在兩個工作線程,要麼中斷掉目前線程後,重新起一個線程poll。
是以從業務使用角度來說,可能的實作,還是自己設定業務逾時。比較通用的實作,可以是在消費邏輯中,用線程池處理消費邏輯,同時用Future get阻塞逾時中斷。
google了一下,發現kafka 0.8 曾經有consumer.timeout.ms這個參數,但是現在的版本沒有這個參數了,不知道是不是類似的作用。
4.1.2 RocketMQ有點相關機制
然後去看了下RocketMQ是否有相關實作,果然有發現。
在RocketMQ中,可以對consumer設定consumeTimeout,這個逾時就跟我們的設想有一點像了。
consumer會啟動一個異步線程池對正在消費的消息做定時做 cleanExpiredMsg() 處理。
注意,如果消息類型是順序消費(orderly),這個機制就不生效。
如果是并發消費,那麼就會進行逾時判斷,如果逾時了,就會将這條消息的資訊通過sendMessageBack() 方法發回給broker進行重試。
如果消息重試超過一定次數,就會進入RocketMQ的死信隊列。
spring-kafka其實也有做類似的封裝,可以自定義一個死信topic,做異常處理
4.2 有沒有辦法通過什麼手段快速發現死循環?
一般來說,死循環的線程會導緻CPU飙高、OOM等現象,在本次故障中,并沒有相關異常表現,是以并沒有聯系到死循環的問題。
那通過這次故障後,對kafka相關機制有了更深刻了解,poll間隔逾時很有可能就是消費阻塞甚至死循環導緻。
是以,如果下次出現類似問題,消費者停止消費,但是kafkaListener線程還在,可以直接通過arthas的 thread id 指令檢視對應線程的調用棧,看看是否有異常方法死循環調用。
5、最佳實踐
通過此次故障,我們也可以總結幾點kafka使用的最佳實踐:
- 使用消息隊列進行消費時,一定需要多考慮異常情況,包括幂等、耗時處理(甚至死循環)的情況。
- 盡量提高用戶端的消費速度,消費邏輯另起線程進行處理,并最好做逾時控制。
- 減少Group訂閱Topic的數量,一個Group訂閱的Topic最好不要超過5個,建議一個Group隻訂閱一個Topic。
- 參考以下說明調整參數值:max.poll.records:降低該參數值,建議遠遠小于<單個線程每秒消費的條數> * <消費線程的個數> * <max.poll.interval.ms>的積。max.poll.interval.ms: 該值要大于<max.poll.records> / (<單個線程每秒消費的條數> * <消費線程的個數>)的值。
希望能得到您的 關注、評論、轉發,謝謝!
阿丸把 Canal源碼解析與實戰筆記、HBase原理與實戰筆記、MySQL實戰筆記、Java實戰技巧筆記 等整理為合集,全是原創手打幹貨,免費分享給大家。
關注我,私信回複【資料】即可獲得。