Appearance
21控制面:实现xDS配置管理
今天我要和你分享的是如何用代码实现 Service Mesh 控制面,实现 xDS 配置管理,以及数据面 Sidecar 如何与控制面进行通信。
这一讲的代码实现在https://github.com/beck917/easypilot和https://github.com/beck917/easymesh中,你可以配合代码阅读。
Envoy 官方的实例和著名的控制面 Istio,都选择使用 Go 语言来实现,所以这里我们也采用 Go 语言实现控制面代码。控制面的基本代码相对简单,主要复杂点在于和公司运维环境的配合上,所以各个公司的控制面大多数都要自研或者二次开发。
代码解析
我们直接使用 Envoy 官方提供的 go -control-plane 类库来实现具体功能。
首先看一下 main.go 的入口代码,这部分代码创建了一个 gRPC Server,来提供 xDS 服务。
go-control-plane 类库使用 Snapshot 的方式存储数据,比如 EDS 中的 Endpoint 数据,如果进行数据更新,只需刷新 Snapshot 数据就可以了。
这里首先创建了 SnapshotCache,然后用 Mock 的数据生成了一份 Snapshot,最后将 Snapshop 存储到 Cache 中,为对应的 NodeID 生成一份 Cache 数据,这样 xDS Server 在提供服务的时候,就可以将对应的数据发送到客户端了:
java
// 创建缓存
cache := cachev3.NewSnapshotCache(false, cachev3.IDHash{}, l)
// 创建 snapshot 数据,用于给数据面提供 xds 数据
snapshot := example.GenerateSnapshot()
if err := snapshot.Consistent(); err != nil {
l.Errorf("snapshot inconsistency: %+v\n%+v", snapshot, err)
os.Exit(1)
}
l.Debugf("will serve snapshot %+v", snapshot)
// 将 snapshot 写入缓存
if err := cache.SetSnapshot(nodeID, snapshot); err != nil {
l.Errorf("snapshot error %q for %+v", err, snapshot)
os.Exit(1)
}
// 运行 xds 服务器
ctx := context.Background()
cb := &testv3.Callbacks{Debug: l.Debug}
srv := serverv3.NewServer(ctx, cache, cb)
example.RunServer(ctx, srv, port)
接下来,我们进入具体方法的学习,先来看 example.GenerateSnapshot() 这个方法,这个方法模拟了一些测试数据。
下面这段代码创建了 CDS、EDS、RDS、LDS 对应的数据,其中包括集群、节点、路由、监听器等数据:
java
func GenerateSnapshot() cache.Snapshot {
return cache.NewSnapshot(
"1",
[]types.Resource{}, // endpoints
[]types.Resource{makeCluster(ClusterName)},
[]types.Resource{makeRoute(RouteName, ClusterName)},
[]types.Resource{makeHTTPListener(ListenerName, RouteName)},
[]types.Resource{}, // runtimes
[]types.Resource{}, // secrets
)
}
我们再来看 CDS 和 EDS 的数据创建。
Cluster 和 Endpoint 的数据结构在第15 讲《xDS:控制面和数据面的通信桥梁》中有提到过,这里我们结合具体的代码进行深入了解。
我们先来看 Cluster 的数据类型的参数:
java
func makeCluster(clusterName string) *cluster.Cluster {
return &cluster.Cluster{
Name: clusterName,
ConnectTimeout: ptypes.DurationProto(5 * time.Second),
ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_LOGICAL_DNS},
LbPolicy: cluster.Cluster_ROUND_ROBIN,
LoadAssignment: makeEndpoint(clusterName),
DnsLookupFamily: cluster.Cluster_V4_ONLY,
}
}
func makeEndpoint(clusterName string) *endpoint.ClusterLoadAssignment {
return &endpoint.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []*endpoint.LocalityLbEndpoints{ {
LbEndpoints: []*endpoint.LbEndpoint{ {
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: UpstreamHost,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: UpstreamPort,
},
},
},
},
},
},
}},
}},
}
}
接下来看一下 example.RunServer(ctx, srv, port) 这个方法。该方法首先创建了一个 gRPC Server,并设置了 grpcMaxConcurrentStreams:
java
// 启动 xds server
func RunServer(ctx context.Context, srv3 serverv3.Server, port uint) {
// 设置 grpcMaxConcurrentStreams 参数,用于增加默认的单连接并发数量
var grpcOptions []grpc.ServerOption
grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams))
grpcServer := grpc.NewServer(grpcOptions...)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatal(err)
}
registerServer(grpcServer, srv3)
log.Printf("management server listening on %d\n", port)
if err = grpcServer.Serve(lis); err != nil {
log.Println(err)
}
}
接来下通过 registerServer 方法,将提供 xDS 服务的 Service 注册到已经创建好的 gRPC Server 上面,用于提供对应的 xDS Server服务。你要注意,这里有一个 RegisterAggregatedDiscoveryServiceServer 的 Server 提供聚合查询服务,通过这个 Server 可以为所有的 xDS 协议提供服务:
java
func registerServer(grpcServer *grpc.Server, server serverv3.Server) {
// 注册服务
discoverygrpc.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
clusterservice.RegisterClusterDiscoveryServiceServer(grpcServer, server)
routeservice.RegisterRouteDiscoveryServiceServer(grpcServer, server)
listenerservice.RegisterListenerDiscoveryServiceServer(grpcServer, server)
secretservice.RegisterSecretDiscoveryServiceServer(grpcServer, server)
runtimeservice.RegisterRuntimeDiscoveryServiceServer(grpcServer, server)
}
至此,一个简单的 xDS Server 就讲完了。这里的 xDS Server 填充了一些测试数据来模拟结果,如果在生产环境使用,需要接入相应的平台才能提供数据。
比如在 Istio 中通过 Kubernetes 的 API 为 xDS Server 提供数据;通过在 YAML 中声明 VirtualService 类型来为 RDS 服务提供数据。又或者使用了第三方的注册中心,比如 Consul,则需要通过 Consul API 为 EDS 服务提供数据。
接下来,我们通过一个 EDS Client,来看一下如何在数据面 Sidecar 中通过 xDS 和控制面协议通信。这部分代码还是在https://github.com/beck917/easymesh代码仓库中。
我们先来看 utils/eds 文件夹下的代码。
首先通过 NewWithInterval 创建一个 EDS 对象,然后连接 EDS 服务器,并且创建一个 Goroutine,通过 Loop 的方法定时发送请求数据到 EDS 服务器。这里我们采用了推拉结合的方式来获取 EDS 数据:
java
func NewWithInterval(addr string, interval time.Duration) (*adapter, error) {
if interval <= 0 {
interval = time.Minute
}
urlobj, err := url.Parse(addr)
if err == nil && len(urlobj.Host) > 0 {
addr = urlobj.Host
}
eds := &adapter{
addr: addr,
node: &core.Node{
Id: generateNodeID(),
},
single: new(singleflight.Group),
cache: registry.NewServiceCache(nil, 0),
streams: sync.Map{},
fetchInterval: interval,
}
err = eds.connect()
if err != nil {
return nil, errors.Wrap(err)
}
// 定时拉取数据
go eds.loop()
return eds, nil
}
同时在负载均衡器中,我们采用 getService 来获取服务节点数据。这个方法首先会读取缓存中的数据,如果缓存中没有数据则会通过创建和 EDS 服务器建立 Watch 机制,动态更新 Cache 中的数据:
java
func (eds *adapter) GetServices(name string, opts ...registry.DiscoveryOption) (services []*registry.Service, err error) {
if eds.ccErr != nil {
err = errors.Wrap(eds.ccErr)
return
}
options := registry.NewCommonDiscoveryOption(opts...)
key := registry.NewServiceKey(name, options.Tags, options.DC)
// 第一步,从缓存读取
services, err = eds.cache.GetServices(key)
if err == nil && len(services) > 0 {
return
}
// 从 eds 服务获取
return eds.fetchServices(key, options.Tags)
}
startWatch 方法创建了一个新的 Goroutine 用于监听 EDS 数据的变化,通过 parseStream 阻塞获取 EDS Server 的推送获取数据,获取数据后,将数据存入 Cache 中:
java
func (eds *adapter) startWatch(client discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient, key registry.ServiceKey, tags []string) {
logger.Infof("eds.Watch(%+v) ...", key)
go func(streamClient discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient, serviceKey registry.ServiceKey) {
defer func() {
if panicErr := recover(); panicErr != nil {
logger.Errorf("eds.startWatch() panic: %+v", panicErr)
} else {
//eds.streams.Delete(key)
}
}()
for {
services, err := eds.parseStream(eds.getStreamClient(key), tags)
if err != nil {
logger.Errorf("eds.Watch(%+v): %+v", key, err)
time.Sleep(1 * time.Second)
continue
}
if len(services) <= 0 {
logger.Warnf("eds.Watch(%+v): empty services, ignored!", key)
continue
}
ipv4s := make([]string, len(services))
for i, svc := range services {
ipv4s[i] = svc.ServiceIP()
}
logger.Infof("eds.Watch(%+v): total=%d, services=%+v", key, len(services), ipv4s)
eds.cache.Set(key, services)
if eds.watcher != nil {
eds.watcher.Handle(key, services)
}
}
}(client, key)
}
至此,整个 EDS 的交互过程就讲完了,需要注意的是:当发送数据,或者获取数据错误的时候,需要重新连接 EDS 服务器,否则一旦网络异常或者 EDS 服务器重启,导致 EDS 连接断开,就无法正确地更新数据了。
现在,数据面和控制面如何通过 xDS 交互就讲完了。但是如果要在代码中真正地跑起来,还需要和负载均衡器结合,下面我们就看一下如何在 Sidecar 中实现负载均衡器。
和传统的在 Client SDK 中实现负载均衡器有所不同,SDK 中的负载均衡器一般是创建好后传入 Client 对象。Client 对象在发出请求前,先调用负载均衡器的 Next 方法,获取到被调服务的 IP,然后进行服务请求。比如下面的代码:
java
var srv *registry.Service
srv, err = tp.rr.Next(ireq.Context(), host)
if err != nil {
logger.Errorf("get host:%v err:%v", host, err)
continue
}
ireq.URL.Host = srv.Addr()
resp, err = tp.rt.RoundTrip(ireq)、
但在 Sidecar 中,负载均衡器组件也被抽象成了中间件 的方式,具体在 Go-Micro 中,就是创建一个新的 Wrapper 。而且这些中间件都被统一写在了 Server 的中间件中,并不需要写在 Client 的中间件中,因为这种做法的抽象程度更好,模型也更加统一,对于理解和编写、维护代码都有帮助。
在讲解负载均衡器的中间件代码前,我们先看一下路由器的中间件。路由器中间件的作用是根据我们的请求参数,比如 header、path 等,找到对应的服务。只有通过路由器中间件找到了对应的服务名,我们才能让负载均衡器中间件通过服务名找到对应的 Endpoint。
这个中间件通过 match 方法获取到了对应的 router 对象和 router 对象内包含的 handlers,这个 handler 实际上是绑定在 router 上的另外一套中间件体系,这样做的目的是让中间件更容易获取 router 的一些信息,否则还需要通过 routername 查询才能获取到相关信息。
java
// HandlerWrapper 服务端中间件
func (wrap *Wrappers) HandlerWrapper() server.HandlerWrapper {
return func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
route, handlers := wrap.match(req.Service(), req.Method())
if route == nil {
return errors.NotFound("mesh.wrap.router", "route mismatch")
}
ctx = context.WithValue(ctx, defs.CtxRouter{}, route)
handler := func(ctx context.Context, req server.Request, rsp interface{}) error {
return next(ctx, req, rsp)
}
// 嵌套自定义 wraps
for i := len(handlers); i > 0; i-- {
handler = handlers[i-1](handler)
}
return handler(ctx, req, rsp)
}
}
}
每个中间件,都有一个 Reload 的方法,这个 Reload 方法用来更新 xDS 获取的配置到具体的中间件对象中。比如 router 中间件的 Reload 方法,就将所有的路由信息解析出来,存储在 ServiceRoutes 对象中,用于 match 方法的匹配,这样才能找到请求具体对应的服务名。
接下来,我们来看负载均衡器的中间件,这个中间件也绑定在路由对象中,当匹配到对应的路由后,会自动通过 Next 方法调用这个中间件。
java
// 注入中间件
routeHandlers := handlers
lbWrap, err := loadbalance.HandlerWrapper(srv, config, dis)
if err != nil {
return fmt.Errorf("service key:%s empty route:%+v build lb wrap err:%v", srvKey, route, err)
}
负载均衡器会通过 nextNode 的方法获取请求的节点,nextNode 方法直接从 LB 对象中获取相应的 Endpoint 数据。这里和 EDS 的类库有一个交互,EDS 类库在获取到新的数据时会主动更新 LB 对象中的数据,以确保 LB 中的 Next 方法可以一直获取到最新的节点数据。
java
// HandlerWrapper 负载均衡和服务发现
func HandlerWrapper(cluster *qdx.Cluster, config *types.Config, dis qudiscovery.Discovery) (server.HandlerWrapper, error) {
nextNode, err := newNext(cluster, config.Upstreams, dis)
if err != nil {
return nil, err
}
return func(next server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
node, err := nextNode()
if err != nil {
return errors.BadRequest("mesh.wrap.lb", "lb.next err:%v", err)
}
ctx = context.WithValue(ctx, defs.CtxAddress{}, node.Address)
return next(ctx, req, rsp)
}
}, nil
}
至此,负载均衡器和路由器原理的代码解析部分就讲完了,下面我们对今天的内容做一个简单的总结。
总结
这一讲我主要介绍了如何在代码层面实现 xDS Server,以及数据面如何和控制面通过 xDS 协议进行交互。通过今天的学习,相信你已经对 xDS 协议有了更清晰的认识,也掌握了如何实现一个完整的 Mesh 系统。另外,我们还从代码层面讲解了路由器和负载均衡器模块,这两个部分的学习,有助于你更清晰地理解 Sidecar 的原理。
结合今天讲解的内容,如果让你实现 RDS 的交互,也就是路由组件和 RDS Server 的交互,你会如何做呢,欢迎在留言区和我分享你的观点。
今天内容到这里就结束了,下一讲我会讲解 Service Mesh 如何落地:在实践落地中可能遇到的问题和困难。众所周知,任何新的架构在实践落地中都会遇到非常大的阻力,Service Mesh 也不例外。我们下一讲再见。