文章

gRPC阅读(3)—— 服务发现

gRPC阅读(3)—— 服务发现

服务发现概述

平时用浏览器上过网都知道,输入一个网址比如google.com就能访问内容,背后是DNS帮我们将google.com解析成IP地址,最终浏览器才能基于TCP协议,从本地连接到这个服务提供商的IP地址。所以DNS属于服务发现的其中一种方式。

所以服务发现提供的就是通过自动化的方式帮助服务在网络中找到彼此,无需手动配置。

一个好的服务发现需要:

  1. 服务地址动态变化:服务的 IP 或端口可能因为容器化或自动扩展而频繁改变。
  2. 高可用:需要在服务实例宕机时快速感知并移除不健康的实例。
  3. 负载均衡:服务发现需要为调用方提供负载均衡能力,选择最佳的服务实例。

服务发现通常与负载均衡同时实现,分为两种方式:

  1. 客户端服务发现(如eureka、consul):在客户端做负载均衡,选择一个实例进行调用,优点是避免集中式LB可能存在的瓶颈,性能较好,但是每个客户端需要维护服务端列表,服务端这部分的负载可能变高。并且更新LB或其他相关组件的策略时需要所有客户端都一起更新,管理不方便。并且需要多语言支持
  2. 代理服务发现(如k8s+coreDNS、nginx+consul):客户端将请求发送到负载均衡器(如 API 网关),由负载均衡器查询服务注册中心并将请求转发给目标服务实例。
  3. 独立LB进程:LB与消费者在同一个主机中,但分别作为不同的进程,避免了需要多语言支持,以及LB的更新不需要调用方改代码。

服务发现的核心组件有:注册中心服务提供者客户端(服务消费者)

服务发现的关键功能有:服务注册服务查询健康检查动态更新

gRPC服务发现

image

gRPC使用客户端服务发现,gRPC中称为名称解析(Name Resolution),默认情况下使用DNS-resolver。通过服务发现解析出IP列表后就通过LB组件进行负载均衡并建立连接。

下面基于target=localhost:50052这个服务端地址来进行分析,并且是默认的DNS作为resolver(不用官方例子的50051端口是因为被mac的launchd进程占用了)。

首先gRPC在创建cc(ClientConn)的时候,使用initParsedTargetAndResolverBuilder创建resolver.Builder。这一步决定的是采用什么服务发现机制,默认是DNS。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (cc *ClientConn) initParsedTargetAndResolverBuilder() error {
	logger.Infof("original dial target is: %q", cc.target)

    // 尝试直接解析target并获取相应的resolver.Builder
	var rb resolver.Builder
	parsedTarget, err := parseTarget(cc.target)
	if err == nil {
		rb = cc.getResolver(parsedTarget.URL.Scheme)
		if rb != nil {
			cc.parsedTarget = parsedTarget
			cc.resolverBuilder = rb
			return nil
		}
	}

    // target没有指定schema(比如我们的localhost:50052是没有指定schema的)或者无法匹配schema对应的resolver.Builder
    // 那么使用默认的schema,即dns
	defScheme := cc.dopts.defaultScheme
	if internal.UserSetDefaultScheme {
		defScheme = resolver.GetDefaultScheme()
	}

    // 此处canonicalTarget为dns:///localhost:50052
    // "//"与第三个"/"之间的是authority
	canonicalTarget := defScheme + ":///" + cc.target

    // 再次尝试target并获取相应的resolver.Builder,此处会拿到dns.dnsBuilder
	parsedTarget, err = parseTarget(canonicalTarget)
	if err != nil {
		return err
	}
	rb = cc.getResolver(parsedTarget.URL.Scheme)
	if rb == nil {
		return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
	}
    
    // 保存parsedTarget和resolverBuilder
	cc.parsedTarget = parsedTarget
	cc.resolverBuilder = rb
	return nil
}

那么resolverBuilder在什么时候会Build一个resolver出来呢?在ide的帮助下,可以直接定位到这个函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (ccr *ccResolverWrapper) start() error {
	errCh := make(chan error)
	ccr.serializer.TrySchedule(func(ctx context.Context) {
		if ctx.Err() != nil {
			return
		}
		opts := resolver.BuildOptions{
			DisableServiceConfig: ccr.cc.dopts.disableServiceConfig,
			DialCreds:            ccr.cc.dopts.copts.TransportCredentials,
			CredsBundle:          ccr.cc.dopts.copts.CredsBundle,
			Dialer:               ccr.cc.dopts.copts.Dialer,
			Authority:            ccr.cc.authority,
		}
		var err error
        // 这里Build resolver
		ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
		errCh <- err
	})
	return <-errCh
}

