字节RPC框架kitex源码阅读(二)
基于kitex@v0.11.3
开篇
在上篇字节RPC框架kitex源码阅读(一)中,简单过了一遍从创建服务、监听端口、建立连接&派发、退出清理的流程,对于代码生成的回调如何在kitex内部得到调用也有了初步的认知。
这篇是(一)的续篇,深入分析remote.Server
如何基于与客户端建立的连接做交互,包括传输、解码、编码等。
remote.ServerTransHandler
我们知道server.Server
主要构建调用链、调用用户定义的回调。与远程传输有关的remote.Server
提供了简单的几个接口方法给server.Server
使用,相当于server.Server
只需要关心调用链要怎么消费封装好的数据,不用管传输如何建立、数据如何封装:
1
2
3
4
5
6
// remote.Server
type Server interface {
Start() chan error
Stop() error
Address() net.Addr
}
而连接建立、数据收发、连接关闭等工作,remote.Server
又依托remote.TransServer
接口提供的这几个方法实现:
1
2
3
4
5
6
type TransServer interface {
CreateListener(net.Addr) (net.Listener, error)
BootstrapServer(net.Listener) (err error)
Shutdown() error
ConnCount() utils.AtomicInt
}
remote.TransServer
又依赖于remote.TransServerHandler
接口提供的这些方法,贯穿连接的整个生命周期所做的一些工作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type ServerTransHandler interface {
Write(ctx context.Context, conn net.Conn, send Message) (nctx context.Context, err error)
Read(ctx context.Context, conn net.Conn, msg Message) (nctx context.Context, err error)
// 连接关闭时调用
OnInactive(ctx context.Context, conn net.Conn)
// 处理RPC请求出错时调用
OnError(ctx context.Context, err error, conn net.Conn)
// 处理RPC请求完成后调用,调用调用链交给上层处理,包括业务endpoint
OnMessage(ctx context.Context, args, result Message) (context.Context, error)
SetPipeline(pipeline *TransPipeline)
// 连接新建时调用
OnActive(ctx context.Context, conn net.Conn) (context.Context, error)
// 有RPC请求时调用
OnRead(ctx context.Context, conn net.Conn) error
}
梳理完这几个接口之后,我们从server.Server.Run
开始往下分析,remote.Server
、remote.TransServer
、remote.ServerTransHandler
都在此时被创建出来:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (s *server) Run() (err error) {
...
// 创建ServerTransHandler,负责数据传输过程中的关键步骤处理
transHdlr, err := s.newSvrTransHandler()
if err != nil {
return err
}
s.Lock()
// TransServer在NewServer中被创建,并与ServerTransHandler绑定
s.svr, err = remotesvr.NewServer(s.opt.RemoteOpt, s.eps, transHdlr)
s.Unlock()
if err != nil {
return err
}
...
// Start开始异步监听
errCh := s.svr.Start()
...
}
其中我们要重点关注的是remote.ServerTransHandler
,因为它几乎贯穿整个连接的生命周期,包揽了与客户端的数据交互,看一下他是怎么被创建出来的,具体使用的是哪个实现类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (s *server) newSvrTransHandler() (handler remote.ServerTransHandler, err error) {
// 使用工厂模式创建
transHdlrFactory := s.opt.RemoteOpt.SvrHandlerFactory
transHdlr, err := transHdlrFactory.NewTransHandler(s.opt.RemoteOpt)
...
// 创建TransPipeline
transPl := remote.NewTransPipeline(transHdlr)
// 添加inBound和outBound中间件
for _, ib := range s.opt.RemoteOpt.Inbounds {
transPl.AddInboundHandler(ib)
}
for _, ob := range s.opt.RemoteOpt.Outbounds {
transPl.AddOutboundHandler(ob)
}
return transPl, nil
}
返回得到一个名为TransPipeline
的结构体,TransPipeline
是代理类,代理了真正从工厂创建出来的ServerTransHandler
对象,主要是为了调用inBound、outBound中间件。这里说的中间件不同于调用链上的中间件,是在流量入口和出口处做一些简单的工作,基本上不接触业务对象(req、reply)。
比如限流功能就是通过serverLimiterHandler中间件实现的(详见pkg/remote/bound/limiter_inbound.go)。
TransPipeline
包装的ServerTransHandler
是通过工厂生成的,工厂默认是:
1
2
3
4
5
6
func newServerRemoteOption() *remote.ServerOption {
return &remote.ServerOption{
SvrHandlerFactory: detection.NewSvrTransHandlerFactory(netpoll.NewSvrTransHandlerFactory(), nphttp2.NewSvrTransHandlerFactory()),
...
}
}
这个detection.svrTransHandlerFactory
工厂生产的依然不是具体实现,而是detection.svrTransHandler
。在说他的作用前我先提一嘴,我们用kitex创建服务端时并没有指定使用什么传输协议(ttheader、http2等),像官网介绍的那样,kitex一个服务端同时支持所有传输协议:
回到代码,detection.svrTransHandler
的作用就是检测连接上发送过来的数据匹配的是什么协议,从而选择使用对应的实现。比如默认的两个具体工厂分别生产的trans.svrTransHandler
和nphttp2.svrTransHandler
就分别对应着TTHeader协议和http2协议,后者可以通过在客户端创建时使用client.WithTransportProtocol(transport.GRPC)
指定。
上述整个处理流程可以表示成下图,左边大箭头代表不同的连接,并且选择不同的传输协议与服务端交互,经过协议检测,最后选择对应的ServerTransHandler
实现来处理数据:
下面就以TTHeader协议为例,同样按照我们熟悉的gonet实现的ServerTransHandler
进行分析,与图中绿色的netpoll
可视为功能平替。自己在本地调试运行gonet实现的时候,需要加上这两个服务端选项:
1
2
server.WithTransServerFactory(gonet.NewTransServerFactory())
server.WithTransHandlerFactory(gonet.NewSvrTransHandlerFactory())
这样一来,相当于不需要detection.svrTransHandler
根据协议选择工厂,工厂被直接指定为gonet.NewSvrTransHandlerFactory()
了。
另外,分别打开gonet和netpoll下的server_handler.go
,可以发现并不像nphttp2和detection那样有定义自己svrTransHandler
实现类,而是复用了trans/default_server_handler.go
中定义的svrTransHandler
:
1
2
3
4
// netpoll
func newSvrTransHandler(opt *remote.ServerOption) (remote.ServerTransHandler, error) {
return trans.NewDefaultSvrTransHandler(opt, NewNetpollConnExtension())
}
1
2
3
4
// gonet
func newSvrTransHandler(opt *remote.ServerOption) (remote.ServerTransHandler, error) {
return trans.NewDefaultSvrTransHandler(opt, NewGonetExtension())
}
因为gonet
和netpoll
的ServerTransHandler
大部分处理逻辑相同,处理的都是TTHeader协议。有差异的部分以插件的方式实现。default_server_handler.go
中定义的svrTransHandler
如下:
1
2
3
4
5
6
7
8
9
type svrTransHandler struct {
opt *remote.ServerOption
svcSearcher remote.ServiceSearcher
targetSvcInfo *serviceinfo.ServiceInfo
inkHdlFunc endpoint.Endpoint
codec remote.Codec
transPipe *remote.TransPipeline
ext Extension
}
因为ServerTransHandler
是被TransServer
直接来调用以处理连接生命周期的工作的,所以我们回到BootstrapServer
这个函数,从连接建立到派发的地方开始分析:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (ts *transServer) BootstrapServer(ln net.Listener) (err error) {
if ln == nil {
return errors.New("listener is nil in gonet transport server")
}
ts.ln = ln
for {
// 建立连接
conn, err := ts.ln.Accept()
if err != nil {
klog.Errorf("KITEX: BootstrapServer accept failed, err=%s", err.Error())
os.Exit(1)
}
go func() {
var (
ctx = context.Background()
err error
)
defer func() {
transRecover(ctx, conn, "OnRead")
}()
// 封装该连接,借助零拷贝reader的能力提升读性能
bc := newBufioConn(conn)
// OnActive
ctx, err = ts.transHdlr.OnActive(ctx, bc)
if err != nil {
klog.CtxErrorf(ctx, "KITEX: OnActive error=%s", err)
return
}
for {
ts.refreshDeadline(rpcinfo.GetRPCInfo(ctx), bc)
// OnRead
err := ts.transHdlr.OnRead(ctx, bc)
if err != nil {
// OnError
ts.onError(ctx, err, bc)
_ = bc.Close()
return
}
}
}()
}
}
我们顺着代码,先去OnActive
里都做了些什么(忽略TransPipeline
和detection
,直接分析实现类的OnActive
方法):
1
2
3
4
5
6
7
8
9
10
11
12
13
func (t *svrTransHandler) OnActive(ctx context.Context, conn net.Conn) (context.Context, error) {
// 从缓存池拿一个rpcinfo对象,用来保存RPC调用信息
ri := t.opt.InitOrResetRPCInfoFunc(nil, conn.RemoteAddr())
// 将对象保存到context当中,作为该连接的根context
return rpcinfo.NewCtxWithRPCInfo(ctx, ri), nil
}
func NewCtxWithRPCInfo(ctx context.Context, ri RPCInfo) context.Context {
if ri != nil {
return context.WithValue(ctx, ctxRPCInfoKey, ri)
}
return ctx
}
OnActive
比较简单,初始化一下该连接的rpcinfo对象和context。
接着是OnRead
,看着比较长,但基本逻辑就是「读请求 -> 解析请求 -> 处理请求 -> 写响应」:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// The connection should be closed after returning error.
func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error) {
ctx, ri := t.newCtxWithRPCInfo(ctx, conn)
t.ext.SetReadTimeout(ctx, conn, ri.Config(), remote.Server)
var recvMsg remote.Message
var sendMsg remote.Message
closeConnOutsideIfErr := true
defer func() {
panicErr := recover()
var wrapErr error
// 处理panic,打印日志
...
// 结束该RPC调用相关的tracer、profiler
t.finishTracer(ctx, ri, err, panicErr)
t.finishProfiler(ctx)
// 回收一些缓存池的资源
remote.RecycleMessage(recvMsg)
remote.RecycleMessage(sendMsg)
if rpcinfo.PoolEnabled() {
t.opt.InitOrResetRPCInfoFunc(ri, conn.RemoteAddr())
}
if wrapErr != nil {
err = wrapErr
}
if err != nil && !closeConnOutsideIfErr {
err = nil
}
}()
// 开启RPC调用相关的tracer、profiler
ctx = t.startTracer(ctx, ri)
ctx = t.startProfiler(ctx)
// 创建请求数据的载体
recvMsg = remote.NewMessageWithNewer(t.targetSvcInfo, t.svcSearcher, ri, remote.Call, remote.Server)
recvMsg.SetPayloadCodec(t.opt.PayloadCodec)
// Read读请求数据
ctx, err = t.transPipe.Read(ctx, conn, recvMsg)
if err != nil {
t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
return err
}
svcInfo := recvMsg.ServiceInfo()
...
// 创建响应数据的载体
sendMsg = remote.NewMessage(methodInfo.NewResult(), svcInfo, ri, remote.Reply, remote.Server)
// OnMessage通知上层处理
ctx, err = t.transPipe.OnMessage(ctx, recvMsg, sendMsg)
if err != nil {
// OnError
t.OnError(ctx, err, conn)
err = remote.NewTransError(remote.InternalError, err)
// 有必要的话,将错误信息响应给客户端
if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false); closeConn {
return err
}
// connection don't need to be closed when the error is return by the server handler
closeConnOutsideIfErr = false
return
}
remote.FillSendMsgFromRecvMsg(recvMsg, sendMsg)
// Write写响应数据
if ctx, err = t.transPipe.Write(ctx, conn, sendMsg); err != nil {
return err
}
return
}
分析具体代码之前,我们注意由OnMessage
返回的错误(比如调用链返回了错误),在trans中视为业务逻辑错误,而不是传输错误,因此不会将这个错误返回,这个函数只会返回数据传输相关错误,让TransServer
感知到传输错误后断开这个连接。
下面我们挑OnRead
中几个关键点函数进行分析。首先是负责读数据和解码数据的的Read
方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (t *svrTransHandler) Read(ctx context.Context, conn net.Conn, recvMsg remote.Message) (nctx context.Context, err error) {
var bufReader remote.ByteBuffer
defer func() {
// 释放buffer reader
t.ext.ReleaseBuffer(bufReader, err)
rpcinfo.Record(ctx, recvMsg.RPCInfo(), stats.ReadFinish, err)
}()
rpcinfo.Record(ctx, recvMsg.RPCInfo(), stats.ReadStart, nil)
// 创建buffer reader
bufReader = t.ext.NewReadByteBuffer(ctx, conn, recvMsg)
// 数据读入并解码
if codec, ok := t.codec.(remote.MetaDecoder); ok {
if err = codec.DecodeMeta(ctx, recvMsg, bufReader); err == nil {
...
err = codec.DecodePayload(ctx, recvMsg, bufReader)
}
} else {
err = t.codec.Decode(ctx, recvMsg, bufReader)
}
if err != nil {
recvMsg.Tags()[remote.ReadFailed] = true
return ctx, err
}
return ctx, nil
}
继续看负责通知消息到达并进行进一步处理的OnMessage
方法,这个方法就更简单了,就是把请求数据过一遍调用链,并将调用链返回的业务结果保存到响应数据中并返回。
1
2
3
4
func (t *svrTransHandler) OnMessage(ctx context.Context, args, result remote.Message) (context.Context, error) {
err := t.inkHdlFunc(ctx, args.Data(), result.Data())
return ctx, err
}
最后的写响应方法Write
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (t *svrTransHandler) Write(ctx context.Context, conn net.Conn, sendMsg remote.Message) (nctx context.Context, err error) {
var bufWriter remote.ByteBuffer
ri := sendMsg.RPCInfo()
rpcinfo.Record(ctx, ri, stats.WriteStart, nil)
defer func() {
// 释放buffer writer
t.ext.ReleaseBuffer(bufWriter, err)
rpcinfo.Record(ctx, ri, stats.WriteFinish, err)
}()
...
// 创建buffer writer
bufWriter = t.ext.NewWriteByteBuffer(ctx, conn, sendMsg)
// 数据编码并写入连接中
err = t.codec.Encode(ctx, sendMsg, bufWriter)
if err != nil {
return ctx, err
}
// 将剩余缓存全部写入连接中(类比刷盘)
return ctx, bufWriter.Flush()
}
另外还有没提到的OnError
方法,这个方法无论是业务错误还是传输错误都会被调用,主要是打印一些日志、当因为错误而发生连接关闭时设置下rpcinfo的信息(这个状态可能还以metadata的形式发回给客户端,或者用于stat等,没具体考究过)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (t *svrTransHandler) OnError(ctx context.Context, err error, conn net.Conn) {
ri := rpcinfo.GetRPCInfo(ctx)
rService, rAddr := getRemoteInfo(ri, conn)
if t.ext.IsRemoteClosedErr(err) {
// it should not regard error which cause by remote connection closed as server error
if ri == nil {
return
}
remote := rpcinfo.AsMutableEndpointInfo(ri.From())
remote.SetTag(rpcinfo.RemoteClosedTag, "1")
} else {
var de *kerrors.DetailedError
if ok := errors.As(err, &de); ok && de.Stack() != "" {
klog.CtxErrorf(ctx, "KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s\nstack=%s", rService, rAddr, err.Error(), de.Stack())
} else {
klog.CtxErrorf(ctx, "KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s", rService, rAddr, err.Error())
}
}
}
结尾
本篇与前篇从源码的视角简要分析了整个kitex服务端从创建、运行、建立连接、接收请求、处理请求、响应、结束的大致运行流程。通过以小见大的方式,文章是基于经典的go网络标准库来进行分析的,期间顺便弄清了一些接口的功能和他们之间的依赖关系,为更加深入地对kitex中其他各个组件的探讨提供了主干、基础。