gRPC阅读(2)—— 客户端
启动客户端
客户端的启动也是三部曲:
- 初始化grpc.ClientConn
- 创建service对应的Client(比如codegen生成的GreeterClient)
- 发起rpc调用
第二步比较简单,只是把ClientConn作为GreeterClient的成员变量,重点分析建立连接和RPC调用
初始化ClientConn
初始化ClientConn做了很多准备工作,包括但不限于:
- 应用选项(DialOption)
- 构建拦截器调用链(Interceptor)
- 决定使用什么resolver(resolver.Builder)
- 检查传输层凭证,比如TLS(TransportCredentials)
- 解析自定义服务端配置(ServerConfig)
- …
但还有一些配置是在真正发起RPC调用的时候才会被设置和触发,比如重试限流器、RPC配置选择器、RPC负载均衡器等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
}
// 重试限流器
cc.retryThrottler.Store((*retryThrottler)(nil))
// 配置选择器,动态选择每个RPC的调用配置
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())
// options
...
// 确定使用哪个resolver(默认为dns)
if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
return nil, err
}
// 内部使用的全局perTarget options
for _, opt := range globalPerTargetDialOptions {
opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts)
}
// 初始化拦截器调用链
chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)
// 验证安全传输,如TLS
if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}
// 解析以json格式指定的配置
// 如负载均衡配置、per-RPC方法超时等
if cc.dopts.defaultServiceConfigRawJSON != nil {
scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
if scpr.Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
}
cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}
// keepalive对服务端探活
cc.mkp = cc.dopts.copts.KeepaliveParams
// 获取authority,作为请求头中的:authority字段
if err = cc.initAuthority(); err != nil {
return nil, err
}
// 注册channelz,用于监测grpc的运行
// 可通过http协议访问/grpc/channelz/v1查看grpc的状态
cc.channelzRegistration(target)
channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
// 连接状态管理器
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
// 负载均衡器,动态选择每个RPC的子通道
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
// stats
cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
// idle状态管理
cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
return cc, nil
}
这么一套下来可以看到,初始化ClientConn的时候并没有建立连接,所以猜测是在第一次发起RPC调用的时候才去尝试建立连接。还有一种验证方法是,把服务端关闭,尝试NewClient,是不会返回错误的。
发起RPC调用
从官方例子中的SayHello方法进入分析:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
// 调用RPC时指定的选项
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(HelloReply)
// 调用ClientConn.Invoke,传入相应的方法名
err := c.cc.Invoke(ctx, Greeter_SayHello_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
// 调用RPC时指定的选项
opts = combine(cc.dopts.callOptions, opts)
// 如果设置了拦截器,调用拦截器
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
// 直接调用invoke
return invoke(ctx, method, args, reply, cc, opts...)
}
默认情况下没有任何拦截器,我们直接分析invoke即可:
1
2
3
4
5
6
7
8
9
10
11
12
13
func invoke(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
// 创建stream
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
// 发送请求
if err := cs.SendMsg(req); err != nil {
return err
}
// 等待响应
return cs.RecvMsg(reply)
}
ok,从现在开始,下面的事情就开始变得复杂了,首先是创建stream,创建连接就是在这一步完成的,忽略其它细节,最终会来到newClientStreamWithParams这个函数中,在里面会创建连接(注意实际上不一定是“创建”,而是复用,但是第一次调用的时候肯定是建立),然后基于这个连接创建stream:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
...
// Pick the transport to use and create a new stream on the transport.
// Assign cs.attempt upon success.
op := func(a *csAttempt) error {
// 创建连接
if err := a.getTransport(); err != nil {
return err
}
// 创建stream
if err := a.newStream(); err != nil {
return err
}
cs.attempt = a
return nil
}
// 带有重试的创建stream
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
return nil, err
}
...
return cs, nil
}
创建好stream之后,下面就基于这个stream发送请求和接收响应,首先看看发送请求的数据流转:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (cs *clientStream) SendMsg(m any) (err error) {
...
// load hdr, payload, data
hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.cp, cs.comp, cs.cc.dopts.copts.BufferPool)
if err != nil {
return err
}
...
// 带有重试的发送数据
op := func(a *csAttempt) error {
return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
}
onSuccessCalled := false
err = cs.withRetry(op, func() {
cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
onSuccessCalled = true
})
...
return err
}
其中调用了csAttempt.sendMsg来发送数据:
1
2
3
4
5
6
7
8
9
func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
...
// 写数据
if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
...
}
...
}
其中使用的是ClientTransport来发送数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (t *http2Client) Write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *Options) error {
...
// 创建dataFrame
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
h: hdr,
reader: reader,
}
...
// 通知loopy发送数据
if err := t.controlBuf.put(df); err != nil {
_ = reader.Close()
return err
}
return nil
}
最终是通过loopy来将请求包含的数据发送出去。
再看下接收响应的数据流转:
1
2
3
4
5
6
7
func (cs *clientStream) RecvMsg(m any) error {
...
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
}
同样是通过csAttempt上的recvMsg方法接收数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp, false); err != nil {
...
}
}
func recv(p *parser, c baseCodec, s recvCompressor, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) error {
data, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor, isServer)
...
}
func recvAndDecompress(p *parser, s recvCompressor, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool,
) (out mem.BufferSlice, err error) {
pf, compressed, err := p.recvMsg(maxReceiveMessageSize)
...
}
...
一路点进去各个接收数据的关键方法/函数,可以发现数据是由其它reader线程读出并解码成帧,并封装为recvMsg送入channel中,当从stream试图接收数据的时候,实际上是从channel中读取这些帧,然后进行处理。
总结
这篇写得总感觉头重脚轻,但是没办法,越写越发现客户端还真的挺复杂的,负载均衡、动态配置、连接懒建立、重试机制…这一套下来,限于篇幅不打算再在此赘述。
所以这篇的核心目的是,简单梳理发起一次RPC调用的数据流转,并且大概了解负载均衡…这套东西很多是在RPC调用的时候才会触发的即可。