文章

kafka源码阅读(4)-索引查找

kafka源码阅读(4)-索引查找

基于开源 kafka 2.5 版本。

如无特殊说明,文中代码片段将删除 debug 信息、异常触发、英文注释等代码,以便观看核心代码。

kafka 对索引查找的优化

本篇开篇先介绍点理论知识,即 kafka 对索引查找的优化。首先 kafka 的索引项在索引文件中是按 key 升序的,这是为了使用二分快速定位索引项所在位置。然后关于 kafka 为什么要优化索引的二分查找算法,在AbstractIndex.scala源码里的注释已经说得非常明白,下面将结合自己的理解再总结一下。

首先,kafka 会通过 mmap 机制将索引文件映射到 page cache 中以提高访问索引的速度。现代操作系统的 page cache 一般用 LRU(即 least recently used 算法,或者其变体)来管理 page cache 页,而 kafka 总是在索引的末尾新增内容,并且总是查找末尾的索引(比如 consumer 读取最新消息、follower 同步最新消息),也就是说 kafka 在一段时间内对索引的访问也就末尾那几个页,这与 LRU 算法十分匹配,这样一来这些经常被访问的索引页的命中率就比较高,但事实真是如此吗?

现在给定一个偏移量,通过索引进行查找的时候,二分对缓存并不友好,具体表现如下:

  1. 当索引页增加后,整个二分搜索路径就会发生变更,导致需要访问的大量新页发生缺页。举个例子:当前索引有 13 页,指定偏移量在最后一页(#12),那么搜索路径为 #0, 12, 6, 9, 11, 12,在索引页后续增加到 14 页后,此时指定偏移量在最后一页(#13),那么搜索路径为 #0, 13, 7, 10, 11, 12, 13,可以发现 #7 和 #10 是第一次访问的页,这将导致缺页中断。
  2. 通过第 1 点中的例子可以发现,二分首先得获取头尾两个索引,分别位于第一页和最后一页,然后访问中间页….总而言之就是跳跃幅度太大,空间局部性不好,并且每次都要访问前面几个实际上不包含偏移量的索引页,也会导致运行效率降低。

为了解决这些问题,kafka 对索引文件进行了冷热区域的划分,其中热区域由索引文件末尾的 N 个索引项组成,其余的都属于冷区域,该热区域占整个索引文件的小部分空间。具体来说,当查找一个偏移量对应的索引时,首先确定该偏移量所在的区域,再在对应的区域内进行二分,用伪代码表示如下:

1
2
3
4
if (target > indexEntry[end - N]) // if the target is in the last N entries of the index
	binarySearch(end - N, end)
else
	binarySearch(begin, end - N)

这里相比传统二分改进的点在于,由于大量访问集中在热区域,先用 O(1) 的复杂度确定了索引所在区域,将大大减少对冷区域页的访问,大大减少缺页率,增加了热区域那几个页的命中率。

在代码实现上,N = 8192 / entrySize,现代操作系统最小页大小为 4096,因此我们可以说热区域最多占据 3 页。

索引基类AbstractIndex

AbstractIndex是 .index 索引(OffsetIndex)和 .timeindex 索引(TimeIndex)的基类,而 .txnindex 索引(TransactionIndex)则是单独实现,不继承自AbstractIndex

这个类包含了一些索引的公共操作,以及维护索引文件等。先来看下类的定义,成员的定义都用注释进行说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
abstract class AbstractIndex(
  // 索引文件
  @volatile var file: File,
  // 索引基础偏移量,文件以该偏移量命名,比如00000000123.index
  val baseOffset: Long,
  // 索引文件最大字节大小,由broker端的segment.index.bytes参数确定,默认是10MB
  val maxIndexSize: Int = -1,
  // 索引文件打开方式,true表示文件可读写,false表示只读
  val writable: Boolean
) extends Closeable {
  // 索引项大小,由具体子类定义
  protected def entrySize: Int
  // 即上一小节中的N
  protected def _warmEntries: Int = 8192 / entrySize
}

kafka 对索引是通过 mmap 机制来进行高速访问的(关于 mmap 的介绍可以参考《APUE 中文第三版》一书里 14.8节的存储映射 I/O,或者参考我的 APUE 阅读笔记),因此AbstractIndex还有一个很重要的成员mmap,这个成员的类型MappedByteBuffer就是对操作系统 mmap 的封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@volatile
protected var mmap: MappedByteBuffer = {
  // 创建索引文件
  val newlyCreated = file.createNewFile()
  // 使文件可以被随机访问
  val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
  try {
    // 将文件在磁盘上预先分配到最大大小
    if(newlyCreated) {
      if(maxIndexSize < entrySize)
        throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
      raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
    }

    // 创建MappedByteBuffer,并将其的将读写指针定位到最后一个索引项
    _length = raf.length()
    val idx = {
      if (writable)
        raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
      else
        raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
    }
    if(newlyCreated)
      idx.position(0)
    else
      idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
    // 返回该MappedByteBuffer
    idx
  } finally {
    CoreUtils.swallow(raf.close(), AbstractIndex)
  }
}

索引项查找

索引项的查找也由该基类实现,如前所述,具体是通过二分查找算法实现的,这里的二分就是开篇提到的优化后的二分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 查找指定值的 lower bound 和 upper bound,即指定值的左右相邻的两个值
// lower bound 即小于等于指定值的最大值
// upper bound 即大于等于指定值的最小值
private def indexSlotRangeFor(
  // 待查找的索引文件
  idx: ByteBuffer,
  // 指定值
  target: Long,
  // 该值为索引项的key还是value
  searchEntity: IndexSearchEntity
): (Int, Int) = {
  if(_entries == 0)
    return (-1, -1)

  // 创建二分查找函数,方便后续调用
  def binarySearch(begin: Int, end: Int) : (Int, Int) = {
    var lo = begin
    var hi = end
    while(lo < hi) {
      val mid = (lo + hi + 1) >>> 1
      val found = parseEntry(idx, mid)
      val compareResult = compareIndexEntry(found, target, searchEntity)
      if(compareResult > 0)
        hi = mid - 1
      else if(compareResult < 0)
        lo = mid
      else
        return (mid, mid)
    }
    (lo, if (lo == _entries - 1) -1 else lo + 1)
  }

  // 如果指定值处于索引文件的热区域,则搜索热区域
  val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
  if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
    return binarySearch(firstHotEntry, _entries - 1)
  }

  if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
    return (-1, 0)

  // 否则搜索冷区域
  binarySearch(0, firstHotEntry)
}

