天天看点

Kafka竟然也用二分搜索算法查找索引!(上)索引类图及源文件组织架构AbstractIndex代码结构Kafka的索引底层实现原理

索引应用二分查找算法快速定位查询索引项。

难得的是,Kafka的索引组件中应用了二分查找算法,而且社区还针对Kafka自身的特点对其进行了改良。

索引类图及源文件组织架构

Kafka竟然也用二分搜索算法查找索引!(上)索引类图及源文件组织架构AbstractIndex代码结构Kafka的索引底层实现原理

都位于core包的/src/main/scala/kafka/log

AbstractIndex.scala

定义了最顶层的抽象类,这个类封装了所有索引类型的公共操作。

LazyIndex.scala

定义了AbstractIndex上的一个包装类,实现索引项延迟加载。这个类主要是为了提高性能,并无功能上的改进

OffsetIndex.scala

定义位移索引,保存“<位移值,文件磁盘物理位置>”对。

TimeIndex.scala

定义时间戳索引,保存“<时间戳,位移值>”对。

TransactionIndex.scala

定义事务索引,为已中止事务(Aborted Transcation)保存重要的元数据信息。

只有启用Kafka事务后,这个索引才有可能出现。

AbstractIndex代码结构

我们先来看下AbstractIndex的类定义:

abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean) extends Closeable {
    ......
    }           

4个属性字段。由于是一个抽象基类,它的所有子类自动地继承了这4个字段。

即Kafka所有类型的索引对象都定义了这些属性:

索引文件(file)

每个索引对象在磁盘上都对应了一个索引文件。你可能注意到了,这个字段是var型,说明它是可以被修改的。难道索引对象还能动态更换底层的索引文件吗?是的。自1.1.0版本之后,Kafka允许迁移底层的日志路径,所以,索引文件自然要是可以更换的。

起始位移值(baseOffset)。索引对象对应日志段对象的起始位移值。举个例子,如果你查看Kafka日志路径的话,就会发现,日志文件和索引文件都是成组出现的。比如说,如果日志文件是00000000000000000123.log,正常情况下,一定还有一组索引文件00000000000000000123.index、00000000000000000123.timeindex等。这里的“123”就是这组文件的起始位移值,也就是baseOffset值。

索引文件最大字节数(maxIndexSize)。它控制索引文件的最大长度。Kafka源码传入该参数的值是Broker端参数segment.index.bytes的值,即10MB。这就是在默认情况下,所有Kafka索引文件大小都是10MB的原因。

索引文件打开方式(writable)。“True”表示以“读写”方式打开,“False”表示以“只读”方式打开。如果我没记错的话,这个参数应该是我加上去的,就是为了修复我刚刚提到的那个Bug。

AbstractIndex是抽象的索引对象类。它是承载索引项的容器,而每个继承它的子类负责定义具体的索引项结构。比如,

OffsetIndex的索引项是<位移值,物理磁盘位置>对

TimeIndex的索引项是<时间戳,位移值>对

基于这样的设计理念,AbstractIndex类中定义了一个抽象方法entrySize来表示不同索引项的大小,如下所示:

protected def entrySize: Int      

子类实现该方法时需要给定自己索引项的大小,对于OffsetIndex而言,该值就是8;对于TimeIndex而言,该值是12,如下所示:

// OffsetIndex
override def entrySize = 8
// TimeIndex
override def entrySize = 12      

为什么是8和12?

在OffsetIndex中,位移值用4个字节表示,物理磁盘位置也用4个字节,所以共8字节。位移值不是长整型吗,应该8个字节才对啊。

还记得AbstractIndex已保存了baseOffset?这里的位移值,实际上是相对于baseOffset的相对位移值,即真实位移值减去baseOffset。

使用相对位移值能够有效地节省磁盘空间。

而Broker端参数log.segment.bytes是整型,这说明,Kafka中每个日志段文件的大小不会超过2^32,即4GB,这就说明同一个日志段文件上的位移值减去baseOffset的差值一定在整数范围内。因此,源码只需4个字节保存即可。

同理,TimeIndex中的时间戳类型是长整型,占用8个字节,位移依然使用相对位移值,占用4个字节,因此总共需要12个字节。

Kafka的索引底层实现原理

内存映射文件,即Java中的MappedByteBuffer。

内存映射文件的主要优势在于,它有很高的I/O性能,特别是对于索引这样的小文件来说,由于文件内存被直接映射到一段虚拟内存上,访问内存映射文件的速度要快于普通的读写文件速度。

在Linux的这段映射的内存区域就是内核的页缓存(Page Cache)。里面的数据无需重复拷贝到用户态空间,避免了大量不必要的时间、空间消耗。

在AbstractIndex中,这个MappedByteBuffer就是名为mmap的变量。接下来,我用注释的方式,带你深入了解下这个mmap的主要流程。

@volatile
      protected var mmap: MappedByteBuffer = {
        // 第1步:创建索引文件
        val newlyCreated = file.createNewFile()
        // 第2步:以writable指定的方式(读写方式或只读方式)打开索引文件
        val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
        try {
          if(newlyCreated) {
            if(maxIndexSize < entrySize) // 预设的索引文件大小不能太小,如果连一个索引项都保存不了,直接抛出异常
              throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
            // 第3步:设置索引文件长度,roundDownToExactMultiple计算的是不超过maxIndexSize的最大整数倍entrySize
            // 比如maxIndexSize=1234567,entrySize=8,那么调整后的文件长度为1234560
            raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
          }
    
    
          // 第4步:更新索引长度字段_length
          _length = raf.length()
          // 第5步:创建MappedByteBuffer对象
          val idx = {
            if (writable)
              raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
            else
              raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
          }
          /* set the position in the index for the next entry */
          // 第6步:如果是新创建的索引文件,将MappedByteBuffer对象的当前位置置成0
          // 如果索引文件已存在,将MappedByteBuffer对象的当前位置设置成最后一个索引项所在的位置
          if(newlyCreated)
            idx.position(0)
          else
            idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
          // 第7步:返回创建的MappedByteBuffer对象
          idx
        } finally {
          CoreUtils.swallow(raf.close(), AbstractIndex) // 关闭打开索引文件句柄
        }
      }      

这些代码最主要的作用就是创建mmap对象。要知道,AbstractIndex其他大部分的操作都是和mmap相关。

案例:

  • 计算索引对象中当前有多少个索引项
protected var _entries: Int = mmap.position() / entrySize      
  • 计算索引文件最多能容纳多少个索引项
private[this] var _maxEntries: Int = mmap.limit() / entrySize      

再进一步,有了这两个变量,我们就能够很容易地编写一个方法,来判断当前索引文件是否已经写满:

def isFull: Boolean = _entries >= _maxEntries      

AbstractIndex最重要的就是这个mmap变量。事实上,AbstractIndex继承类实现添加索引项的主要逻辑,也就是向mmap中添加对应的字段。