天天看點

線上kafka消息堆積,consumer掉線,怎麼辦?

線上kafka消息堆積,所有consumer全部掉線,到底怎麼回事?

最近處理了一次線上故障,具體故障表現就是kafka某個topic消息堆積,這個topic的相關consumer全部掉線。

整體排查過程和事後的複盤都很有意思,并且結合本次故障,對kafka使用的最佳實踐有了更深刻的了解。

好了,一起來回顧下這次線上故障吧,最佳實踐總結放在最後,千萬不要錯過。

1、現象

  • 線上kafka消息突然開始堆積
  • 消費者應用回報沒有收到消息(沒有處理消息的日志)
  • kafka的consumer group上看沒有消費者注冊
  • 消費者應用和kafka叢集最近一周内沒有代碼、配置相關變更

2、排查過程

服務端、用戶端都沒有特别的異常日志,kafka其他topic的生産和消費都是正常,是以基本可以判斷是用戶端消費存在問題。

是以我們重點放在用戶端排查上。

1)arthas線上修改日志等級,輸出debug

由于用戶端并沒有明顯異常日志,是以隻能通過arthas修改應用日志等級,來尋找線索。

果然有比較重要的發現:

2022-10-25 17:36:17,774 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] - [Consumer clientId=consumer-1, groupId=xxxx] Disabling heartbeat thread
 
2022-10-25 17:36:17,773 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] - [Consumer clientId=consumer-1, groupId=xxxx] Sending LeaveGroup request to coordinator xxxxxx (id: 2147483644 rack: null)
           

看起來是kafka-client自己主動發送消息給kafka叢集,進行自我驅逐了。是以consumer都掉線了。

2)arthas檢視相關線程狀态變量

用arthas vmtool指令進一步看下kafka-client相關線程的狀态。

線上kafka消息堆積,consumer掉線,怎麼辦?

可以看到 HeartbeatThread線程狀态是WAITING,Cordinator狀态是UNJOINED。

此時,結合源碼看,大概推斷是由于消費時間過長,導緻用戶端自我驅逐了。

于是立刻嘗試修改max.poll.records,減少一批拉取的消息數量,同時增大max.poll.interval.ms參數,避免由于拉取間隔時間過長導緻自我驅逐。

參數修改上線後,發現consumer确實不掉線了,但是消費一段時間後,還是就停止消費了。

3、最終原因

相關同學去檢視了消費邏輯,發現了業務代碼中的死循環,确認了最終原因。

消息内容中的一個字段有新的值,觸發了消費者消費邏輯的死循環,導緻後續消息無法消費。

消費阻塞導緻消費者自我驅逐,partition重新reblance,所有消費者逐個自我驅逐。

這裡核心涉及到kafka的消費者和kafka之間的保活機制,可以簡單了解一下。

線上kafka消息堆積,consumer掉線,怎麼辦?

kafka-client會有一個獨立線程HeartbeatThread跟kafka叢集進行定時心跳,這個線程跟lisenter無關,完全獨立。

根據debug日志顯示的“Sending LeaveGroup request”資訊,我們可以很容易定位到自我驅逐的邏輯。

線上kafka消息堆積,consumer掉線,怎麼辦?

HeartbeatThread線程在發送心跳前,會比較一下目前時間跟上次poll時間,一旦大于max.poll.interval.ms 參數,就會發起自我驅逐了。

4、進一步思考

雖然最後原因找到了,但是回顧下整個排查過程,其實并不順利,主要有兩點:

  • kafka-client對某個消息消費逾時能否有明确異常?而不是隻看到自我驅逐和rebalance
  • 有沒有辦法通過什麼手段發現 消費死循環?

4.1 kafka-client對某個消息消費逾時能否有明确異常?

4.1.1 kafka似乎沒有類似機制

我們對消費邏輯進行斷點,可以很容易看到整個調用鍊路。

線上kafka消息堆積,consumer掉線,怎麼辦?

對消費者來說,主要采用一個線程池來處理每個kafkaListener,一個listener就是一個獨立線程。

這個線程會同步處理 poll消息,然後動态代理回調使用者自定義的消息消費邏輯,也就是我們在@KafkaListener中寫的業務。

線上kafka消息堆積,consumer掉線,怎麼辦?

是以,從這裡可以知道兩件事情。

第一點,如果業務消費邏輯很慢或者卡住了,會影響poll。

第二點,這裡沒有看到直接設定消費逾時的參數,其實也不太好做。

因為這裡做了逾時中斷,那麼poll也會被中斷,是在同一個線程中。是以要麼poll和消費邏輯在兩個工作線程,要麼中斷掉目前線程後,重新起一個線程poll。

是以從業務使用角度來說,可能的實作,還是自己設定業務逾時。比較通用的實作,可以是在消費邏輯中,用線程池處理消費邏輯,同時用Future get阻塞逾時中斷。

google了一下,發現kafka 0.8 曾經有consumer.timeout.ms這個參數,但是現在的版本沒有這個參數了,不知道是不是類似的作用。

4.1.2 RocketMQ有點相關機制

然後去看了下RocketMQ是否有相關實作,果然有發現。

在RocketMQ中,可以對consumer設定consumeTimeout,這個逾時就跟我們的設想有一點像了。

consumer會啟動一個異步線程池對正在消費的消息做定時做 cleanExpiredMsg() 處理。

線上kafka消息堆積,consumer掉線,怎麼辦?

注意,如果消息類型是順序消費(orderly),這個機制就不生效。

如果是并發消費,那麼就會進行逾時判斷,如果逾時了,就會将這條消息的資訊通過sendMessageBack() 方法發回給broker進行重試。

線上kafka消息堆積,consumer掉線,怎麼辦?

如果消息重試超過一定次數,就會進入RocketMQ的死信隊列。

spring-kafka其實也有做類似的封裝,可以自定義一個死信topic,做異常處理

4.2 有沒有辦法通過什麼手段快速發現死循環?

一般來說,死循環的線程會導緻CPU飙高、OOM等現象,在本次故障中,并沒有相關異常表現,是以并沒有聯系到死循環的問題。

那通過這次故障後,對kafka相關機制有了更深刻了解,poll間隔逾時很有可能就是消費阻塞甚至死循環導緻。

是以,如果下次出現類似問題,消費者停止消費,但是kafkaListener線程還在,可以直接通過arthas的 thread id 指令檢視對應線程的調用棧,看看是否有異常方法死循環調用。

5、最佳實踐

通過此次故障,我們也可以總結幾點kafka使用的最佳實踐:

  • 使用消息隊列進行消費時,一定需要多考慮異常情況,包括幂等、耗時處理(甚至死循環)的情況。
  • 盡量提高用戶端的消費速度,消費邏輯另起線程進行處理,并最好做逾時控制。
  • 減少Group訂閱Topic的數量,一個Group訂閱的Topic最好不要超過5個,建議一個Group隻訂閱一個Topic。
  • 參考以下說明調整參數值:max.poll.records:降低該參數值,建議遠遠小于<單個線程每秒消費的條數> * <消費線程的個數> * <max.poll.interval.ms>的積。max.poll.interval.ms: 該值要大于<max.poll.records> / (<單個線程每秒消費的條數> * <消費線程的個數>)的值。
希望能得到您的 關注、評論、轉發,謝謝!

阿丸把 Canal源碼解析與實戰筆記、HBase原理與實戰筆記、MySQL實戰筆記、Java實戰技巧筆記 等整理為合集,全是原創手打幹貨,免費分享給大家。

關注我,私信回複【資料】即可獲得。