这里的serializer是用来进行调用排队的,在多线程环境下某些任务需要线性执行。那么start方法又是谁调用的,ccResolverWrapper这个又是什么东西?最简单的,打个断点到start方法,直接看调用栈:

image-20241119160659829

所以我们可以确定的是,resolver是在发起RPC调用的时候才会被Build出来的。接着随着调用栈的步伐,来到newClientStream这个函数,在上一篇中讲过这个函数用于客户端创建一个stream,然后用这个stream来收发数据。

1
2
3
4
5
6
7
8
9
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if err := cc.idlenessMgr.OnCallBegin(); err != nil {
		return nil, err
	}
    
    ...
    
    return newStream(ctx, func() {})
}

在最后创建stream之前,会先调用idlenessMgr.OnCallBegin这个方法,idlenessMgr用于管理空闲连接,每次RPC的开始都会调用OnCallBegin;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// OnCallBegin is invoked at the start of every RPC.
func (m *Manager) OnCallBegin() error {
	if m.isClosed() {
		return nil
	}

    // 记录一次调用
	if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
		atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
        // 本来就不是空闲,直接返回
		return nil
	}

	// 退出空闲模式
	if err := m.ExitIdleMode(); err != nil {
		// 发生错误,回滚这次调用记录
		atomic.AddInt32(&m.activeCallsCount, -1)
		return err
	}

	atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
	return nil
}

接着往下看退出空闲模式如何处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
// internal state.
func (m *Manager) ExitIdleMode() error {
	...

    // 这里的enforcer就是cc
	if err := m.enforcer.ExitIdleMode(); err != nil {
		return fmt.Errorf("failed to exit idle mode: %w", err)
	}
    
    ...
}

func (i *idler) ExitIdleMode() error {
	return (*ClientConn)(i).exitIdleMode()
}

// exitIdleMode 退出连接的空闲模式,重新创建resolver和balancer
func (cc *ClientConn) exitIdleMode() (err error) {
	...

    // 使用resolverWrapper创建resolver
	if err := cc.resolverWrapper.start(); err != nil {
		return err
	}

	cc.addTraceEvent("exiting idle mode")
	return nil
}

这下我们知道,cc从空闲状态发起第一次RPC调用的时候,即退出空闲状态的时候,会Build resolver。具体怎么Build的我们定位到dns.dnsBuilder的Build方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 创建一个dns resolver,然后监控这个resolving过程,当发生更新的时候通知cc:
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    // 解析出host和port,此处为localhost和50052
	host, port, err := parseTarget(target.Endpoint(), defaultPort)
	if err != nil {
		return nil, err
	}

	// host是IP,直接无需解析,调用UpdateState更新状态
	if ipAddr, ok := formatIP(host); ok {
        addr := []resolver.Address
		cc.UpdateState(resolver.State{Addresses: addr})
		return deadResolver{}, nil
	}

	// host不是IP,创建dnsResolver
	ctx, cancel := context.WithCancel(context.Background())
	d := &dnsResolver{
		host:                 host,
		port:                 port,
		ctx:                  ctx,
		cancel:               cancel,
		cc:                   cc,
		rn:                   make(chan struct{}, 1),
		disableServiceConfig: opts.DisableServiceConfig,
	}

    // 获取真正的resolver
	d.resolver, err = internal.NewNetResolver(target.URL.Host)
	if err != nil {
		return nil, err
	}

	d.wg.Add(1)
    // 开启监控,并更新状态
	go d.watcher()
	return d, nil
}

对于创建resolver这一步:

  1. 没有指定authority的话,获取标准库的net.DefaultResolver,这个resolver会使用/etc/hosts的映射,以及本地/etc/resolve.conf指定的DNS服务器
  2. 否则使用authority指定的nameserver

