Skip to content

17服务端如何处理客户端的一次请求?

本专栏的第二模块主要讲了 etcd 实现的核心原理,从整体剖析 etcd 的架构到依次介绍通信接口、存储机制、etcd-raft 分布式一致性、MVCC 多版本控制、事务、watch 机制、lease 租约以及 etcd 启动过程的分析。

这一讲我们将会介绍 etcd 服务端处理客户端请求的完整过程,在回忆之前所讲内容的同时,对服务端处理客户端请求的过程进行总结。

请求过程概述

我们先来回顾一下 etcd 的整体架构。客户端访问 etcd 服务端,按照分层的架构可以划分为客户端层、API 接口层、etcd Raft 层、业务逻辑层以及 etcd 存储。

客户端层如 clientv3 库和 etcdctl 等工具,用户通过 RESTful 方式进行调用,降低了 etcd 的使用复杂度;API 接口层提供了客户端访问服务端的通信协议和接口定义,以及服务端节点之间相互通信的协议。etcd v3 使用 gRPC 作为消息传输协议;etcd Raft 层负责 Leader 选举和日志复制等功能,除了与本节点的 etcd server 通信之外,还与集群中的其他 etcd 节点进行交互,实现分布式一致性数据同步的关键工作;etcd 的业务逻辑层,包括鉴权、租约、KVServer、MVCC 和 Compactor 压缩等核心功能特性;etcd 存储实现了快照、预写式日志 WAL(Write Ahead Log)。etcd V3 版本中,使用 boltdb 来持久化存储集群元数据和用户写入的数据。

结合上述内容,我们来看 KV 请求所涉及的 etcd 各个模块之间的交互过程。

gRPC API

我们先来看 etcd 服务端对外提供服务时 gRPC API 注册的过程。

服务端 gRPC 接口定义在 rpc.proto 文件中,这与 service KV 中的定义相对应。在 etcd 启动时,gRPC Server 也需要注册这些 KV 接口。具体启动的实现则定义在了 grpc 包下,代码如下所示:

go
// 位于 etcdserver/api/v3rpc/grpc.go:38
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
// 创建 grpc Server
grpcServer := grpc.NewServer(append(opts, gopts...)...)
// 注册各种服务到 gRPC Server
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(grpcServer, hsrv)
grpc_prometheus.Register(grpcServer)
return grpcServer
}

Server方法主要用于启动各种服务。上述实现中,首先构建所需要的参数以创建 gRPC Server;然后在启动时将实现 KV 各个方法的对象注册到 gRPC Server,在其上注册对应的拦截器,包括 KVServer、WatchServer、LeaseServer、ClusterServer、AuthServer 和 MaintenanceServer 等。

下面我们以 KVServer 为例,具体分析 etcd 服务端提供键值对读写的流程。

读请求

读请求是 etcd 中的高频操作。etcd 中读取单个 key 和批量 key 的方法所使用的都是 Range。因此对于读请求,我们围绕 Range 方法分析即可。

实现细节

client 发送 Range RPC 请求给 etcd 服务端之后,首先会经过 gRPC Server 上注册的拦截器拦截。我们从 gRPC 包中 KVServer 实现的 Range 方法看起:

go
// 位于 etcdserver/api/v3rpc/key.go:41
func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
// 检验 Range 请求的参数
if err := checkRangeRequest(r); err != nil {
return nil, err
}
resp, err := s.kv.Range(ctx, r) //调用 RaftKV.Range()
if err != nil {
return nil, togRPCError(err)
}
// 使用 etcd Server 的信息填充 pb.ResponseHeader
s.hdr.fill(resp.Header)
return resp, nil
}

Range 请求的主要部分在于调用RaftKV.Range()方法。这将会调用到 etcdserver 包中对 RaftKV 的实现:

go
// 位于 etcdserver/v3_server.go:90
func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
var resp *pb.RangeResponse
var err error
// 认证校验
chk := func(ai *auth.AuthInfo) error {
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}
// 查询结果
get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
}
return resp, err
}

