消息引擎的核心職責就是将生産者生産的消息傳輸到消費者,設計消息格式是各大消息引擎架構的關鍵問題,因為消息格式決定了消息引擎的性能和效率。本文帶大家探究消息引擎kafka目前所用的message格式是什麼。
一、Kafka message format
kafka從0.11.0版本開始所使用的消息格式版本為v2,參考了 Protocol Buffer而引入了變長整型(Varints)和 ZigZag 編碼。Varints是使用一個或多個位元組來序列化整數的一種方法,數值越小,其所占用的位元組數就越少。ZigZag編碼以一種鋸齒形(zig-zags)的方式來回穿梭于正負整數之間, 以使得帶符号整數映射為無符号整數,這樣可以使得絕對值較小的負數仍然享有較小的Varints編碼值,比如-1編碼為1,1編碼為2,-2編碼為3。
kafka v0和v1版本的消息格式,如果消息本身沒有key,那麼key length字段為-1,int類型的需要4個位元組來儲存,而如果采用Varints來編碼則隻需要一個位元組。根據Varints的規則可以推導出0-63之間的數字占1個位元組,64-8191之間的數字占2個位元組,8192-1048575之間的數字占3個位元組。而kafka broker的配置message.max.bytes的預設大小為1000012(Varints編碼占3個位元組),如果消息格式中與長度有關的字段采用Varints的編碼的話, 絕大多數情況下都會節省空間,而v2版本的消息格式也正是這樣做的。不過需要注意的是Varints并非一直會省空間,一個int32最長會占用5個位元組(大于預設的4位元組), 一個int64最長會占用10位元組(大于預設的8位元組)。
因為Kafka的message經曆過幾次的版本疊代更改,本文以v2版本為例講述。
二、Record Batch
在Kafka中,資料是按照topic和partition的方式進行組織和存儲的。每個partition的資料被分成一個或多個segment檔案,并且每個segment檔案包含若幹個Record Batch。是以,Record Batch也是Kafka中重要的資料結構之一。
在Kafka中,Record Batch指的是一組相關的消息集合,它們具有相同的key、value類型和所屬的topic和partition。每個Record Batch包含若幹條消息(Record),并且這些消息被順序地寫入到磁盤中,以提高讀取效率。
具體而言,Record Batch由以下幾部分構成:
Record Batch Header:包含了目前Batch的中繼資料,如Magic Code、Batch Size、First Offset等資訊。
Record Header:每個Record都附帶有一個Header,用于描述該Record的中繼資料資訊,例如時間戳、壓縮類型、CRC校驗值等。
Record Body:記錄具體的消息内容,包括Key、Value等字段。
需要注意的是,Kafka的Record Batch通常具有比較大的體積(預設大小為16KB),是以可以将多個相關的消息打包在一起進行傳輸和處理,進而提高了消息的傳輸效率和吞吐量。另外,Kafka還支援對Record Batch進行壓縮和批量操作,以進一步提高資料的傳輸效率和性能。
總的來說,Record Batch是Kafka中定義的一個重要資料結構,用于管理群組織消息,提高消息的讀寫效率和傳輸性能。
baseOffset: int64 辨別目前的batch的起始偏移量
batchLength: int32 該batch的長度
partitionLeaderEpoch: int32 確定資料可靠性
magic: int8 魔法數字,目前為2,也即目前的message版本為v2版本
crc: int32 crc校驗
attributes: int16 消息屬性
bit 0~2: 是否壓縮和壓縮的格式
0: no compression
1: gzip
2: snappy
3: lz4
4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6: hasDeleteHorizonMs (0 means baseTimestamp is not set as the delete horizon for compaction)
bit 7~15: unused
lastOffsetDelta: int32 RecordBatch中最後一個Record的offset與first offset的內插補點
baseTimestamp: int64 第一條時間戳
maxTimestamp: int64 最大的時間戳,保證消息組裝時的正确性
producerId: int64 支援幂等性
producerEpoch: int16 支援幂等性
baseSequence: int32 支援幂等性,消息序号
records: [Record] Record個數
用以下圖表示 V2 版本消息批次的樣子:
三、Record
在Kafka中,Record Batch和Record是兩種不同的資料結構,但它們之間存在着緊密的關系。
Record是指Kafka中的一條消息,通常由Key、Value、Timestamp等字段組成。而Record Batch是指将多個相關的Record打包在一起進行傳輸和處理的資料結構,每個Record Batch通常包含若幹條記錄,并且這些記錄具有相同的key、value類型和所屬的topic和partition。
具體來說,每個Record Batch中的Record都被依次存儲在一個連續的二進制資料塊中,每個Record包含自己的Header和Body部分。而Record Batch則包含了目前Batch的中繼資料資訊和所有記錄的中繼資料資訊,如Batch Size、First Offset、Last Offset、CRC校驗值等。
消息格式如下所示:
#消息長度
length: varint
#消息屬性
attributes: int8
# 時間戳增量
bit 0~7: unusedtimestampDelta: varlong
#偏移量增量
offsetDelta: varint
#key長度
keyLength: varint
#key值
key: byte[]
#value長度
valueLen: varint
#value值
value: byte[]
#header資訊
Headers => [Header]
Record資訊通過如下方式封裝
public static int writeTo(DataOutputStream out,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) throws IOException {
// 消息總數
int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
ByteUtils.writeVarint(sizeInBytes, out);
// 屬性
byte attributes = 0; // there are no used record attributes at the moment
out.write(attributes);
// 時間增量
ByteUtils.writeVarlong(timestampDelta, out);
// 位移增量
ByteUtils.writeVarint(offsetDelta, out);
// key
if (key == null) {
ByteUtils.writeVarint(-1, out);
} else {
int keySize = key.remaining();
// key size
ByteUtils.writeVarint(keySize, out);
// key
Utils.writeTo(out, key, keySize);
}
// Value
if (value == null) {
ByteUtils.writeVarint(-1, out);
} else {
int valueSize = value.remaining();
// value size
ByteUtils.writeVarint(valueSize, out);
// value
Utils.writeTo(out, value, valueSize);
}
// header
ByteUtils.writeVarint(headers.length, out);
for (Header header : headers) {
// header key
String headerKey = header.key();
byte[] utf8Bytes = Utils.utf8(headerKey);
// header key 長度
ByteUtils.writeVarint(utf8Bytes.length, out);
// header key 值
out.write(utf8Bytes);
// header value
byte[] headerValue = header.value();
if (headerValue == null) {
ByteUtils.writeVarint(-1, out);
} else {
// header value 長度
ByteUtils.writeVarint(headerValue.length, out);
// header value 值
out.write(headerValue);
}
}
return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes;
}
根據以上代碼邏輯,用以下圖表示 V2 版本消息格式的樣子:
四、總結
message(又稱record)總是分批寫入的。一批消息的技術術語是一個record batch:
- 一個record batch包含一個或多個record。
- 在退化的情況下,我們可以有一個包含單個record的record batch。
- record batch和record有它們自己的headers。