文章目錄
- 1. 為什麼需要存儲Offset
- 2. __consumer_offsets
-
- 2.1 OffsetCommitRequest
-
- 2.1.1 Key 和 Value
- 2.2 OffsetCommitResponse
- 2.3 檢視__consumer_offsets中的消息
- 3. offset要送出到__consumer_offsets的哪個分區
- 參考
1. 為什麼需要存儲Offset
由于消費者在消費消息的時候可能會由于各種原因而斷開消費,當重新啟動消費者時我們需要讓它接着上次消費的位置offset繼續消費,是以消費者需要實時的記錄自己以及消費的位置。
在0.90版本之前,這個資訊是記錄在zookeeper内的,在0.90之後的版本,offset儲存在__consumer_offsets 這個topic内。
每個consumer會定期将自己消費分區的offset送出給kafka内部topic:__consumer_offsets,送出過去的時候,key是consumerGroupId+topic+分區号,value就是目前offset的值,kafka會定期清理topic裡的消息,最後就保留最新的那條資料
因為
__consumer_offsets可能會接收高并發的請求,kafka預設給其配置設定50個分區
(可以通過offsets.topic.num.partitions設定),這樣可以通過加機器的方式抗大并發。
2. __consumer_offsets
__consumer_offsets 是 kafka 自行建立的,和普通的 topic 相同。它存在的目的之一就是儲存 consumer 送出的位移。
__consumer_offsets 就是系統自動幫我們建立的隐藏的主題,既然是主題,就可以通過建立多個分區提高通路性能,kafka 預設為該 topic 建立了50個分區。
__consumer_offsets 的每條消息格式大緻如圖所示:

可以想象成一個 KV 格式的消息,key 就是一個三元組:group.id+topic+分區号,而 value 就是 offset 的值。
考慮到一個 kafka 生成環境中可能有很多 consumer 和 consumer group,如果這些 consumer 同時送出位移,則必将加重 __consumer_offsets 的寫入負載,是以
kafka 預設為該 topic 建立了50個分區,并且對每個 group.id 做哈希求模運算,進而将負載分散到不同的 __consumer_offsets 分區上
。
一般情況下,當叢集中第一次有消費者消費消息時會自動建立 __consumer_offsets,它的副本因子受 offsets.topic.replication.factor 參數的限制,預設值為3(注意:該參數的使用限制在0.11.0.0版本發生變化),分區數可以通過 offsets.topic.num.partitions 參數設定,預設值為50。
2.1 OffsetCommitRequest
用戶端送出消費位移是使用 OffsetCommitRequest請求實作的,OffsetCommitRequest 的結構如下圖所示:
請求體第一層中的 group_id、generation_id 和 member_id 表示消費者具體資訊, retention_time 表示目前送出的消費位移所能保留的時長,不過對于消費者而言這個值置為1。也就是說,按照 broker 端的配置 offsets.retention.minutes 來确定保留時長,預設為10080,即7天,超過這個時間後消費位移的資訊就會被删除(使用墓碑消息和日志壓縮政策)。
注意:這個參數在2.0.0版本之前的預設值為1440,即1天。
OffsetCommitRequest 中的其餘字段大抵也是按照分區的粒度來劃分消費位移的,注意還有一個 metadata 字段。metadata 是自定義的中繼資料資訊,如果不指定這個參數,那麼就會被設定為空字元串,注意 metadata 的長度不能超過 broker 端參數 offset.metadata.max.bytes 參數所配置的大小,預設值為4096。
2.1.1 Key 和 Value
同消費組的中繼資料資訊一樣,最終送出的消費位移也會以消息的形式發送至 __consumer_offsets,與消費位移對應的消息隻定義了 key和 value 字段的具體内容,它不依賴于具體版本的消息格式,以此做到與具體的消息格式無關。
下圖中展示了消費位移對應的消息内容格式,上面是消息的 key,下面是消息的 value。可以看到 key 和 value 中都包含了 version 字段,這個用來辨別具體的 key 和 value 的版本資訊,不同的版本對應的内容格式可能并不相同。到目前版本版本(2.x)而言 key 和 value 的 version 值都為1。
Key
key 中除了 version 字段還有 group、 topic 、 partition 字段,分别表示消費組的 groupId、topic 和 partition 編号。雖然 key 中包含了4個字段,但最終确定這條消息所要存儲的分區還是根據單獨的 group 字段來計算的,這樣就可以保證消費位移資訊與消費組對應的 GroupCoordinator 處于同一個 broker 節點上,省去了中間輪轉的開銷,這一點與消費組的中繼資料資訊的存儲是一樣的。
Value
value 中包含了5個字段,除 version 字段外,其餘的 offset、metadata、commit_timestamp、expire_timestamp 字段分别表示消費位移、自定義的中繼資料資訊、位移送出到 Kafka 的時間戳、消費位移被判定為逾時的時間戳。其中 offset 和 metadata 與 OffsetCommitRequest 請求體中的offset 和metadata 對應,而 commit_timestamp 和 OffsetCommitRequest 請求體中的 retention_time 也有關聯,commit_timestamp 值與 offsets.retention.minutes 參數值之和即為 expire_timestamp (預設情況下)。
2.2 OffsetCommitResponse
2.3 檢視__consumer_offsets中的消息
可以通過 kafka-console-consumer.sh 腳本來檢視 __consumer_offsets 中的内容,不過要設定
--formatter
參數為
kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter
注意,該參數在0.11.0版本之前為 kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter
示例:
一般情況下, 使用 OffsetsMessageFormatter 列印的格式可以概括為:
如果某個 key( version + group + topic +partition 的組合)對應的消費位移過期了,那麼對應的 value 就會被設定為 null,也就是墓碑消息(__consumer_offsets 使用的是日志壓縮政策)。對應的列印結果也會變成如下的形式:
當在檢視主題 __consumer_offsets 中的内容時出現下面這種情況:
[consumer_test,testTopic,0]::NULL
,這說明對應的消費位移己經過期了。
在 Kafka 中有一個名為“
delete-expired-group-metadata
”的定時任務來負責清理過期的消費位移,這個定時任務的執行周期由參數
offsets.retention.check.interval.ms
控制,預設值為600000,即10分鐘。
還有 metadata,一般情況下它的值要麼為 null 要麼為空字元串,OffsetsMessageFormatter 會把它展示為 NO_METADATA,否則就按實際值進行展示。
3. offset要送出到__consumer_offsets的哪個分區
通過如下公式可以選出consumer消費的offset要送出到__consumer_offsets的哪個分區
公式:
hash(consumerGroupId) % __consumer_offsets主題的分區數
參考
__consumer_offsets