EtcdServer 实现的 Range 方法较为简单,主要是先进行认证校验,然后调用 applierV3 接口中定义的 Range 方法来查询结果,该方法的实现如下:

go
// 位于 etcdserver/apply.go:280
func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx)
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}
if txn == nil {
txn = a.s.kv.Read(trace)
defer txn.End()
}
// 分页大小
limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
limit = 0
}
if limit > 0 {
// 多取一个,判断分页
limit = limit + 1
}
// 构造 Range 请求
ro := mvcc.RangeOptions{
Limit: limit,
Rev:   r.Revision,
Count: r.CountOnly,
}
// 获取 Range 结果
rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, err
}
// 排序
sortOrder := r.SortOrder
if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
sortOrder = pb.RangeRequest_ASCEND
}
}
// 分页取
if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
if r.KeysOnly {
rr.KVs[i].Value = nil
}
resp.Kvs[i] = &rr.KVs[i]
}
// 组装响应
return resp, nil
}

applierV3backend 中的 Range 在实现时,首先准备分页的大小,多取一个,用于判断分页是否存在下一页。随后构造 Range 请求,调用 mvcc 包中的 Range 方法获取结果,最后对结果进行排序并将结果返回给客户端,由于当前的 mvcc.Range 实现返回按字典序升序 的结果,因此默认情况下仅当目标不是KEY时才进行升序排序。

读请求过程描述

经过上述代码分析,我们可以总结客户端发起读请求之后的处理流程,如下图所示:

客户端发起读请求后的处理流程图

  • 客户端发起请求之后,clientv3 首先会根据负载均衡算法选择一个合适的 etcd 节点,接着调用 KVServer 模块对应的 RPC 接口,发起 Range 请求的 gRPC 远程调用;

  • gRPC Server 上注册的拦截器拦截到 Range 请求,实现 Metrics 统计、日志记录等功能;

  • 然后进入读的主要过程,etcd 模式实现了线性读,使得任何客户端通过线性读都能及时访问到键值对的更新;

  • 线性读获取到 Leader 已提交日志索引构造的最新 ReadState 对象,实现本节点状态机的同步;

  • 接着就是调用 MVCC 模块,根据 treeIndex 模块 B-tree 快速查找 key 对应的版本号;

  • 通过获取的版本号作为 key,查询存储在 boltdb 中的键值对,我们在之前的存储部分讲解过此过程。

写请求

put 操作用于插入或者更新指定的键值对。下面我们具体看一下写请求的整体流程。

put 方法更新或者写入键值对到 etcd 中,相比于读请求,多了一步Quota 配额检查存储空间的情况,用来检查写入时是否有足够的空间。实际执行时只会针对 Put 和 Txn 操作,其他的请求如 Range 则会直接调用对应的 handler。

go
// etcdserver/api/v3rpc/quota.go:59
func (s *quotaKVServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
if err := s.qa.check(ctx, r); err != nil {
return nil, err
}
return s.KVServer.Put(ctx, r)
}

check方法将检查请求是否满足配额。如果空间不足,将会忽略请求并发出可用空间不足的警报。根据 put 方法的调用过程,我们可以列出如下的主要方法:

html
quotaKVServer.Put() api/v3rpc/quota.go 首先检查是否满足需求
|-quotoAlarm.check() 检查
|-KVServer.Put() api/v3rpc/key.go 真正的处理请求
|-checkPutRequest() 校验请求参数是否合法
|-RaftKV.Put() etcdserver/v3_server.go 处理请求
|=EtcdServer.Put() 实际调用的是该函数
| |-raftRequest()
|   |-raftRequestOnce()
|     |-processInternalRaftRequestOnce() 真正开始处理请求
|       |-context.WithTimeout() 创建超时的上下文信息
|       |-raftNode.Propose() raft/node.go
|         |-raftNode.step() 对于类型为 MsgProp 类型消息,向 propc 通道中传入数据
|-header.fill() etcdserver/api/v3rpc/header.go 填充响应的头部信息

