天天看點

Kafka源碼分析之MemoryRecords

        MemoryRecords是Kakfa中Record在記憶體中的實作形式,它基于Java NIO中ByteBuffer來實作。MemoryRecords中成員變量如下:

        其中,compressor是僅僅用于appends的壓縮器Compressor執行個體,writeLimit是可寫緩沖區的寫限制,它可能小于緩存區的裝載能力,initialCapacity是最初的緩沖區的裝載能力,僅用于重新配置設定可寫的記錄,buffer是java NIO的ByteBuffer執行個體,用于讀的底層緩沖區,而writable是一個标志記憶體記錄是否可寫的狀态位。

        再來看下MemoryRecords的構造函數,如下:

        通過MemoryRecords的構造函數我們可以知道,MemoryRecords有兩種基本的狀态,一個是隻寫,一個是隻讀,當為隻寫時,标志位writable為true,此時MemoryRecords中ByteBuffer類型的成員變量buffer被設定為null,同時利用入參ByteBuffer類型的buffer和CompressionType類型的type構造Compressor執行個體compressor;當為隻讀時,buffer設定為入參ByteBuffer類型的buffer,compressor設定為null。

        MemoryRecords最主要的一個功能就是添加記錄,而實作這一功能的方法就是append()方法,代碼如下:

        上面為key、value形式的append,而還有一種Record形式的append,代碼如下:

        MemoryRecords中還有一個針對指定Record的key、value來判斷是否尚有餘地的hasRoomFor()方法,代碼如下:

        hasRoomFor()方法中,需要首先判斷writable是否為true,writable為false的話會直接傳回false,直接通知調用者MemoryRecords已無餘地存儲Record。writable為true的話,還需要判斷compressor的已寫入資料大小numRecordsWritten是否為0,為0 的話,根據MemoryRecords的最初的緩沖區的裝載能力initialCapacity是否大于key、value、offset所占大小、size所占大小之和來确定是否尚有足夠空間容納一個Record,不為0

的話,則根據可寫緩沖區的寫限制writeLimit是否大于compressor預估算的大小與key、value、offset所占大小、size所占大小之和來确定是否尚有足夠空間容納一個Record。compressor預估算的大小通過如下方法來判斷: