Skip to content

21控制面:实现xDS配置管理

今天我要和你分享的是如何用代码实现 Service Mesh 控制面,实现 xDS 配置管理,以及数据面 Sidecar 如何与控制面进行通信。

这一讲的代码实现在https://github.com/beck917/easypilothttps://github.com/beck917/easymesh中,你可以配合代码阅读。

Envoy 官方的实例和著名的控制面 Istio,都选择使用 Go 语言来实现,所以这里我们也采用 Go 语言实现控制面代码。控制面的基本代码相对简单,主要复杂点在于和公司运维环境的配合上,所以各个公司的控制面大多数都要自研或者二次开发。

代码解析

我们直接使用 Envoy 官方提供的 go -control-plane 类库来实现具体功能。

首先看一下 main.go 的入口代码,这部分代码创建了一个 gRPC Server,来提供 xDS 服务。

go-control-plane 类库使用 Snapshot 的方式存储数据,比如 EDS 中的 Endpoint 数据,如果进行数据更新,只需刷新 Snapshot 数据就可以了。

这里首先创建了 SnapshotCache,然后用 Mock 的数据生成了一份 Snapshot,最后将 Snapshop 存储到 Cache 中,为对应的 NodeID 生成一份 Cache 数据,这样 xDS Server 在提供服务的时候,就可以将对应的数据发送到客户端了:

java
    // 创建缓存
    cache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, l)
    // 创建 snapshot 数据,用于给数据面提供 xds 数据
    snapshot := example.GenerateSnapshot()
    if err := snapshot.Consistent(); err != nil {
        l.Errorf("snapshot inconsistency: %+v\n%+v", snapshot, err)
        os.Exit(1)
    }
    l.Debugf("will serve snapshot %+v", snapshot)
    // 将 snapshot 写入缓存
    if err := cache.SetSnapshot(nodeID, snapshot); err != nil {
        l.Errorf("snapshot error %q for %+v", err, snapshot)
        os.Exit(1)
    }
    // 运行 xds 服务器
    ctx := context.Background()
    cb := &testv3.Callbacks{Debug: l.Debug}
    srv := serverv3.NewServer(ctx, cache, cb)
    example.RunServer(ctx, srv, port)

接下来,我们进入具体方法的学习,先来看 example.GenerateSnapshot() 这个方法,这个方法模拟了一些测试数据。

下面这段代码创建了 CDS、EDS、RDS、LDS 对应的数据,其中包括集群、节点、路由、监听器等数据:

java
func GenerateSnapshot() cache.Snapshot {
		return cache.NewSnapshot(
			"1",
			[]types.Resource{}, // endpoints
			[]types.Resource{makeCluster(ClusterName)},
			[]types.Resource{makeRoute(RouteName, ClusterName)},
			[]types.Resource{makeHTTPListener(ListenerName, RouteName)},
			[]types.Resource{}, // runtimes
			[]types.Resource{}, // secrets
		)
	}

我们再来看 CDS 和 EDS 的数据创建。

Cluster 和 Endpoint 的数据结构在第15 讲《xDS:控制面和数据面的通信桥梁》中有提到过,这里我们结合具体的代码进行深入了解。

我们先来看 Cluster 的数据类型的参数:

java
func makeCluster(clusterName string) *cluster.Cluster {
		return &cluster.Cluster{
			Name:                 clusterName,
			ConnectTimeout:       ptypes.DurationProto(5 * time.Second),
			ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_LOGICAL_DNS},
			LbPolicy:             cluster.Cluster_ROUND_ROBIN,
			LoadAssignment:       makeEndpoint(clusterName),
			DnsLookupFamily:      cluster.Cluster_V4_ONLY,
		}
	}
	
	func makeEndpoint(clusterName string) *endpoint.ClusterLoadAssignment {
		return &endpoint.ClusterLoadAssignment{
			ClusterName: clusterName,
			Endpoints: []*endpoint.LocalityLbEndpoints{ {
				LbEndpoints: []*endpoint.LbEndpoint{ {
					HostIdentifier: &endpoint.LbEndpoint_Endpoint{
						Endpoint: &endpoint.Endpoint{
							Address: &core.Address{
								Address: &core.Address_SocketAddress{
									SocketAddress: &core.SocketAddress{
										Protocol: core.SocketAddress_TCP,
										Address:  UpstreamHost,
										PortSpecifier: &core.SocketAddress_PortValue{
											PortValue: UpstreamPort,
										},
									},
								},
							},
						},
					},
				}},
			}},
		}
	}

