__consumer_offsets(兩個下劃線) 在 Kafka 源碼中有個更為正式的名字,叫位移主題,即 Offsets Topic。
新版本 Consumer 的位移管理機制就是将 Consumer 的位移資料作為一條條普通的 Kafka 消息,送出到 __consumer_offsets 中。可以這麼說,__consumer_offsets 的主要作用是儲存 Kafka 消費者的位移資訊。它要求這個送出過程不僅要實作高持久性,還要支援高頻的寫操作。
位移主題就是普通的 Kafka 主題。你可以手動地建立它、修改它,甚至是删除它。隻不過,它同時也是一個内部主題,大部分情況下,你其實并不需要“搭理”它,也不用花心思去管理它,把它丢給 Kafka 就完事了。
雖說位移主題是一個普通的 Kafka 主題,但它的消息格式卻是 Kafka 自己定義的,使用者不能修改,也就是說你不能随意地向這個主題寫消息,因為一旦你寫入的消息不滿足 Kafka 規定的格式,那麼 Kafka 内部無法成功解析,就會造成 Broker 的崩潰。事實上,Kafka Consumer 有 API 幫你送出位移,也就是向位移主題寫消息。你千萬不要自己寫個 Producer 随意向該主題發送消息。
這個主題存的到底是什麼格式的消息呢?所謂的消息格式,你可以簡單地了解為是一個 KV 對。Key 和 Value 分别表示消息的鍵值和消息體,在 Kafka 中它們就是位元組數組而已。
位移主題的 Key 中應該儲存 3 部分内容:<Group ID,主題名,分區号 >。
消息體儲存了位移送出的一些其他中繼資料,諸如時間戳和使用者自定義的資料等。儲存這些中繼資料是為了幫助 Kafka 執行各種各樣後續的操作,比如删除過期位移消息等。但總體來說,我們還是可以簡單地認為消息體就是儲存了位移值。
位移主題的消息格式有 3 種,除了剛剛我們說的這種格式,還有 2 種格式:
1.用于儲存 Consumer Group 資訊的消息。
2.用于删除 Group 過期位移甚至是删除 Group 的消息。
第 1 種格式非常神秘,以至于你幾乎無法在搜尋引擎中搜到它的身影。不過,你隻需要記住它是用來注冊 Consumer Group 的就可以了。
第 2 種格式相對更加有名一些。它有個專屬的名字:tombstone 消息,即墓碑消息,也稱 delete mark。這些消息隻出現在源碼中而不暴露給你。它的主要特點是它的消息體是 null,即空消息體。
何時會寫入這類消息呢?一旦某個 Consumer Group 下的所有 Consumer 執行個體都停止了,而且它們的位移資料都已被删除時,Kafka 會向位移主題的對應分區寫入 tombstone 消息,表明要徹底删除這個 Group 的資訊。
通常來說,當 Kafka 叢集中的第一個 Consumer 程式啟動時,Kafka 會自動建立位移主題。
位移主題就是普通的 Kafka 主題,那麼它自然也有對應的分區數。但如果是 Kafka 自動建立的,分區數是怎麼設定的呢?這就要看 Broker 端參數 offsets.topic.num.partitions 的取值了。它的預設值是 50,是以 Kafka 會自動建立一個 50 分區的位移主題。
除了分區數,副本數或備份因子是怎麼控制的呢?答案也很簡單,這就是 Broker 端另一個參數 offsets.topic.replication.factor 要做的事情了。它的預設值是 3。
總結一下,如果位移主題是 Kafka 自動建立的,那麼該主題的分區數是 50,副本數是 3。
手動建立位移主題:
具體方法就是,在 Kafka 叢集尚未啟動任何 Consumer 之前,使用 Kafka API 建立它。手動建立的好處在于,你可以建立滿足你實際場景需要的位移主題。比如很多人說 50 個分區對我來講太多了,我不想要這麼多分區,那麼你可以自己建立它,不用理會 offsets.topic.num.partitions 的值。
不過我給你的建議是,還是讓 Kafka 自動建立比較好。目前 Kafka 源碼中有一些地方寫死了 50 分區數,是以如果你自行建立了一個不同于預設分區數的位移主題,可能會碰到各種各種奇怪的問題。這是社群的一個 bug,目前代碼已經修複了,但依然在稽核中。
建立位移主題當然是為了用的,那麼什麼地方會用到位移主題呢?我們前面一直在說 Kafka Consumer 送出位移時會寫入該主題,那 Consumer 是怎麼送出位移的呢?目前 Kafka Consumer 送出位移的方式有兩種:自動送出位移和手動送出位移。
Consumer 端有個參數叫 enable.auto.commit,如果值是 true,則 Consumer 在背景默默地為你定期送出位移,送出間隔由一個專屬的參數 auto.commit.interval.ms 來控制。自動送出位移有一個顯著的優點,就是省事,你不用操心位移送出的事情,就能保證消息消費不會丢失。但這一點同時也是缺點。因為它太省事了,以至于喪失了很大的靈活性和可控性,你完全沒法把控 Consumer 端的位移管理。
很多與 Kafka 內建的大資料架構都是禁用自動送出位移的,如 Spark、Flink 等。這就引出了另一種位移送出方式:手動送出位移,即設定 enable.auto.commit = false。一旦設定了 false,作為 Consumer 應用開發的你就要承擔起位移送出的責任。Kafka Consumer API 為你提供了位移送出的方法,如 consumer.commitSync 等。當調用這些方法時,Kafka 會向位移主題寫入相應的消息。
如果你選擇的是自動送出位移,那麼就可能存在一個問題:隻要 Consumer 一直啟動着,它就會無限期地向位移主題寫入消息。
我們來舉個極端一點的例子。假設 Consumer 目前消費到了某個主題的最新一條消息,位移是 100,之後該主題沒有任何新消息産生,故 Consumer 無消息可消費了,是以位移永遠保持在 100。由于是自動送出位移,位移主題中會不停地寫入位移 =100 的消息。顯然 Kafka 隻需要保留這類消息中的最新一條就可以了,之前的消息都是可以删除的。這就要求 Kafka 必須要有針對位移主題消息特點的消息删除政策,否則這種消息會越來越多,最終撐爆整個磁盤。
Kafka 是怎麼删除位移主題中的過期消息的呢?答案就是 Compaction。國内很多文獻都将其翻譯成壓縮,我個人是有一點保留意見的。在英語中,壓縮的專有術語是 Compression,它的原理和 Compaction 很不相同,我更傾向于翻譯成壓實,或幹脆采用 JVM 垃圾回收中的術語:整理。
Kafka 使用Compact 政策來删除位移主題中的過期消息,避免該主題無限期膨脹。那麼應該如何定義 Compact 政策中的過期呢?對于同一個 Key 的兩條消息 M1 和 M2,如果 M1 的發送時間早于 M2,那麼 M1 就是過期消息。Compact 的過程就是掃描日志的所有消息,剔除那些過期的消息,然後把剩下的消息整理在一起。
Kafka 提供了專門的背景線程定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可删除資料。這個背景線程叫 Log Cleaner。很多實際生産環境中都出現過位移主題無限膨脹占用過多磁盤空間的問題,如果你的環境中也有這個問題,我建議你去檢查一下 Log Cleaner 線程的狀态,通常都是這個線程挂掉了導緻的。