文章

kafka源码阅读(5)-请求与响应

kafka源码阅读(5)-请求与响应

基于开源 kafka 2.5 版本。

如无特殊说明,文中代码片段将删除 debug 信息、异常触发、英文注释等代码,以便观看核心代码。

本篇跟随胡夕大佬的步伐,本篇开启 kafka 客户端请求与响应的源码阅读,特别是针对请求队列源码进行分析。

请求

首先看到 RequestChannel.scala 文件,该文件包含了请求和响应类的定义,以及处理请求与响应的相关代码,这些代码均封装在RequestChannel这个类中,这个类我们之后会分析,首先看一下请求类的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sealed trait BaseRequest
// 用于通知请求处理器(RequestHandler)broker关闭事件,只在内部进行使用
case object ShutdownRequest extends BaseRequest

class Request(
  // 请求所属的处理线程编号
  val processor: Int,
  // 请求的参数(addr、header等)
  val context: RequestContext,
  // 请求对象被创建的时间,用于监控指标
  val startTimeNanos: Long,
  // 缓冲对象池,用于复用缓冲对象,减少gc压力
  memoryPool: MemoryPool,
  // 请求体
  @volatile private var buffer: ByteBuffer,
  // 管理监控指标
  metrics: RequestChannel.Metrics
) extends BaseRequest {
  // ...
}

首先看到第一个参数processor,这个参数指明了该请求由哪个 processor 线程进行处理。什么是 processor 线程?每个 processor 相当于线程池中的一个线程,线程的数量由 broker 的 num.network.threads 参数决定,processor 负责处理网络请求的接收和响应。因此请求的processor参数是为了便于追踪请求由哪个 processor 处理、监控每个 processor 的负载等。

参考

极客时间《Kafka核心源码解读》——胡夕

本文由作者按照 CC BY 4.0 进行授权