文章

字节RPC框架kitex源码阅读(二)

字节RPC框架kitex源码阅读(二)

基于kitex@v0.11.3

开篇

在上篇字节RPC框架kitex源码阅读(一)中,简单过了一遍从创建服务、监听端口、建立连接&派发、退出清理的流程,对于代码生成的回调如何在kitex内部得到调用也有了初步的认知。

这篇是(一)的续篇,深入分析remote.Server如何基于与客户端建立的连接做交互,包括传输、解码、编码等。

remote.ServerTransHandler

我们知道server.Server主要构建调用链、调用用户定义的回调。与远程传输有关的remote.Server提供了简单的几个接口方法给server.Server使用,相当于server.Server只需要关心调用链要怎么消费封装好的数据,不用管传输如何建立、数据如何封装:

1
2
3
4
5
6
// remote.Server
type Server interface {
	Start() chan error
	Stop() error
	Address() net.Addr
}

而连接建立、数据收发、连接关闭等工作,remote.Server又依托remote.TransServer接口提供的这几个方法实现:

1
2
3
4
5
6
type TransServer interface {
	CreateListener(net.Addr) (net.Listener, error)
	BootstrapServer(net.Listener) (err error)
	Shutdown() error
	ConnCount() utils.AtomicInt
}

remote.TransServer又依赖于remote.TransServerHandler接口提供的这些方法,贯穿连接的整个生命周期所做的一些工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type ServerTransHandler interface {
	Write(ctx context.Context, conn net.Conn, send Message) (nctx context.Context, err error)
	Read(ctx context.Context, conn net.Conn, msg Message) (nctx context.Context, err error)
    
    // 连接关闭时调用
	OnInactive(ctx context.Context, conn net.Conn)
    // 处理RPC请求出错时调用
	OnError(ctx context.Context, err error, conn net.Conn)
    // 处理RPC请求完成后调用,调用调用链交给上层处理,包括业务endpoint
	OnMessage(ctx context.Context, args, result Message) (context.Context, error)
	SetPipeline(pipeline *TransPipeline)
    
    // 连接新建时调用
	OnActive(ctx context.Context, conn net.Conn) (context.Context, error)
    // 有RPC请求时调用
	OnRead(ctx context.Context, conn net.Conn) error
}

梳理完这几个接口之后,我们从server.Server.Run开始往下分析,remote.Serverremote.TransServerremote.ServerTransHandler都在此时被创建出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (s *server) Run() (err error) {
	...
    
    // 创建ServerTransHandler,负责数据传输过程中的关键步骤处理
	transHdlr, err := s.newSvrTransHandler()
	if err != nil {
		return err
	}
	s.Lock()
    // TransServer在NewServer中被创建,并与ServerTransHandler绑定
	s.svr, err = remotesvr.NewServer(s.opt.RemoteOpt, s.eps, transHdlr)
	s.Unlock()
	if err != nil {
		return err
	}

	...

    // Start开始异步监听
	errCh := s.svr.Start()
    
	...
}

其中我们要重点关注的是remote.ServerTransHandler,因为它几乎贯穿整个连接的生命周期,包揽了与客户端的数据交互,看一下他是怎么被创建出来的,具体使用的是哪个实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (s *server) newSvrTransHandler() (handler remote.ServerTransHandler, err error) {
    // 使用工厂模式创建
	transHdlrFactory := s.opt.RemoteOpt.SvrHandlerFactory
	transHdlr, err := transHdlrFactory.NewTransHandler(s.opt.RemoteOpt)
	...
    
    // 创建TransPipeline
	transPl := remote.NewTransPipeline(transHdlr)

    // 添加inBound和outBound中间件
	for _, ib := range s.opt.RemoteOpt.Inbounds {
		transPl.AddInboundHandler(ib)
	}
	for _, ob := range s.opt.RemoteOpt.Outbounds {
		transPl.AddOutboundHandler(ob)
	}
    
	return transPl, nil
}

返回得到一个名为TransPipeline的结构体,TransPipeline是代理类,代理了真正从工厂创建出来的ServerTransHandler对象,主要是为了调用inBound、outBound中间件。这里说的中间件不同于调用链上的中间件,是在流量入口和出口处做一些简单的工作,基本上不接触业务对象(req、reply)。

image-20241023183320421

