天天看點

Kafka消息的實體存放路徑

https://blog.csdn.net/weixin_42628594/article/details/85571380

Kafka的Log存儲解析

https://blog.csdn.net/jewes/article/details/42970799

Kafka中的Message是以topic為基本機關組織的,不同的topic之間是互相獨立的。每個topic又可以分成幾個不同的partition(每個topic有幾個partition是在建立topic時指定的),每個partition存儲一部分Message。借用官方的一張圖,可以直覺地看到topic和partition的關系。

Kafka消息的實體存放路徑

partition是以檔案的形式存儲在檔案系統中,比如,建立了一個名為page_visits的topic,其有5個partition,那麼在Kafka的資料目錄中(由配置檔案中的log.dirs指定的)中就有這樣5個目錄: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名規則為<topic_name>-<partition_id>,裡面存儲的分别就是這5個partition的資料。

接下來,本文将分析partition目錄中的檔案的存儲格式和相關的代碼所在的位置。

Partition的資料檔案

Partition中的每條Message由offset來表示它在這個partition中的偏移量,這個offset不是該Message在partition資料檔案中的實際存儲位置,而是邏輯上一個值,它唯一确定了partition中的一條Message。是以,可以認為offset是partition中Message的id。partition中的每條Message包含了以下三個屬性:

offset

MessageSize

data

其中offset為long型,MessageSize為int32,表示data有多大,data為message的具體内容。它的格式和Kafka通訊協定中介紹的MessageSet格式是一緻。

Partition的資料檔案則包含了若幹條上述格式的Message,按offset由小到大排列在一起。它的實作類為FileMessageSet,類圖如下:

Kafka消息的實體存放路徑

它的主要方法如下:

append: 把給定的ByteBufferMessageSet中的Message寫入到這個資料檔案中。

searchFor: 從指定的startingPosition開始搜尋找到第一個Message其offset是大于或者等于指定的offset,并傳回其在檔案中的位置Position。它的實作方式是從startingPosition開始讀取12個位元組,分别是目前MessageSet的offset和size。如果目前offset小于指定的offset,那麼将position向後移動LogOverHead+MessageSize(其中LogOverHead為offset+messagesize,為12個位元組)。

read:準确名字應該是slice,它截取其中一部分傳回一個新的FileMessageSet。它不保證截取的位置資料的完整性。

sizeInBytes: 表示這個FileMessageSet占有了多少位元組的空間。

truncateTo: 把這個檔案截斷,這個方法不保證截斷位置的Message的完整性。

readInto: 從指定的相對位置開始把檔案的内容讀取到對應的ByteBuffer中。

我們來思考一下,如果一個partition隻有一個資料檔案會怎麼樣?

新資料是添加在檔案末尾(調用FileMessageSet的append方法),不論檔案資料檔案有多大,這個操作永遠都是O(1)的。

查找某個offset的Message(調用FileMessageSet的searchFor方法)是順序查找的。是以,如果資料檔案很大的話,查找的效率就低。

那Kafka是如何解決查找效率的的問題呢?有兩大法寶:1) 分段 2) 索引。

資料檔案的分段

Kafka解決查詢效率的手段之一是将資料檔案分段,比如有100條Message,它們的offset是從0到99。假設将資料檔案分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個單獨的資料檔案裡面,資料檔案以該段中最小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就可以定位到該Message在哪個段中。

為資料檔案建索引

資料檔案分段使得可以在一個較小的資料檔案中查找對應offset的Message了,但是這依然需要順序掃描才能找到對應offset的Message。為了進一步提高查找的效率,Kafka為每個分段後的資料檔案建立了索引檔案,檔案名與資料檔案的名字是一樣的,隻是檔案擴充名為.index。

索引檔案中包含若幹個索引條目,每個條目表示資料檔案中一條Message的索引。索引包含兩個部分(均為4個位元組的數字),分别為相對offset和position。

相對offset:因為資料檔案分段以後,每個資料檔案的起始offset不為0,相對offset表示這條Message相對于其所屬資料檔案中最小的offset的大小。舉例,分段後的一個資料檔案的offset是從20開始,那麼offset為25的Message在index檔案中的相對offset就是25-20 = 5。存儲相對offset可以減小索引檔案占用的空間。

position,表示該條Message在資料檔案中的絕對位置。隻要打開檔案并移動檔案指針到這個position就可以讀取對應的Message了。

index檔案中并沒有為資料檔案中的每條Message建立索引,而是采用了稀疏存儲的方式,每隔一定位元組的資料建立一條索引。這樣避免了索引檔案占用過多的空間,進而可以将索引檔案保留在記憶體中。但缺點是沒有建立索引的Message也不能一次定位到其在資料檔案的位置,進而需要做一次順序掃描,但是這次順序掃描的範圍就很小了。

在Kafka中,索引檔案的實作類為OffsetIndex,它的類圖如下:

Kafka消息的實體存放路徑

---------------------

主要的方法有:

append方法,添加一對offset和position到index檔案中,這裡的offset将會被轉成相對的offset。

lookup, 用二分查找的方式去查找小于或等于給定offset的最大的那個offset

小結

我們以幾張圖來總結一下Message是如何在Kafka中存儲的,以及如何查找指定offset的Message的。

Message是按照topic來組織,每個topic可以分成多個的partition,比如:有5個partition的名為為page_visits的topic的目錄結構為:

partition是分段的,每個段叫LogSegment,包括了一個資料檔案和一個索引檔案,下圖是某個partition目錄下的檔案:

可以看到,這個partition有4個LogSegment。

Kafka消息的實體存放路徑

比如:要查找絕對offset為7的Message:

首先是用二分查找确定它是在哪個LogSegment中,自然是在第一個Segment中。

打開這個Segment的index檔案,也是用二分查找找到offset小于或者等于指定offset的索引條目中最大的那個offset。自然offset為6的那個索引是我們要找的,通過索引檔案我們知道offset為6的Message在資料檔案中的位置為9807。

打開資料檔案,從位置為9807的那個地方開始順序掃描直到找到offset為7的那條Message。

這套機制是建立在offset是有序的。索引檔案被映射到記憶體中,是以查找的速度還是很快的。

繼續閱讀