KVServer.Put()的实现位于 api/v3rpc/key.go,用来真正地处理客户端请求。

go
// etcdserver/v3_server.go:130
func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
if err != nil {
return nil, err
}
return resp.(*pb.PutResponse), nil
}

将数据写入集群,涉及的内容比较复杂,还包括集群的通信。通过封装的 raftRequest,此时已经将添加记录的请求发送到了 Raft 协议的核心层处理。

我们来总结一下写请求的处理流程,如下图所示:

写请求的处理流程图

  • 客户端发送写请求,通过负载均衡算法选取合适的 etcd 节点,发起 gRPC 调用。

  • etcd server 的 gRPC Server 收到这个请求,经过 gRPC 拦截器拦截,实现 Metrics 统计和日志记录等功能。

  • Quota 模块配额检查 db 的大小,如果超过会报etcdserver: mvcc: database space exceeded的告警,通过 Raft 日志同步给集群中的节点 db 空间不足,同时告警也会持久化到 db 中。etcd 服务端拒绝写入,对外提供只读的功能。

  • 配额检查通过,KVServer 模块经过限速、鉴权、包大小判断之后,生成唯一的编号,这时才会将写请求封装为提案消息,提交给 Raft 模块。

  • 写请求的提案只能由 Leader 处理,获取到 Raft 模块的日志条目之后,Leader 会广播提案内容。WAL 模块完成 Raft 日志条目内容封装,当集群大多数节点完成日志条目的持久化,即将提案的状态变更为已提交,可以执行提案内容。

  • Apply 模块用于执行提案,首先会判断该提案是否被执行过,如果已经执行,则直接返回结束;未执行过的情况下,将会进入 MVCC 模块执行持久化提案内容的操作。

  • MVCC 模块中的 treeIndex 保存了 key 的历史版本号信息,treeIndex 使用 B-tree 结构维护了 key 对应的版本信息,包含了全局版本号、修改次数等属性。版本号代表着 etcd 中的逻辑时钟,启动时默认的版本号为 1。键值对的修改、写入和删除都会使得版本号全局单调递增。写事务在执行时,首先根据写入的 key 获取或者更新索引,如果不存在该 key,则会给予当前最大的 currentRevision 自增得到 revision;否则直接根据 key 获取 revision。

  • 根据从 treeIndex 中获取到 revision 、修改次数等属性,以及 put 请求传递的 key-value 信息,作为写入到 boltdb 的 value,而将 revision 作为写入到 boltdb 的 key。同时为了读请求能够获取最新的数据,etcd 在写入 boltdb 时也会同步数据到 buffer。因此上文介绍 etcd 读请求的过程时,会优先从 buffer 中读取,读取不到的情况下才会从 boltdb 读取,以此来保证一致性和性能。为了提高吞吐量,此时提案数据并未提交保存到 db 文件,而是由 backend 异步 goroutine 定时将批量事务提交。

  • Server 通过调用网络层接口返回结果给客户端。

总的来说,这个过程为客户端发起写请求,由 Leader 节点处理,经过拦截器、Quota 配额检查之后,KVServer 提交一个写请求的提案给 Raft 一致性模块,经过 RaftHTTP 网络转发,集群中的其他节点半数以上持久化成功日志条目,提案的状态将会变成已提交。接着 Apply 通过 MVCC 的 treeIndex、boltdb 执行提案内容,成功之后更新状态机。

小结

这一讲我们主要介绍了服务端处理客户端读写请求的过程。

etcd 服务端的 Raft、KVServer、Quota、WAL、MVCC 和 Apply 等多个模块共同保障了 etcd 的读写一致性以及正确性。通过读写请求过程的学习,你可以熟悉 etcd 各个模块之间的交互,以及分布式组件设计的原理和方法。

结合这一讲的内容,你知道如果写请求在执行提案时出现突然宕机,etcd 重启之后是如何保证数据的一致性的吗?欢迎你在留言区提出自己的观点。下一模块我们将会介绍 etcd 的一些实践应用,比如分布式锁、Leader 选举等。