比如限流功能就是通过serverLimiterHandler中间件实现的(详见pkg/remote/bound/limiter_inbound.go)。

TransPipeline包装的ServerTransHandler是通过工厂生成的,工厂默认是:

1
2
3
4
5
6
func newServerRemoteOption() *remote.ServerOption {
	return &remote.ServerOption{
		SvrHandlerFactory:     detection.NewSvrTransHandlerFactory(netpoll.NewSvrTransHandlerFactory(), nphttp2.NewSvrTransHandlerFactory()),
		...
	}
}

这个detection.svrTransHandlerFactory工厂生产的依然不是具体实现,而是detection.svrTransHandler。在说他的作用前我先提一嘴,我们用kitex创建服务端时并没有指定使用什么传输协议(ttheader、http2等),像官网介绍的那样,kitex一个服务端同时支持所有传输协议:

回到代码,detection.svrTransHandler的作用就是检测连接上发送过来的数据匹配的是什么协议,从而选择使用对应的实现。比如默认的两个具体工厂分别生产的trans.svrTransHandlernphttp2.svrTransHandler就分别对应着TTHeader协议和http2协议,后者可以通过在客户端创建时使用client.WithTransportProtocol(transport.GRPC)指定。

上述整个处理流程可以表示成下图,左边大箭头代表不同的连接,并且选择不同的传输协议与服务端交互,经过协议检测,最后选择对应的ServerTransHandler实现来处理数据:

image-20241023191425835

下面就以TTHeader协议为例,同样按照我们熟悉的gonet实现的ServerTransHandler进行分析,与图中绿色的netpoll可视为功能平替。自己在本地调试运行gonet实现的时候,需要加上这两个服务端选项:

1
2
server.WithTransServerFactory(gonet.NewTransServerFactory())
server.WithTransHandlerFactory(gonet.NewSvrTransHandlerFactory())

这样一来,相当于不需要detection.svrTransHandler根据协议选择工厂,工厂被直接指定为gonet.NewSvrTransHandlerFactory()了。

另外,分别打开gonet和netpoll下的server_handler.go,可以发现并不像nphttp2和detection那样有定义自己svrTransHandler实现类,而是复用了trans/default_server_handler.go中定义的svrTransHandler

1
2
3
4
// netpoll
func newSvrTransHandler(opt *remote.ServerOption) (remote.ServerTransHandler, error) {
	return trans.NewDefaultSvrTransHandler(opt, NewNetpollConnExtension())
}
1
2
3
4
// gonet
func newSvrTransHandler(opt *remote.ServerOption) (remote.ServerTransHandler, error) {
	return trans.NewDefaultSvrTransHandler(opt, NewGonetExtension())
}

因为gonetnetpollServerTransHandler大部分处理逻辑相同,处理的都是TTHeader协议。有差异的部分以插件的方式实现。default_server_handler.go中定义的svrTransHandler如下:

1
2
3
4
5
6
7
8
9
type svrTransHandler struct {
	opt           *remote.ServerOption
	svcSearcher   remote.ServiceSearcher
	targetSvcInfo *serviceinfo.ServiceInfo
	inkHdlFunc    endpoint.Endpoint
	codec         remote.Codec
	transPipe     *remote.TransPipeline
	ext           Extension
}

因为ServerTransHandler是被TransServer直接来调用以处理连接生命周期的工作的,所以我们回到BootstrapServer这个函数,从连接建立到派发的地方开始分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (ts *transServer) BootstrapServer(ln net.Listener) (err error) {
	if ln == nil {
		return errors.New("listener is nil in gonet transport server")
	}
	ts.ln = ln
	for {
        // 建立连接
		conn, err := ts.ln.Accept()
		if err != nil {
			klog.Errorf("KITEX: BootstrapServer accept failed, err=%s", err.Error())
			os.Exit(1)
		}
		go func() {
			var (
				ctx = context.Background()
				err error
			)
			defer func() {
				transRecover(ctx, conn, "OnRead")
			}()
            // 封装该连接,借助零拷贝reader的能力提升读性能
			bc := newBufioConn(conn)
            
            // OnActive
			ctx, err = ts.transHdlr.OnActive(ctx, bc)
			if err != nil {
				klog.CtxErrorf(ctx, "KITEX: OnActive error=%s", err)
				return
			}
			for {
				ts.refreshDeadline(rpcinfo.GetRPCInfo(ctx), bc)
                // OnRead
				err := ts.transHdlr.OnRead(ctx, bc)
				if err != nil {
                    // OnError
					ts.onError(ctx, err, bc)
					_ = bc.Close()
					return
				}
			}
		}()
	}
}