接下来看一下 example.RunServer(ctx, srv, port) 这个方法。该方法首先创建了一个 gRPC Server,并设置了 grpcMaxConcurrentStreams:

java
// 启动 xds server
	func RunServer(ctx context.Context, srv3 serverv3.Server, port uint) {
		// 设置 grpcMaxConcurrentStreams 参数,用于增加默认的单连接并发数量
		var grpcOptions []grpc.ServerOption
		grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
		grpcServer := grpc.NewServer(grpcOptions...)
	
		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
		if err != nil {
			log.Fatal(err)
		}
	
		registerServer(grpcServer, srv3)
	
		log.Printf("management server listening on %d\n", port)
		if err = grpcServer.Serve(lis); err != nil {
			log.Println(err)
		}
	}

接来下通过 registerServer 方法,将提供 xDS 服务的 Service 注册到已经创建好的 gRPC Server 上面,用于提供对应的 xDS Server服务。你要注意,这里有一个 RegisterAggregatedDiscoveryServiceServer 的 Server 提供聚合查询服务,通过这个 Server 可以为所有的 xDS 协议提供服务:

java
func registerServer(grpcServer *grpc.Server, server serverv3.Server) {
		// 注册服务
		discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
		endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
		clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, server)
		routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, server)
		listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, server)
		secretservice.RegisterSecretDiscoveryServiceServer(grpcServer, server)
		runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, server)
	}

至此,一个简单的 xDS Server 就讲完了。这里的 xDS Server 填充了一些测试数据来模拟结果,如果在生产环境使用,需要接入相应的平台才能提供数据。

比如在 Istio 中通过 Kubernetes 的 API 为 xDS Server 提供数据;通过在 YAML 中声明 VirtualService 类型来为 RDS 服务提供数据。又或者使用了第三方的注册中心,比如 Consul,则需要通过 Consul API 为 EDS 服务提供数据。

接下来,我们通过一个 EDS Client,来看一下如何在数据面 Sidecar 中通过 xDS 和控制面协议通信。这部分代码还是在https://github.com/beck917/easymesh代码仓库中。

我们先来看 utils/eds 文件夹下的代码。

首先通过 NewWithInterval 创建一个 EDS 对象,然后连接 EDS 服务器,并且创建一个 Goroutine,通过 Loop 的方法定时发送请求数据到 EDS 服务器。这里我们采用了推拉结合的方式来获取 EDS 数据:

java
func NewWithInterval(addr string, interval time.Duration) (*adapter, error) {
    if interval <= 0 {
        interval = time.Minute
    }
    urlobj, err := url.Parse(addr)
    if err == nil && len(urlobj.Host) > 0 {
        addr = urlobj.Host
    }
    eds := &adapter{
        addr: addr,
        node: &core.Node{
            Id: generateNodeID(),
        },
        single:        new(singleflight.Group),
        cache:         registry.NewServiceCache(nil, 0),
        streams:       sync.Map{},
        fetchInterval: interval,
    }
    err = eds.connect()
    if err != nil {
        return nil, errors.Wrap(err)
    }
    // 定时拉取数据
    go eds.loop()
    return eds, nil
}

同时在负载均衡器中,我们采用 getService 来获取服务节点数据。这个方法首先会读取缓存中的数据,如果缓存中没有数据则会通过创建和 EDS 服务器建立 Watch 机制,动态更新 Cache 中的数据:

java
func (eds *adapter) GetServices(name string, opts ...registry.DiscoveryOption) (services []*registry.Service, err error) {
    if eds.ccErr != nil {
        err = errors.Wrap(eds.ccErr)
        return
    }
    options := registry.NewCommonDiscoveryOption(opts...)
    key := registry.NewServiceKey(name, options.Tags, options.DC)
    // 第一步,从缓存读取
    services, err = eds.cache.GetServices(key)
    if err == nil && len(services) > 0 {
        return
    }
    // 从 eds 服务获取
    return eds.fetchServices(key, options.Tags)
}

startWatch 方法创建了一个新的 Goroutine 用于监听 EDS 数据的变化,通过 parseStream 阻塞获取 EDS Server 的推送获取数据,获取数据后,将数据存入 Cache 中:

java
func (eds *adapter) startWatch(client discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient, key registry.ServiceKey, tags []string) {
    logger.Infof("eds.Watch(%+v) ...", key)
    go func(streamClient discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient, serviceKey registry.ServiceKey) {
        defer func() {
            if panicErr := recover(); panicErr != nil {
                logger.Errorf("eds.startWatch() panic: %+v", panicErr)
            } else {
                //eds.streams.Delete(key)
            }
        }()
        for {
            services, err := eds.parseStream(eds.getStreamClient(key), tags)
            if err != nil {
                logger.Errorf("eds.Watch(%+v): %+v", key, err)
                time.Sleep(1 * time.Second)
                continue
            }
            if len(services) <= 0 {
                logger.Warnf("eds.Watch(%+v): empty services, ignored!", key)
                continue
            }
            ipv4s := make([]string, len(services))
            for i, svc := range services {
                ipv4s[i] = svc.ServiceIP()
            }
            logger.Infof("eds.Watch(%+v): total=%d, services=%+v", key, len(services), ipv4s)
            eds.cache.Set(key, services)
            if eds.watcher != nil {
                eds.watcher.Handle(key, services)
            }
        }
    }(client, key)
}

至此,整个 EDS 的交互过程就讲完了,需要注意的是:当发送数据,或者获取数据错误的时候,需要重新连接 EDS 服务器,否则一旦网络异常或者 EDS 服务器重启,导致 EDS 连接断开,就无法正确地更新数据了。

现在,数据面和控制面如何通过 xDS 交互就讲完了。但是如果要在代码中真正地跑起来,还需要和负载均衡器结合,下面我们就看一下如何在 Sidecar 中实现负载均衡器。

和传统的在 Client SDK 中实现负载均衡器有所不同,SDK 中的负载均衡器一般是创建好后传入 Client 对象。Client 对象在发出请求前,先调用负载均衡器的 Next 方法,获取到被调服务的 IP,然后进行服务请求。比如下面的代码:

java
var srv *registry.Service
srv, err = tp.rr.Next(ireq.Context(), host)
if err != nil {
	logger.Errorf("get host:%v err:%v", host, err)
	continue
}
ireq.URL.Host = srv.Addr()
resp, err = tp.rt.RoundTrip(ireq)、

但在 Sidecar 中,负载均衡器组件也被抽象成了中间件 的方式,具体在 Go-Micro 中,就是创建一个新的 Wrapper 。而且这些中间件都被统一写在了 Server 的中间件中,并不需要写在 Client 的中间件中,因为这种做法的抽象程度更好,模型也更加统一,对于理解和编写、维护代码都有帮助。

在讲解负载均衡器的中间件代码前,我们先看一下路由器的中间件。路由器中间件的作用是根据我们的请求参数,比如 header、path 等,找到对应的服务。只有通过路由器中间件找到了对应的服务名,我们才能让负载均衡器中间件通过服务名找到对应的 Endpoint。

