文章

gRPC阅读(2)—— 客户端

gRPC阅读(2)—— 客户端

启动客户端

客户端的启动也是三部曲:

  1. 初始化grpc.ClientConn
  2. 创建service对应的Client(比如codegen生成的GreeterClient)
  3. 发起rpc调用

第二步比较简单,只是把ClientConn作为GreeterClient的成员变量,重点分析建立连接和RPC调用

初始化ClientConn

初始化ClientConn做了很多准备工作,包括但不限于:

  • 应用选项(DialOption)
  • 构建拦截器调用链(Interceptor)
  • 决定使用什么resolver(resolver.Builder)
  • 检查传输层凭证,比如TLS(TransportCredentials)
  • 解析自定义服务端配置(ServerConfig)

但还有一些配置是在真正发起RPC调用的时候才会被设置和触发,比如重试限流器、RPC配置选择器、RPC负载均衡器等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target: target,
		conns:  make(map[*addrConn]struct{}),
		dopts:  defaultDialOptions(),
	}

    // 重试限流器
	cc.retryThrottler.Store((*retryThrottler)(nil))
    // 配置选择器,动态选择每个RPC的调用配置
	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

    // options
    ...

    // 确定使用哪个resolver(默认为dns)
	if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
		return nil, err
	}

    // 内部使用的全局perTarget options
	for _, opt := range globalPerTargetDialOptions {
		opt.DialOptionForTarget(cc.parsedTarget.URL).apply(&cc.dopts)
	}

    // 初始化拦截器调用链
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

    // 验证安全传输,如TLS
	if err := cc.validateTransportCredentials(); err != nil {
		return nil, err
	}

    // 解析以json格式指定的配置
    // 如负载均衡配置、per-RPC方法超时等
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
    
    // keepalive对服务端探活
	cc.mkp = cc.dopts.copts.KeepaliveParams

    // 获取authority,作为请求头中的:authority字段
	if err = cc.initAuthority(); err != nil {
		return nil, err
	}

	// 注册channelz,用于监测grpc的运行
    // 可通过http协议访问/grpc/channelz/v1查看grpc的状态
	cc.channelzRegistration(target)
	channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
	channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)

    // 连接状态管理器
	cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
    // 负载均衡器,动态选择每个RPC的子通道
	cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)

    // stats
	cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)

	cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
    
    // idle状态管理
	cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)

	return cc, nil
}

这么一套下来可以看到,初始化ClientConn的时候并没有建立连接,所以猜测是在第一次发起RPC调用的时候才去尝试建立连接。还有一种验证方法是,把服务端关闭,尝试NewClient,是不会返回错误的。

发起RPC调用

从官方例子中的SayHello方法进入分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
    // 调用RPC时指定的选项
	cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
	out := new(HelloReply)
    // 调用ClientConn.Invoke,传入相应的方法名
	err := c.cc.Invoke(ctx, Greeter_SayHello_FullMethodName, in, out, cOpts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
    // 调用RPC时指定的选项
	opts = combine(cc.dopts.callOptions, opts)

    // 如果设置了拦截器,调用拦截器
	if cc.dopts.unaryInt != nil {
		return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
	}
    
    // 直接调用invoke
	return invoke(ctx, method, args, reply, cc, opts...)
}

默认情况下没有任何拦截器,我们直接分析invoke即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
func invoke(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
    // 创建stream
	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
    // 发送请求
	if err := cs.SendMsg(req); err != nil {
		return err
	}
    // 等待响应
	return cs.RecvMsg(reply)
}

ok,从现在开始,下面的事情就开始变得复杂了,首先是创建stream,创建连接就是在这一步完成的,忽略其它细节,最终会来到newClientStreamWithParams这个函数中,在里面会创建连接(注意实际上不一定是“创建”,而是复用,但是第一次调用的时候肯定是建立),然后基于这个连接创建stream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
	...

	// Pick the transport to use and create a new stream on the transport.
	// Assign cs.attempt upon success.
	op := func(a *csAttempt) error {
        // 创建连接
		if err := a.getTransport(); err != nil {
			return err
		}
        // 创建stream
		if err := a.newStream(); err != nil {
			return err
		}
		cs.attempt = a
		return nil
	}
    // 带有重试的创建stream
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op, nil) }); err != nil {
		return nil, err
	}
    
    ...
    
    return cs, nil
}

创建好stream之后,下面就基于这个stream发送请求和接收响应,首先看看发送请求的数据流转:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (cs *clientStream) SendMsg(m any) (err error) {
	...

	// load hdr, payload, data
	hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.cp, cs.comp, cs.cc.dopts.copts.BufferPool)
	if err != nil {
		return err
	}

	...
    
    // 带有重试的发送数据
	op := func(a *csAttempt) error {
		return a.sendMsg(m, hdr, payload, dataLen, payloadLen)
	}
	onSuccessCalled := false
	err = cs.withRetry(op, func() {
		cs.bufferForRetryLocked(len(hdr)+payloadLen, op, payload.Free)
		onSuccessCalled = true
	})
    
	...
	return err
}

其中调用了csAttempt.sendMsg来发送数据:

1
2
3
4
5
6
7
8
9
func (a *csAttempt) sendMsg(m any, hdr []byte, payld mem.BufferSlice, dataLength, payloadLength int) error {
	...
    
    // 写数据
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		...
	}
	...
}

其中使用的是ClientTransport来发送数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (t *http2Client) Write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *Options) error {
	...
    
    // 创建dataFrame
	df := &dataFrame{
		streamID:  s.id,
		endStream: opts.Last,
		h:         hdr,
		reader:    reader,
	}
	...
    
    // 通知loopy发送数据
	if err := t.controlBuf.put(df); err != nil {
		_ = reader.Close()
		return err
	}
	return nil
}

最终是通过loopy来将请求包含的数据发送出去。

再看下接收响应的数据流转:

1
2
3
4
5
6
7
func (cs *clientStream) RecvMsg(m any) error {
	...
    
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
}

同样是通过csAttempt上的recvMsg方法接收数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
	if err := recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp, false); err != nil {
        ...
    }
}


func recv(p *parser, c baseCodec, s recvCompressor, dc Decompressor, m any, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) error {
	data, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor, isServer)
    
	...
}

func recvAndDecompress(p *parser, s recvCompressor, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool,
) (out mem.BufferSlice, err error) {
	pf, compressed, err := p.recvMsg(maxReceiveMessageSize)
    ...
}

...

一路点进去各个接收数据的关键方法/函数,可以发现数据是由其它reader线程读出并解码成帧,并封装为recvMsg送入channel中,当从stream试图接收数据的时候,实际上是从channel中读取这些帧,然后进行处理。

总结

这篇写得总感觉头重脚轻,但是没办法,越写越发现客户端还真的挺复杂的,负载均衡、动态配置、连接懒建立、重试机制…这一套下来,限于篇幅不打算再在此赘述。

所以这篇的核心目的是,简单梳理发起一次RPC调用的数据流转,并且大概了解负载均衡…这套东西很多是在RPC调用的时候才会触发的即可。

本文由作者按照 CC BY 4.0 进行授权