我们顺着代码,先去OnActive里都做了些什么(忽略TransPipelinedetection,直接分析实现类的OnActive方法):

1
2
3
4
5
6
7
8
9
10
11
12
13
func (t *svrTransHandler) OnActive(ctx context.Context, conn net.Conn) (context.Context, error) {
	// 从缓存池拿一个rpcinfo对象,用来保存RPC调用信息
	ri := t.opt.InitOrResetRPCInfoFunc(nil, conn.RemoteAddr())
    // 将对象保存到context当中,作为该连接的根context
	return rpcinfo.NewCtxWithRPCInfo(ctx, ri), nil
}

func NewCtxWithRPCInfo(ctx context.Context, ri RPCInfo) context.Context {
	if ri != nil {
		return context.WithValue(ctx, ctxRPCInfoKey, ri)
	}
	return ctx
}

OnActive比较简单,初始化一下该连接的rpcinfo对象和context。

接着是OnRead,看着比较长,但基本逻辑就是「读请求 -> 解析请求 -> 处理请求 -> 写响应」:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// The connection should be closed after returning error.
func (t *svrTransHandler) OnRead(ctx context.Context, conn net.Conn) (err error) {
	ctx, ri := t.newCtxWithRPCInfo(ctx, conn)
	t.ext.SetReadTimeout(ctx, conn, ri.Config(), remote.Server)
	var recvMsg remote.Message
	var sendMsg remote.Message
	closeConnOutsideIfErr := true
	defer func() {
		panicErr := recover()
		var wrapErr error
        // 处理panic,打印日志
		...
        
        // 结束该RPC调用相关的tracer、profiler
		t.finishTracer(ctx, ri, err, panicErr)
		t.finishProfiler(ctx)
        // 回收一些缓存池的资源
		remote.RecycleMessage(recvMsg)
		remote.RecycleMessage(sendMsg)
		if rpcinfo.PoolEnabled() {
			t.opt.InitOrResetRPCInfoFunc(ri, conn.RemoteAddr())
		}
		if wrapErr != nil {
			err = wrapErr
		}
		if err != nil && !closeConnOutsideIfErr {
			err = nil
		}
	}()
    
    // 开启RPC调用相关的tracer、profiler
	ctx = t.startTracer(ctx, ri)
	ctx = t.startProfiler(ctx)
    // 创建请求数据的载体
	recvMsg = remote.NewMessageWithNewer(t.targetSvcInfo, t.svcSearcher, ri, remote.Call, remote.Server)
	recvMsg.SetPayloadCodec(t.opt.PayloadCodec)
    
    // Read读请求数据
	ctx, err = t.transPipe.Read(ctx, conn, recvMsg)
	if err != nil {
		t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, true)
		return err
	}

	svcInfo := recvMsg.ServiceInfo()
    
    ...
    
    // 创建响应数据的载体
    sendMsg = remote.NewMessage(methodInfo.NewResult(), svcInfo, ri, remote.Reply, remote.Server)
    // OnMessage通知上层处理
    ctx, err = t.transPipe.OnMessage(ctx, recvMsg, sendMsg)
    if err != nil {
        // OnError
        t.OnError(ctx, err, conn)
        err = remote.NewTransError(remote.InternalError, err)
        // 有必要的话,将错误信息响应给客户端
        if closeConn := t.writeErrorReplyIfNeeded(ctx, recvMsg, conn, err, ri, false); closeConn {
            return err
        }
        // connection don't need to be closed when the error is return by the server handler
        closeConnOutsideIfErr = false
        return
    }

	remote.FillSendMsgFromRecvMsg(recvMsg, sendMsg)
    // Write写响应数据
	if ctx, err = t.transPipe.Write(ctx, conn, sendMsg); err != nil {
		return err
	}
	return
}

分析具体代码之前,我们注意由OnMessage返回的错误(比如调用链返回了错误),在trans中视为业务逻辑错误,而不是传输错误,因此不会将这个错误返回,这个函数只会返回数据传输相关错误,让TransServer感知到传输错误后断开这个连接。