// 获取第n个索引项
// 由于具体的索引类型不同,交给子类实现
protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry

索引文件大小调整

我们发现AbastractIndex还有一个比较长的resize方法,这个方法在kafka源码阅读(2)中其实有涉及到,即在日志初始化时的加载日志段中会被调用,目的是将当前索引文件大小预分配到最大大小以便提高新增索引项时的效率。另外当日志段关闭的时候也会将索引用resize对齐到当前的索引项个数,以节省磁盘空间。下面看一下具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def resize(newSize: Int): Boolean = {
  inLock(lock) {
    val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)

    if (_length == roundedNewSize) {
      debug(s"Index ${file.getAbsolutePath} was not resized because it already has size $roundedNewSize")
      false
    } else {
      // 使文件可以随机访问
      val raf = new RandomAccessFile(file, "rw")
      try {
        val position = mmap.position()

        // Windows不允许文件在被mmap的时候改变大小,需要先取消映射
        if (OperatingSystem.IS_WINDOWS)
          safeForceUnmap()
        // 改变文件大小
        raf.setLength(roundedNewSize)
        _length = roundedNewSize
        // 重新mmap到新的大小
        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
        _maxEntries = mmap.limit() / entrySize
        mmap.position(position)
        debug(s"Resized ${file.getAbsolutePath} to $roundedNewSize, position is ${mmap.position()} " +
          s"and limit is ${mmap.limit()}")
        true
      } finally {
        CoreUtils.swallow(raf.close(), AbstractIndex)
      }
    }
  }
}

至于为什么改变文件大小后要重新创建mmap对象,如之前所说的,可以翻阅《APUE 中文第三版》一书里 14.8节的存储映射 I/O,或者参考我的 APUE 阅读笔记。

最后,索引基类的核心内容基本就到这里,我们下面接着分析各个索引子类。

偏移量索引OffsetIndex

本系列博客的第一篇就介绍过,偏移量索引是为了加速查找某个偏移量对应的消息。

类构造方法就不多说了,与AbstractIndex一样。我们先过一下之前遗留的一些问题,首先是索引项大小entrySize方法:

1
2
// 8 = 4 (相对偏移量大小) + 4 (文件物理位置)
override def entrySize = 8

之所以存储的是相对偏移量,是因为完整的偏移量是 8 字节的长整型,相比只需要占据 4 字节的相对偏移量更加浪费存储空间。我们已经知道了基础偏移量 baseOffset,在获取完整偏移量的时候只需要加上这个 baseOffset 就行。

然后是获取第 n 个索引项的parseEntry方法,也比较简单:

1
2
3
4
5
6
7
8
9
override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {
  OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}