这个中间件通过 match 方法获取到了对应的 router 对象和 router 对象内包含的 handlers,这个 handler 实际上是绑定在 router 上的另外一套中间件体系,这样做的目的是让中间件更容易获取 router 的一些信息,否则还需要通过 routername 查询才能获取到相关信息。

java
// HandlerWrapper 服务端中间件
func (wrap *Wrappers) HandlerWrapper() server.HandlerWrapper {
	return func(next server.HandlerFunc) server.HandlerFunc {
		return func(ctx context.Context, req server.Request, rsp interface{}) error {
			route, handlers := wrap.match(req.Service(), req.Method())
			if route == nil {
				return errors.NotFound("mesh.wrap.router", "route mismatch")
			}
			ctx = context.WithValue(ctx, defs.CtxRouter{}, route)
			handler := func(ctx context.Context, req server.Request, rsp interface{}) error {
				return next(ctx, req, rsp)
			}
			// 嵌套自定义 wraps
			for i := len(handlers); i > 0; i-- {
				handler = handlers[i-1](handler)
			}
			return handler(ctx, req, rsp)
		}
	}
}

每个中间件,都有一个 Reload 的方法,这个 Reload 方法用来更新 xDS 获取的配置到具体的中间件对象中。比如 router 中间件的 Reload 方法,就将所有的路由信息解析出来,存储在 ServiceRoutes 对象中,用于 match 方法的匹配,这样才能找到请求具体对应的服务名。

接下来,我们来看负载均衡器的中间件,这个中间件也绑定在路由对象中,当匹配到对应的路由后,会自动通过 Next 方法调用这个中间件。

java
// 注入中间件
routeHandlers := handlers
lbWrap, err := loadbalance.HandlerWrapper(srv, config, dis)
if err != nil {
    return fmt.Errorf("service key:%s empty route:%+v build lb wrap err:%v", srvKey, route, err)
}

负载均衡器会通过 nextNode 的方法获取请求的节点,nextNode 方法直接从 LB 对象中获取相应的 Endpoint 数据。这里和 EDS 的类库有一个交互,EDS 类库在获取到新的数据时会主动更新 LB 对象中的数据,以确保 LB 中的 Next 方法可以一直获取到最新的节点数据。

java
// HandlerWrapper 负载均衡和服务发现
func HandlerWrapper(cluster *qdx.Cluster, config *types.Config, dis qudiscovery.Discovery) (server.HandlerWrapper, error) {
	nextNode, err := newNext(cluster, config.Upstreams, dis)
	if err != nil {
		return nil, err
	}
	return func(next server.HandlerFunc) server.HandlerFunc {
		return func(ctx context.Context, req server.Request, rsp interface{}) error {
			node, err := nextNode()
			if err != nil {
				return errors.BadRequest("mesh.wrap.lb", "lb.next err:%v", err)
			}
			ctx = context.WithValue(ctx, defs.CtxAddress{}, node.Address)
			return next(ctx, req, rsp)
		}
	}, nil
}

至此,负载均衡器和路由器原理的代码解析部分就讲完了,下面我们对今天的内容做一个简单的总结。

总结

这一讲我主要介绍了如何在代码层面实现 xDS Server,以及数据面如何和控制面通过 xDS 协议进行交互。通过今天的学习,相信你已经对 xDS 协议有了更清晰的认识,也掌握了如何实现一个完整的 Mesh 系统。另外,我们还从代码层面讲解了路由器和负载均衡器模块,这两个部分的学习,有助于你更清晰地理解 Sidecar 的原理。

结合今天讲解的内容,如果让你实现 RDS 的交互,也就是路由组件和 RDS Server 的交互,你会如何做呢,欢迎在留言区和我分享你的观点。

今天内容到这里就结束了,下一讲我会讲解 Service Mesh 如何落地:在实践落地中可能遇到的问题和困难。众所周知,任何新的架构在实践落地中都会遇到非常大的阻力,Service Mesh 也不例外。我们下一讲再见。