至此,代码中还没使用resolver去进行域名解析,因为域名解析出来可能有多个IP地址并且不是永久绑定的,也就是说可能会发生变化,所以又起了一个goroutine运行watcher方法用于轮询监控这样的变化并通知cc:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (d *dnsResolver) watcher() {
	defer d.wg.Done()
	backoffIndex := 1
	for {
        // 解析域名进行服务发现,返回解析结果
		state, err := d.lookup()
        // 报告错误,或者解析成功并更新状态
		if err != nil {
			d.cc.ReportError(err)
		} else {
			err = d.cc.UpdateState(*state)
		}

		var nextResolutionTime time.Time
		if err == nil {
			// 解析成功,等待下一次解析
			backoffIndex = 1
			nextResolutionTime = internal.TimeNowFunc().Add(MinResolutionInterval)
			select {
			case <-d.ctx.Done():
				return
			case <-d.rn:
			}
		} else {
            // 解析失败,基于指数退避等待下一次重试
			nextResolutionTime = internal.TimeNowFunc().Add(backoff.DefaultExponential.Backoff(backoffIndex))
			backoffIndex++
		}
		select {
		case <-d.ctx.Done():
			return
		case <-internal.TimeAfterFunc(internal.TimeUntilFunc(nextResolutionTime)):
		}
	}
}

核心就在于lookup方法,里面调用了net.Resolver.LookupHost方法进行DNS域名解析,并将解析结果封装在State中:

1
2
3
4
5
6
type State struct {
	Addresses []Address
	Endpoints []Endpoint
	ServiceConfig *serviceconfig.ParseResult
	Attributes *attributes.Attributes
}

后续就是将服务发现的结果通过cc.UpdateState去更新其它组件的状态。至此基于指定端口号以及dns A记录的简单服务发现就分析暂时到这里。

我们接着往下,实际上应该使用dns SVR记录进行服务发现,因为一般target不会指定为A记录+端口,而是服务名,那么就需要SVR记录的支持,SVR记录包含:

1
2
3
4
5
6
7
// An SRV represents a single DNS SRV record.
type SRV struct {
	Target   string // 主机A记录
	Port     uint16 // 端口
	Priority uint16 // 优先级
	Weight   uint16 // 权重
}

而且基于LookupHost,返回的只有ip列表,没有其他元数据的话,无法支持高级流量路由比如蓝绿部署、金丝雀发布等高级路由策略,限制比较大。所以我们再看一下lookupSVR:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 查找 SRV 记录,用于查询到的均衡器地址列表
func (d *dnsResolver) lookupSRV(ctx context.Context) ([]resolver.Address, error) {
	if !EnableSRVLookups {
		return nil, nil
	}
	var newAddrs []resolver.Address
    // 根据 SRV 记录,获取 grpclb 的记录(获取所有负载均衡节点名称)
	_, srvs, err := d.resolver.LookupSRV(ctx, "grpclb", "tcp", d.host)
	if err != nil {
		err = handleDNSError(err, "SRV") // may become nil
		return nil, err
	}
    // 根据上一步获取的均衡节点名称,查找对应的负载均衡器地址列表(A 记录)
	for _, s := range srvs {
		lbAddrs, err := d.resolver.LookupHost(ctx, s.Target)
		if err != nil {
			err = handleDNSError(err, "A") // may become nil
			if err == nil {
				continue
			}
			return nil, err
		}
		for _, a := range lbAddrs {
			ip, ok := formatIP(a)
			if !ok {
				return nil, fmt.Errorf("dns: error parsing A record IP address %v", a)
			}
			addr := ip + ":" + strconv.Itoa(int(s.Port))
			newAddrs = append(newAddrs, resolver.Address{Addr: addr, ServerName: s.Target})
		}
	}
	return newAddrs, nil
}

还有一种dns lookup类型,lookupTXT,从特定的dns服务器获取TXT记录,里面包含了ServiceConfig的json字符串,用于设置客户端在与服务端交互时的一些配置,比如负载均衡配置、RPC方法调用配置、重试配置、健康检查配置等。

综上,lookup查询了三种dns记录(A、SVR、TXT),分别用于发现:服务节点host、负载均衡节点、ServiceConfig。

参考

https://github.com/lubanproj/grpc-read/blob/master/5-grpc%20%E6%9C%8D%E5%8A%A1%E5%8F%91%E7%8E%B0.md

https://nos-ae.github.io/posts/k8s%E6%9C%8D%E5%8A%A1%E5%8F%91%E7%8E%B0/

本文由作者按照 CC BY 4.0 进行授权