kafka源码阅读(5)-请求与响应
kafka源码阅读(5)-请求与响应
基于开源 kafka 2.5 版本。
如无特殊说明,文中代码片段将删除 debug 信息、异常触发、英文注释等代码,以便观看核心代码。
本篇跟随胡夕大佬的步伐,本篇开启 kafka 客户端请求与响应的源码阅读,特别是针对请求队列源码进行分析。
请求
首先看到 RequestChannel.scala 文件,该文件包含了请求和响应类的定义,以及处理请求与响应的相关代码,这些代码均封装在RequestChannel
这个类中,这个类我们之后会分析,首先看一下请求类的定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sealed trait BaseRequest
// 用于通知请求处理器(RequestHandler)broker关闭事件,只在内部进行使用
case object ShutdownRequest extends BaseRequest
class Request(
// 请求所属的处理线程编号
val processor: Int,
// 请求的参数(addr、header等)
val context: RequestContext,
// 请求对象被创建的时间,用于监控指标
val startTimeNanos: Long,
// 缓冲对象池,用于复用缓冲对象,减少gc压力
memoryPool: MemoryPool,
// 请求体
@volatile private var buffer: ByteBuffer,
// 管理监控指标
metrics: RequestChannel.Metrics
) extends BaseRequest {
// ...
}
首先看到第一个参数processor
,这个参数指明了该请求由哪个 processor 线程进行处理。什么是 processor 线程?每个 processor 相当于线程池中的一个线程,线程的数量由 broker 的 num.network.threads 参数决定,processor 负责处理网络请求的接收和响应。因此请求的processor
参数是为了便于追踪请求由哪个 processor 处理、监控每个 processor 的负载等。
参考
极客时间《Kafka核心源码解读》——胡夕
本文由作者按照 CC BY 4.0 进行授权