下面我们挑OnRead中几个关键点函数进行分析。首先是负责读数据和解码数据的的Read方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (t *svrTransHandler) Read(ctx context.Context, conn net.Conn, recvMsg remote.Message) (nctx context.Context, err error) {
	var bufReader remote.ByteBuffer
	defer func() {
        // 释放buffer reader
		t.ext.ReleaseBuffer(bufReader, err)
		rpcinfo.Record(ctx, recvMsg.RPCInfo(), stats.ReadFinish, err)
	}()
	rpcinfo.Record(ctx, recvMsg.RPCInfo(), stats.ReadStart, nil)

    // 创建buffer reader
	bufReader = t.ext.NewReadByteBuffer(ctx, conn, recvMsg)
    
    // 数据读入并解码
	if codec, ok := t.codec.(remote.MetaDecoder); ok {
		if err = codec.DecodeMeta(ctx, recvMsg, bufReader); err == nil {
			...
			err = codec.DecodePayload(ctx, recvMsg, bufReader)
		}
	} else {
		err = t.codec.Decode(ctx, recvMsg, bufReader)
	}
    
	if err != nil {
		recvMsg.Tags()[remote.ReadFailed] = true
		return ctx, err
	}
	return ctx, nil
}

继续看负责通知消息到达并进行进一步处理的OnMessage方法,这个方法就更简单了,就是把请求数据过一遍调用链,并将调用链返回的业务结果保存到响应数据中并返回。

1
2
3
4
func (t *svrTransHandler) OnMessage(ctx context.Context, args, result remote.Message) (context.Context, error) {
	err := t.inkHdlFunc(ctx, args.Data(), result.Data())
	return ctx, err
}

最后的写响应方法Write

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (t *svrTransHandler) Write(ctx context.Context, conn net.Conn, sendMsg remote.Message) (nctx context.Context, err error) {
	var bufWriter remote.ByteBuffer
	ri := sendMsg.RPCInfo()
	rpcinfo.Record(ctx, ri, stats.WriteStart, nil)
	defer func() {
        // 释放buffer writer
		t.ext.ReleaseBuffer(bufWriter, err)
		rpcinfo.Record(ctx, ri, stats.WriteFinish, err)
	}()

    ...

    // 创建buffer writer
	bufWriter = t.ext.NewWriteByteBuffer(ctx, conn, sendMsg)
    
    // 数据编码并写入连接中
	err = t.codec.Encode(ctx, sendMsg, bufWriter)
	if err != nil {
		return ctx, err
	}
    // 将剩余缓存全部写入连接中(类比刷盘)
	return ctx, bufWriter.Flush()
}

另外还有没提到的OnError方法,这个方法无论是业务错误还是传输错误都会被调用,主要是打印一些日志、当因为错误而发生连接关闭时设置下rpcinfo的信息(这个状态可能还以metadata的形式发回给客户端,或者用于stat等,没具体考究过)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (t *svrTransHandler) OnError(ctx context.Context, err error, conn net.Conn) {
	ri := rpcinfo.GetRPCInfo(ctx)
	rService, rAddr := getRemoteInfo(ri, conn)
	if t.ext.IsRemoteClosedErr(err) {
		// it should not regard error which cause by remote connection closed as server error
		if ri == nil {
			return
		}
		remote := rpcinfo.AsMutableEndpointInfo(ri.From())
		remote.SetTag(rpcinfo.RemoteClosedTag, "1")
	} else {
		var de *kerrors.DetailedError
		if ok := errors.As(err, &de); ok && de.Stack() != "" {
			klog.CtxErrorf(ctx, "KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s\nstack=%s", rService, rAddr, err.Error(), de.Stack())
		} else {
			klog.CtxErrorf(ctx, "KITEX: processing request error, remoteService=%s, remoteAddr=%v, error=%s", rService, rAddr, err.Error())
		}
	}
}

结尾

本篇与前篇从源码的视角简要分析了整个kitex服务端从创建、运行、建立连接、接收请求、处理请求、响应、结束的大致运行流程。通过以小见大的方式,文章是基于经典的go网络标准库来进行分析的,期间顺便弄清了一些接口的功能和他们之间的依赖关系,为更加深入地对kitex中其他各个组件的探讨提供了主干、基础。

本文由作者按照 CC BY 4.0 进行授权