// 读取第n个索引项的key(偏移量)
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)

// 读取第n个索引项的value(物理位置)
private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)

再来看下新增索引项的方法append

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def append(offset: Long, position: Int): Unit = {
  inLock(lock) {
    require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
    if (_entries == 0 || offset > _lastOffset) {
      // 向文件尾增加一个索引项
      mmap.putInt(relativeOffset(offset))
      mmap.putInt(position)
      _entries += 1
      _lastOffset = offset
      // 校验当前读写位置必须是索引项的整数倍
      require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
    } else {
      // 索引项的key必须是单调递增的,否则直接抛异常
      throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
        s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
    }
  }
}

最后,对于查找索引项,OffsetIndex定义了lookup方法,用于查找给定偏移量的 lower bound,另外还有一个fetchUpperBoundOffset方法用于查找给定偏移量的 upper bound。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def lookup(targetOffset: Long): OffsetPosition = {
  maybeLock(lock) {
    // 这里的duplicate相当于复制了一份mmap的position、limit和mark,而mmap的内容是共享的
    // 这是为了不影响mmap的写入,当lookup结束之后,原来的mmap的这几个内部变量都没发生改变
    val idx = mmap.duplicate
    val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
    if(slot == -1)
      OffsetPosition(baseOffset, 0) // 注意这里当找不到lower bound时的返回值
    else
      parseEntry(idx, slot)
  }
}

def fetchUpperBoundOffset(fetchOffset: OffsetPosition, fetchSize: Int): Option[OffsetPosition] = {
  maybeLock(lock) {
    val idx = mmap.duplicate
    val slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + fetchSize, IndexSearchType.VALUE)
    if (slot == -1)
      None // 注意这里当找不到upper bound时的返回值
    else
      Some(parseEntry(idx, slot))
  }
}

乍一看这俩方法不是差不多吗,为什么方法名差这么多?而且它们的返回值一个是OffsetPosition另一个却是Option[OffsetPosition]。原因是这两个方法在功能上定位不太一样:

  • lookup:一般是为了确定读取日志的起始位置(下界),比如在kafka源码阅读(1)中提到,日志段的read方法调用的translateOffset方法时,其内部就使用了 lookup,如果没找到指定偏移量的 lower bound,有可能是该偏移量小于索引的基础偏移量,此时 lookup 直接返回这个基础偏移量,最终也可以。
  • fetchUpperBoundOffset:一般是为了确定读取日志的上界,此时如果找不到上界的话,就不能像 lookup 那样返回一个默认值,需要告诉调用者找不到上界,避免调用者读取日志越界。

时间戳索引TimeIndex

时间戳索引是为了根据时间戳快速查找消息的偏移量,通过时间戳索引,kafka 就可以高效地从用户指定的某个时间点开始消费、按照日志保留策略清理某个时间点之前的消息等。

至于TimeIndex的源码部分没什么好说的,比较简单,看懂了OffsetIndex就能看懂这个,只是索引项大小变成了 12,包括作为 key 的 8 字节时间戳,以及作为 value 的 4 字节相对偏移量。

懒加载索引LazyIndex

懒加载索引是为了减少 kafka 启动时的内存映射文件数量,加快服务启动速度。因此,懒加载索引并不是一个新的索引类型,只是用来懒加载AbstractIndex的一个包装类。源码就不放上来了,明白思路后,代码一眼懂。LazyIndex 的做法是先保存创建索引对象的那几个参数(file、baseOffset…),等到调用 get 方法的时候,再用这些参数去创建相应的索引对象,比较简单。

总结

本篇开篇特别介绍了 kafka 对索引项查找的相关优化,主要是通过冷热分区,让多数的访问集中在索引文件的后几页,充分适应操作系统 page cache 的 LRU 淘汰算法,减少了缺页 I/O 带来的运行效率下降问题。随后介绍了偏移量索引以及时间戳索引的实现,最后提了一嘴用于懒加载索引的 LazyIndex。关于 kafka 的核心内容之一,日志以及索引的源码分析就先告一段落,接下来将进入 kafka 的请求处理相关源码阅读以及分析。

最后,写到此处,我发现 kafka 将日志组织成日志段,还有一个好处在于,日志段与索引文件共同组成了一种二级索引:当定位某个偏移量对应的消息时,先通过 NavigableMap 定位到某个日志段,再在通过日志段的索引文件定位到数据文件上的某个位置,即日志段充当了一级索引,索引文件充当二级索引。

参考

极客时间《Kafka核心源码解读》——胡夕

本文由作者按照 CC BY 4.0 进行授权