天天看點

Kafka的Message格式

作者:尚矽谷教育

消息引擎的核心職責就是将生産者生産的消息傳輸到消費者,設計消息格式是各大消息引擎架構的關鍵問題,因為消息格式決定了消息引擎的性能和效率。本文帶大家探究消息引擎kafka目前所用的message格式是什麼。

一、Kafka message format

kafka從0.11.0版本開始所使用的消息格式版本為v2,參考了 Protocol Buffer而引入了變長整型(Varints)和 ZigZag 編碼。Varints是使用一個或多個位元組來序列化整數的一種方法,數值越小,其所占用的位元組數就越少。ZigZag編碼以一種鋸齒形(zig-zags)的方式來回穿梭于正負整數之間, 以使得帶符号整數映射為無符号整數,這樣可以使得絕對值較小的負數仍然享有較小的Varints編碼值,比如-1編碼為1,1編碼為2,-2編碼為3。

Kafka的Message格式

kafka v0和v1版本的消息格式,如果消息本身沒有key,那麼key length字段為-1,int類型的需要4個位元組來儲存,而如果采用Varints來編碼則隻需要一個位元組。根據Varints的規則可以推導出0-63之間的數字占1個位元組,64-8191之間的數字占2個位元組,8192-1048575之間的數字占3個位元組。而kafka broker的配置message.max.bytes的預設大小為1000012(Varints編碼占3個位元組),如果消息格式中與長度有關的字段采用Varints的編碼的話, 絕大多數情況下都會節省空間,而v2版本的消息格式也正是這樣做的。不過需要注意的是Varints并非一直會省空間,一個int32最長會占用5個位元組(大于預設的4位元組), 一個int64最長會占用10位元組(大于預設的8位元組)。

因為Kafka的message經曆過幾次的版本疊代更改,本文以v2版本為例講述。

二、Record Batch

在Kafka中,資料是按照topic和partition的方式進行組織和存儲的。每個partition的資料被分成一個或多個segment檔案,并且每個segment檔案包含若幹個Record Batch。是以,Record Batch也是Kafka中重要的資料結構之一。

在Kafka中,Record Batch指的是一組相關的消息集合,它們具有相同的key、value類型和所屬的topic和partition。每個Record Batch包含若幹條消息(Record),并且這些消息被順序地寫入到磁盤中,以提高讀取效率。

具體而言,Record Batch由以下幾部分構成:

Record Batch Header:包含了目前Batch的中繼資料,如Magic Code、Batch Size、First Offset等資訊。

Record Header:每個Record都附帶有一個Header,用于描述該Record的中繼資料資訊,例如時間戳、壓縮類型、CRC校驗值等。

Record Body:記錄具體的消息内容,包括Key、Value等字段。

需要注意的是,Kafka的Record Batch通常具有比較大的體積(預設大小為16KB),是以可以将多個相關的消息打包在一起進行傳輸和處理,進而提高了消息的傳輸效率和吞吐量。另外,Kafka還支援對Record Batch進行壓縮和批量操作,以進一步提高資料的傳輸效率和性能。

總的來說,Record Batch是Kafka中定義的一個重要資料結構,用于管理群組織消息,提高消息的讀寫效率和傳輸性能。

baseOffset: int64 辨別目前的batch的起始偏移量

batchLength: int32 該batch的長度

partitionLeaderEpoch: int32 確定資料可靠性

magic: int8 魔法數字,目前為2,也即目前的message版本為v2版本

crc: int32 crc校驗

attributes: int16 消息屬性

bit 0~2: 是否壓縮和壓縮的格式

0: no compression

1: gzip

2: snappy

3: lz4

4: zstd

bit 3: timestampType

bit 4: isTransactional (0 means not transactional)

bit 5: isControlBatch (0 means not a control batch)

bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)

bit 7~15: unused

lastOffsetDelta: int32 RecordBatch中最後一個Record的offset與first offset的內插補點

baseTimestamp: int64 第一條時間戳

maxTimestamp: int64 最大的時間戳,保證消息組裝時的正确性

producerId: int64 支援幂等性

producerEpoch: int16 支援幂等性

baseSequence: int32 支援幂等性,消息序号

records: [Record] Record個數

用以下圖表示 V2 版本消息批次的樣子:

Kafka的Message格式

三、Record

在Kafka中,Record Batch和Record是兩種不同的資料結構,但它們之間存在着緊密的關系。

Record是指Kafka中的一條消息,通常由Key、Value、Timestamp等字段組成。而Record Batch是指将多個相關的Record打包在一起進行傳輸和處理的資料結構,每個Record Batch通常包含若幹條記錄,并且這些記錄具有相同的key、value類型和所屬的topic和partition。

具體來說,每個Record Batch中的Record都被依次存儲在一個連續的二進制資料塊中,每個Record包含自己的Header和Body部分。而Record Batch則包含了目前Batch的中繼資料資訊和所有記錄的中繼資料資訊,如Batch Size、First Offset、Last Offset、CRC校驗值等。

消息格式如下所示:

#消息長度

length: varint

#消息屬性

attributes: int8

# 時間戳增量

bit 0~7: unusedtimestampDelta: varlong

#偏移量增量

offsetDelta: varint

#key長度

keyLength: varint

#key值

key: byte[]

#value長度

valueLen: varint

#value值

value: byte[]

#header資訊

Headers => [Header]

Record資訊通過如下方式封裝

public static int writeTo(DataOutputStream out,

int offsetDelta,

long timestampDelta,

ByteBuffer key,

ByteBuffer value,

Header[] headers) throws IOException {

// 消息總數

int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);

ByteUtils.writeVarint(sizeInBytes, out);

// 屬性

byte attributes = 0; // there are no used record attributes at the moment

out.write(attributes);

// 時間增量

ByteUtils.writeVarlong(timestampDelta, out);

// 位移增量

ByteUtils.writeVarint(offsetDelta, out);

// key

if (key == null) {

ByteUtils.writeVarint(-1, out);

} else {

int keySize = key.remaining();

// key size

ByteUtils.writeVarint(keySize, out);

// key

Utils.writeTo(out, key, keySize);

}

// Value

if (value == null) {

ByteUtils.writeVarint(-1, out);

} else {

int valueSize = value.remaining();

// value size

ByteUtils.writeVarint(valueSize, out);

// value

Utils.writeTo(out, value, valueSize);

}

// header

ByteUtils.writeVarint(headers.length, out);

for (Header header : headers) {

// header key

String headerKey = header.key();

byte[] utf8Bytes = Utils.utf8(headerKey);

// header key 長度

ByteUtils.writeVarint(utf8Bytes.length, out);

// header key 值

out.write(utf8Bytes);

// header value

byte[] headerValue = header.value();

if (headerValue == null) {

ByteUtils.writeVarint(-1, out);

} else {

// header value 長度

ByteUtils.writeVarint(headerValue.length, out);

// header value 值

out.write(headerValue);

}

}

return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;

}

根據以上代碼邏輯,用以下圖表示 V2 版本消息格式的樣子:

Kafka的Message格式

四、總結

message(又稱record)總是分批寫入的。一批消息的技術術語是一個record batch:

  • 一個record batch包含一個或多個record。
  • 在退化的情況下,我們可以有一個包含單個record的record batch。
  • record batch和record有它們自己的headers。