- 位移主題(Offset Topic)
__consumer_offsets是kafka内部的主題,這裡使用位移主題指代__consumer_offsets。
在上一章中我們講過,老版本Consumer的位移管理依托于Apache Zookeeper的,自動的或手動的将位移送出給Zookeeper中儲存。這種設計使得Kafka Broker不需要儲存位移資料,減少了Broker端需要持有的狀态空間,有利于實作高伸縮性。但是Zookeeper并不适用于高頻的寫操作,kafka社群自0.8.2.x開始,醞釀着修改掉這種設計,最終在更新版本Consumer中正式推出全新的位移管理機制,不過0.9新版本consumer剛剛推出,一堆bug,還是不要用它了。
将Consumer的位移資料作為一條條普通的Kafka消息,送出到__consumer_offsets主題中。位移主題和普通的kafka主題一樣,可以手動的建立、删除,隻不過是個内部主題,大部分情況都是由kafka自己管理。位移主體的消息格式kafka已經定好了,使用者不能修改,也就是不能随意向這個主意寫消息,因為一旦寫入的消息不滿足kafka規定的格式,kafka内部就無法成功解析。造成Broker崩潰。事實上,Kafka Consumer有API幫你送出位移,也就是向位移主題寫消息。
那麼位移主題的消息格式是什麼樣的呢?前面說過他的消息格式就是KV對,Key表示鍵值,Value表示消息體。Key的内容:<Gruop ID, 主題名, 分區号>。Group ID唯一辨別消費消息的主題,主題名和分區号自然表示Consumer Group消費的主題群組内執行個體消費的分區。另外多說幾句,除了Consumer Group,kafka還支援Consumer,也稱 Standalone Consumer。它的運作機制與Consumer Group完全不同,但是位移管理的機制相同,是以也是适用這套消息格式的。消息體的設計,除了儲存位移值外,還要儲存位移送出的其他一些中繼資料,如時間戳和使用者自定義的資料等。儲存這些中繼資料,是為了幫助Kafka執行各種各樣的後續操作,比如删除過期位移消息等。
位移主題的格式還有另外2種:用于儲存Consumer Group資訊的消息;用于删除Group過期位移甚至删除Group的消息。前面一種格式非常神秘,以至于幾乎無法搜到它,隻需要記住它是用來注冊Consumer Group的。是以一般情況,位移主題中會有兩種格式消息,一種是儲存執行個體消費的位移消息,一種就是這個組注冊消息,Key是Group ID。後面一種格式,有專屬的稱呼:tombstone消息,即墓碑消息也稱delete mark。這些消息隻出現在源碼中,主要特點是他的消息體是null,即空消息體。什麼時候寫入這類消息,一旦某個Consumer Group下所有Consumer執行個體都停止,而且他們的位移資料都已被删除,Kafka會向位移主題對應分區寫入tombstone消息,表明要徹底删除這個Group 的資訊。
接下來看看位移主題怎麼被建立?當Kafka叢集中第一個Consumer程式啟動時,Kafka會自動建立位移主題。分區數由Brokerd端參數offsets.topic.num.partitions的指定,預設值50,副本數或備份因子有Broker端另一個參數offfsets.topic.replication.factor指定,預設值3。當然你也可以手動建立位移主題,具體方法是,在Kafka叢集尚未啟動Consumer之前,使用Kafka API建立它。手動建立的好處在于,可以建立滿足實際場景需要的位移主題,比如說50個分區太多,可以自己建立它,不用理會參數值。不過還是推薦Kafka自動建立,目前Kafka源碼有一些地方寫死50個分區數,如果你自行建立一個不同于預設分區數的位移主題,可能會碰到各種各樣的問題,這個是社群的BUG,目前已修複但仍在稽核中。
什麼時候會用到位移主題?Kafka Consumer送出位移的時候寫入該主題,送出位移方式有兩種:手動送出和自動送出。Consumer端有個參數auto.commit.enable,如果值為true,則背景預設定期送出位移,送出間隔由一個專屬參數auto.commit.interval.ms控制。自動送出省事兒,但是喪失了靈活性和可控性。事實上,很多內建了Kafka的的大資料架構都是禁用自動送出的,如Spark、Flink等。那就要使用手動送出位移,先要将參數設定成false,Kafka Consumer API為你提供了位移送出的方法,如consumer.commitSync等。另外,自動送出還有一個問題,那就是隻要Consumer一直啟動着,就會無限期的往位移主題寫入消息。舉一個極端的例子,假設Consumer目前消費的某主題的最新一條消息,位移是100,之後該主題沒有任何消息産生,故Consumer無消息可消費,由于是自動送出,位移主題中會不停的寫入位移等于100的消息。顯然隻需要儲存一條這樣的消息就可以了,這就要求Kafka必須要有針對位移主題消息特點的消息删除政策,否則消息會越來越多,最終撐爆整個磁盤。
Kafka是怎麼删除位移主題中的過期消息?答案是Compaction。翻譯成壓實或者整理,但是很多人翻譯成壓縮,有些欠妥。Kafka使用Compact政策删除位移主題過期消息,避免該主題無限膨脹。那如何定義過期呢?對于同一個Key的兩條消息M1和M2,如果M1發送時間早于M2,那M1就是過期消息,對應參數是offsets.retention.minutes。Compact過程就是掃描日志所有消息,剔除過期的消息,把剩下的放到一起。這裡貼一張官網的圖檔:

Kafka專門提供背景線程定期巡檢待Compact的主題,看看是否存在滿足條件的可删除資料,這個背景線程叫Log Cleaner。如果你的生産環境中出現過位移主題無限膨脹的問題,建議檢查一下這個線程狀态,看是否挂掉。
實際上,将很多中繼資料存入内部主題的做法越來越流行,除了位移管理,kafka事務也利用這個方法,但是另外一個主題。社群想法很簡單:既然kafka天然實作高持久性和高吞吐量,那麼任何子服務有這兩方面需求,都可以Kafka自己去實作,不必求助于外部系統。
- 位移送出
之前說過,Consumer的消費位移跟分區位移不是一個概念,但是值的具體形式是一樣的,消費位移記錄的是Consumer要消費的下一條消息的位移,而不是目前消費的最新位移。那麼,Consumer需要向Kafka彙報自己的位移資料,這個彙報過程被稱為送出位移。Consumer需要為配置設定給他的每個分區送出各自的位移資料。
送出位移主要是為了表征Consumer的消費進度,這樣當Consumer發生故障重新開機後,就能夠從kafka中讀取之前送出的位移值,避免整個消費過程重來一遍。換句話說,位移送出,是kafka提供給你的一個工具或語義保障,你負責維護這個語義保障,即你送出了位移X,那麼kafka就認為所有位移值小于X的消息都被你消費過了。位移送出非常靈活,你完全可以送出任何位移值,但由此産生的後果你也要一并承擔。假設你的Consumer消費了10條消息,你送出的位移是20,那麼介于11~19之間的消息是有可能丢失的;相反地,你送出了5的位移值,那麼介于5~9之間的位移就可能被重複消費了。是以位移送出的語義保障是由你負責,kafka隻會“無腦”接收你送出的位移。
鑒于位移送出甚至是位移管理對Consumer的影響巨大,Kafka,特别是KafkaConsumer API,提供了多種送出位移的方法。從使用者角度,位移送出分為自動送出和手動送出;從Consumer角度,位移送出分為同步送出和異步送出。
所謂自動送出,就是指Kafka Consumer在背景默默地為你送出位移。在Consumer端有個參數enable.auto.commit,把他設定成true或者壓根不管他就可以,因為預設值就是true,如果你啟用了自動送出,還有一個參數是有用的,auto.commit.interval.ms,預設值是5秒,表示每5秒自動送出一次。下面展示了設定自動送出位移的方法:
如果使用手動送出位移,那麼上面的參數就要顯示地設定成false,再在代碼中調用手動送出位移的API。最簡單的API就是KafkaConsumer#commitSync(),該方法會送出KafkaConsumer#poll()傳回的最新位移。從名字看就知道它是一個同步操作,即該方法會一直等待,直到位移送出成功傳回。如果送出過程出現異常,就會抛出異常。看下面代碼展示:
再來說說自動送出可能出現的問題以及手動送出的優缺點。預設情況下,Consumer每5秒自動送出一次位移,現在我們假設送出位移之後的3秒發生了Rebalance操作,在Rebalance之後,所有Consumer從上一次送出的位移處繼續消費,但該位移已經是3秒之前的資料,故在Rebalance之前的3秒消費的所有資料都要重新消費一次。雖然你能通過調整參數auto.commit.interval.ms的值來提高頻率,但終究還是不能消除。值得注意的是,這裡送出間隔其實是至少的意思,比如單線程處理消息,那麼隻有處理完消息後才會送出位移,可能遠比你設定的時間長;反過來,如果消息已經處理完,但還沒有到達這個至少時間,仍然需要等待達到這個時間再送出。反觀手動送出,好處在于更加靈活,完全能夠把控位移送出的時機和頻率。缺陷是在調用commitSync()時,Consumer處于阻塞狀态,直到遠端Broker傳回結果,這個狀态才會結束。在任何系統中,因為程式而非資源限制造成的阻塞都可能是系統的瓶頸,會影響整個應用的TPS。如果想拉長送出時間間隔,會使送出頻率下降,下次重新開機回來,會有更多消息被重新消費。
鑒于這問題,社群為手動送出位移提供了另一個API方法:KafkaConsumer#commitAsync(),即異步送出。調用這個方法,不會阻塞,而是立即傳回,就不會影響TPS。由于是異步的,需要提供回調函數( callback ),供你實作送出後的邏輯,比如記錄日志或處理異常等。下面展示異步調用的代碼:
commitAsync是否能替代commitSync?當然不能。異步送出的問題在于,出現問題不會重試,而且異步送出的重試也沒有意義,因為你想一下,消費和送出是分開進行的,當送出失敗後自動重試,其實重試送出的位移值早已不是最新值。是以,如果手動送出時,能将同步送出和異步送出組合使用,那就能達到最理想的效果,因為:
- 我們可以利用commitSync的自動重試規避瞬時錯誤,比如網絡抖動造成、Broker端的GC等。
- 我們不希望程式總是處于阻塞狀态,影響TPS。
下面展示一下組合使用同步送出和異步送出的代碼:
對于正常性、階段性的手動送出,調用commitAsync()避免程式阻塞,而在程式關閉前,調用commitSync()執行同步阻塞式的位移送出,確定能夠儲存正确的位移資料。以上所說的都是送出poll方法傳回地所有消息的位移,比如poll方法一次傳回500條消息,當你處理完這500條消息,會一次性的将這500條消息的位移一并處理。簡單來說,直接送出最新一條消息的位移。但是如果想更細粒度的送出位移,該怎麼辦?
更細粒度的場景,先來解釋一下,假如:你的poll方法傳回的是5000條消息肯定不想把這5000條消息處理完再送出位移,因為一旦中間出錯,之前處理的全部都要重來一遍。對于一次要處理很多消息的consumer而言,有沒有方法允許在消費的中間進行位移送出,比如沒處理100個消息就送出一次位移,這樣能避免大批量的消息重新消費。Kafka Consumer API為手動提價提供了這樣的方法:commitSync(Map<TopicPartition, OffsetAndMetadata>) 和 commitAsync(Map<TopicPartition, OffsetAndMetadata>)。OffsetAndMetadata儲存的就是位移資料。
下面展示commitAsync調用示例,其實commitSync調用和他一樣:
無論是自動送出還是手動送出位移,都是無法完全避免消息的重複消費,我們可以考慮,将offset送出個事件處理結果放入一個支援原子性操作的存儲可以避免,類似于事務。另外Kafka Streams支援精确處理語義,也可以一試。
- CommitFailedException異常處理
所謂的CommitFailedException,就是Consumer在送出位移是出現錯誤或者異常,而且還是那種不可恢複的嚴重異常。如果異常可恢複,那麼送出位移API自己就可以規避,commitSync方法。每次和CommitFailedException一起出現還有一段著名的注釋,下面看看社群的最新解釋:
這段話前半部分意思是,本次送出位移失敗,原因是消費者組已經開啟Rebalance過程,并且将要送出唯一的分區配置設定其他執行個體,出現這種情況原因是,你的消費者執行個體連續兩次調用poll方法時間間隔超過了max.poll.interval.ms參數值。這通常表明你的執行個體花費太長時間來處理消息,耽誤poll調用。橙色字部分給出了相應的解決辦法:
- 增加參數期望值;
- 減少poll方法一次性傳回的消息數量,即減少max.poll.records參數值。
其實這段文字還有 兩段舊版本:
下面來讨論一下異常是什麼時候抛出的,從源代碼方面來說,是出現在手動送出位移時,即使用者顯示調用KafkaConsumer.commitSync()方法時。有兩種典型場景可能會遭遇該異常。
場景一:當消息處理總時間超過預設的max.poll.interval.ms參數值時,Consumer端會抛出這個異常。下面用一個成來模拟一下:
如果要避免這種場景下抛出異常,需要簡化的你的消息處理邏輯。有4中方法:
- 縮短單條消息處理時間,這個就需要優化消費系統。
- 增加Consumer端允許下遊系統消費一批消息的最大時長,max.poll.interval.ms,預設值是5分鐘。注意,這個參數是在0.10.1.0版本引入的。如果你依然在使用之前用戶端API,就需要增加session.timeout.ms參數值。但還是這個參數還有其他意思,是以增加值可能會引起其他不良影響,這也是社群引入max.poll.interval.ms參數的原因。
- 減少下遊系統一次性消費的消息總數,這取決于max.poll.records參數值。
- 下遊系統使用多線程來加速消費。這應該是最進階的同時也是最難實作的解決放辦法了。具體的思路就是建立多個消費線程處理poll方法傳回的消息,你可以靈活地控制線程數量,随時調整消費承載能力,再配以目前多核的硬體條件。事實上,很多主流大資料處理架構使用的都是這個方法,比如Apache Flink在內建Kafka是,就是建立多個KafkaConsumerThread線程,自行處理多線程間的資料消費。但是,多線程處理極易出現錯誤,特别是多線程在處理位移送出這個問題。
綜合以上4中處理方法,個人推薦方法1來首先嘗試預防此異常發生,但是如果方法1實作起來有難度,可以按照下面法則來實踐2、3。
首先,需要弄清楚下遊系統消費每條消息平均時延是多少。比如,你的消費邏輯從Kafka擷取消息後寫入到下遊MongoDB中,假設通路MongoDB平示範不超過2s。如果按照max.poll.records=500來計算,一批消息的總消費時長大約是1000秒,是以參數值max.poll.interval.ms不能低于1000秒。如果使用預設值,那麼就很大機率會出現CommitFailedException異常。
其次,你還要調整max.poll.records的值,還拿剛才例子,可以設定為150,甚至更少,這樣每批消息的總消費時長不會超過300秒,即max.poll.interval.ms的預設值。
場景二:從理論上講,關于該異常你了解到這個程度,已經足以幫你應對應用開發過程中由該異常帶來的“坑”了。但是還有一個比較冷門的場景,可以幫你拓寬Kafka的知識面。Kafka Java Consumer端還提供了一個名為Standalone Consumer的獨立消費者。獨立消費者沒有組的概念,彼此間獨立,但是位移送出個消費者組是一樣的,是以獨立的消費者也必須遵循之前說的那些規定。Standalone Consumer并未出現在官方文檔中,你可以在javadoc中看到一些:https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#manualassignment
那麼問題來了,如果你的應用中同時出現設定了相同的group.id值的消費者組和獨立消費者,那麼當獨立消費者程式手動送出位移時,Kafka就會抛出該異常,因為kafka無法識别具有相同group.id的消費執行個體。一旦出現不湊巧的重複group.id,出現了該異常,上面說的所有方法都不能規避該異常。比起傳回該異常隻是表明送出位移失敗,更好做法應該是,在Consumer端應用程式的某個地方,能夠以日志或其他友好的方式提示你錯誤的原因。
标注:這個系列文章是本人在極客時間專欄---kafka核心技術與實戰中的學習筆記
https://time.geekbang.org/column/article/101171