Appearance
16启动etcd过程中发生了什么?
etcd 服务端是一个综合的模块,整合了我们前面所讲的 Raft、存储、WAL 等功能。etcd 服务端启动时,需要经过初始化创建 etcdServer 实例,接着依次启动 Raft 和 rafthttp 模块,最后启动 etcd 服务端,实现集群内部通信,此时就可以处理客户端的请求了。这一讲我们就结合源码介绍 etcd 服务端启动的具体实现。
etcd Server 启动总览
我们使用分层的方式来描绘 etcd 的架构,etcd 可分为 Client 客户端层、API 网络接口层、etcd Raft 算法层、逻辑层和 etcd 存储层。如下图所示:
etcd 分层架构图
etcd 服务端涉及的模块代码比较多,分析所有的代码,显然是不太可能的事情。为了帮你抓住重点,我将从 etcd Server 启动的流程开始,选取其中的重点步骤进行详细分析。
etcd 服务端对 EtcdServer 结构进行了抽象,其包含了 raftNode 属性,代表 Raft 集群中的一个节点,启动入口在 etcdmain 包中的主函数。其主要的逻辑在startEtcdOrProxyV2
函数中:
go
// 位于 etcdmain/etcd.go:52
func startEtcdOrProxyV2() {
grpc.EnableTracing = false
cfg := newConfig()
defaultInitialCluster := cfg.ec.InitialCluster
// 异常日志处理
defaultHost, dhErr := (&cfg.ec).UpdateDefaultClusterFromName(defaultInitialCluster)
var stopped <-chan struct{}
var errc <-chan error
// identifyDataDirOrDie 返回 data 目录的类型
which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
if which != dirEmpty {
switch which {
// 以何种模式启动 etcd
case dirMember:
stopped, errc, err = startEtcd(&cfg.ec)
case dirProxy:
err = startProxy(cfg)
default:
lg.Panic(..)
}
} else {
shouldProxy := cfg.isProxy()
if !shouldProxy {
stopped, errc, err = startEtcd(&cfg.ec)
if derr, ok := err.(*etcdserver.DiscoveryError); ok && derr.Err == v2discovery.ErrFullCluster {
if cfg.shouldFallbackToProxy() {
shouldProxy = true
}
}
}
if shouldProxy {
err = startProxy(cfg)
}
}
if err != nil {
// ... 有省略
// 异常日志记录
}
osutil.HandleInterrupts(lg)
notifySystemd(lg)
select {
case lerr := <-errc:
lg.Fatal("listener failed", zap.Error(lerr))
case <-stopped:
}
osutil.Exit(0)
}
根据上述实现,我们可以绘制出如下的 startEtcdOrProxyV2 调用流程图:
startEtcdOrProxyV2 调用流程图
我们来具体解释一下上图中的每一个步骤。
cfg := newConfig()
用于初始化配置,cfg.parse(os.Args[1:])
,随后从第二个参数开始解析命令行输入参数。setupLogging()
,用于初始化日志配置。identifyDataDirOrDie
,判断 data 目录的类型,有 dirMember、dirProxy、dirEmpty,分别对应 etcd 目录、Proxy 目录和空目录。etcd 首先根据 data 目录的类型,判断启动 etcd 还是启动代理。如果是 dirEmpty,再根据命令行参数是否指定了 proxy 模式来判断。startEtcd,核心的方法,用于启动 etcd,我们将在下文讲解这部分内容。
osutil.HandleInterrupts(lg) 注册信号,包括 SIGINT、SIGTERM,用来终止程序,并清理系统。
notifySystemd(lg),初始化完成,监听对外的连接。
select(),监听 channel 上的数据流动,异常捕获与等待退出。
osutil.Exit(),接收到异常或退出的命令。
通过上述流程,我们可以看到 startEtcdOrProxyV2 的重点是 startEtcd。下面我们就来具体分析其启动的过程。
startEtcd 启动 etcd 服务
startEtcd
启动 etcd 服务主要是通过调用StartEtcd
方法,该方法的实现位于 embed 包,用于启动 etcd 服务器和 HTTP 处理程序,以进行客户端/服务器通信。
go
// 位于 embed/etcd.go:92
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// 校验 etcd 配置
if err = inCfg.Validate(); err != nil {
return nil, err
}
serving := false
// 根据合法的配置,创建 etcd 实例
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
cfg := &e.cfg
// 为每个 peer 创建一个 peerListener(rafthttp.NewListener),用于接收 peer 的消息
if e.Peers, err = configurePeerListeners(cfg); err != nil {
return e, err
}
// 创建 client 的 listener(transport.NewKeepAliveListener) contexts 的 map,用于服务端处理客户端的请求
if e.sctxs, err = configureClientListeners(cfg); err != nil {
return e, err
}
for _, sctx := range e.sctxs {
e.Clients = append(e.Clients, sctx.l)
}
// 创建 etcdServer
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return e, err
}
e.Server.Start()
// 在 rafthttp 启动之后,配置 peer Handler
if err = e.servePeers(); err != nil {
return e, err
}
// ...有删减
return e, nil
}
根据上述代码,我们可以总结出如下的调用步骤:
inCfg.Validate()
检查配置是否正确;e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
创建一个 etcd 实例;configurePeerListeners 为每个 peer 创建一个 peerListener(rafthttp.NewListener),用于接收 peer 的消息;
configureClientListeners 创建 client 的 listener(transport.NewKeepAliveListener),用于服务端处理客户端的请求;
etcdserver.NewServer(srvcfg)
创建一个 etcdServer 实例;启动
etcdServer.Start()
;配置 peer handler。
其中etcdserver.NewServer(srvcfg)
和
etcdServer.Start()
分别用于创建一个 etcdServer 实例和启动 etcd,下面我们就分别介绍一下这两个步骤。
服务端初始化
服务端初始化涉及比较多的业务操作,包括 etcdServer 的创建、启动 backend、启动 raftNode 等,下面我们具体介绍这些操作。
NewServer 创建实例
NewServer 方法用于创建一个 etcdServer 实例,我们可以根据传递过来的配置创建一个新的 etcdServer,在 etcdServer 的生存期内,该配置被认为是静态的。
我们来总结一下 etcd Server 的初始化涉及的主要方法,如下内容:
shell
NewServer()
|-v2store.New() // 创建 store,根据给定的命名空间来创建初始目录
|-wal.Exist() // 判断 wal 文件是否存在
|-fileutil.TouchDirAll // 创建文件夹
|-openBackend // 使用当前的 etcd db 返回一个 backend
|-restartNode() // 已有 WAL,直接根据 SnapShot 启动,最常见的场景
|-startNode() // 在没有 WAL 的情况下,新建一个节点
|-tr.Start // 启动 rafthttp
|-time.NewTicker() 通过创建 &EtcdServer{} 结构体时新建 tick 时钟
需要注意的是,我们要在 kv 键值对重建之前恢复租期。当恢复 mvcc.KV 时,重新将 key 绑定到租约上。如果先恢复 mvcc.KV,它有可能在恢复之前将 key 绑定到错误的 lease。
另外就是最后的清理逻辑,在没有先关闭 kv 的情况下关闭 backend,可能导致恢复的压缩失败,并出现 TX 错误。
启动 backend
创建好 etcdServer 实例之后,另一个重要的操作便是启动 backend。backend 是 etcd 的存储支撑,openBackend
调用当前的 db 返回一个 backend。openBackend
方法的具体实现如下:
go
// 位于 etcdserver/backend.go:68
func openBackend(cfg ServerConfig) backend.Backend {
// db 存储的路径
fn := cfg.backendPath()
now, beOpened := time.Now(), make(chan backend.Backend)
go func() {
// 单独协程启动 backend
beOpened <- newBackend(cfg)
}()
// 阻塞,等待 backend 启动,或者 10s 超时
select {
case be := <-beOpened:
return be
case <-time.After(10 * time.Second):
// 超时,db 文件被占用
)
}
return <-beOpened
}
可以看到,我们在openBackend
的实现中首先创建一个 backend.Backend 类型的 chan,并使用单独的协程启动 backend,设置启动的超时时间为 10s。beOpened <- newBackend(cfg)
主要用来配置 backend 启动参数,具体的实现则在 backend 包中。
etcd 底层的存储基于 boltdb,使用newBackend
方法构建 boltdb 需要的参数,bolt.Open(bcfg.Path, 0600, bopts)
在给定路径下创建并打开数据库,其中第二个参数为打开文件的权限。如果该文件不存在,将自动创建。传递 nil 参数将使 boltdb 使用默认选项打开数据库连接。
启动 Raft
在NewServer
的实现中,我们可以基于条件语句判断 Raft 的启动方式,具体实现如下:
go
switch {
case !haveWAL && !cfg.NewCluster:
// startNode
case !haveWAL && cfg.NewCluster:
// startNode
case haveWAL:
// restartAsStandaloneNode
// restartNode
default:
return nil, fmt.Errorf("unsupported Bootstrap config")
}
haveWAL
变量对应的表达式为wal.Exist(cfg.WALDir())
,用来判断是否存在 WAL,cfg.NewCluster
则对应 etcd 启动时的--initial-cluster-state
,标识节点初始化方式,该配置默认为new
,对应的变量 haveWAL 的值为 true。new 表示没有集群存在,所有成员以静态方式或 DNS 方式启动,创建新集群;existing 表示集群存在,节点将尝试加入集群。
在三种不同的条件下,raft 对应三种启动的方式,分别是:startNode、restartAsStandaloneNode 和 restartNode。下面我们将结合判断条件,具体介绍这三种启动方式。
startNode
在如下的两种条件下,raft 将会调用 raft 中的startNode
方法。
java
- !haveWAL && cfg.NewCluster
- !haveWAL && !cfg.NewCluster
- startNode(cfg, cl, cl.MemberIDs())
- startNode(cfg, cl, nil)
// startNode 的定义
func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) ;
可以看到,这两个条件下都会调用 startNode 方法,只不过调用的参数有差异。在没有 WAL 日志,并且是新配置结点的场景下,需要传入集群的成员 ids,如果加入已有的集群则不需要。
我们以其中的一种 case,具体分析:
go
case !haveWAL && !cfg.NewCluster:
// 加入现有集群时检查初始配置,如有问题则返回错误
if err = cfg.VerifyJoinExisting(); err != nil {
return nil, err
}
// 使用提供的地址映射创建一个新 raft 集群
cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
if err != nil {
return nil, err
}
// GetClusterFromRemotePeers 采用一组表示 etcd peer 的 URL,并尝试通过访问其中一个 URL 上的成员端点来构造集群
existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
if gerr != nil {
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
}
if err = membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
// 校验兼容性
if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
return nil, fmt.Errorf("incompatible with current running cluster")
}
remotes = existingCluster.Members()
cl.SetID(types.ID(0), existingCluster.ID())
cl.SetStore(st)
cl.SetBackend(be)
// 启动 raft Node
id, n, s, w = startNode(cfg, cl, nil)
cl.SetID(id, existingCluster.ID())
从上面的主流程来看,首先是做配置的校验,然后使用提供的地址映射创建一个新的 raft 集群,校验加入集群的兼容性,最后启动 raft Node。
StartNode 基于给定的配置和 raft 成员列表,返回一个新的节点,它将每个给定 peer 的 ConfChangeAddNode 条目附加到初始日志中。peers 的长度不能为零,如果长度为零将调用 RestartNode 方法。
RestartNode 与 StartNode 类似,但不包含 peers 列表,集群的当前成员关系将从存储中恢复。如果调用方存在状态机,则传入已应用到该状态机的最新一个日志索引值;否则直接使用零作为参数。
重启 raft Node
当已存在 WAL 文件时,raft Node 启动时首先需要检查响应文件夹的读写权限(当集群初始化之后,discovery token 将不会生效);接着将会加载快照文件,并从 snapshot 恢复 backend 存储。
cfg.ForceNewCluster
对应 etcd 配置中的--force-new-cluster
,如果为 true,则会强制创建一个新的单成员集群;否则重新启动 raft Node。
restartAsStandaloneNode
当--force-new-cluster
配置为 true 时,则会调用 restartAsStandaloneNode,即强制创建一个新的单成员集群。该节点将会提交配置更新,强制删除集群中的所有成员,并添加自身作为集群的一个节点,同时我们需要将其备份设置进行还原。
restartAsStandaloneNode 的实现中,首先读取 WAL 文件,并且丢弃本地未提交的 entries。createConfigChangeEnts 创建一系列 Raft 条目(即 EntryConfChange),用于从集群中删除一组给定的 ID。如果当前节点self
出现在条目中,也不会被删除;如果self
不在给定的 ID 内,它将创建一个 Raft 条目以添加给定的self
默认成员,随后强制追加新提交的 entries 到现有的数据存储中。
最后就是设置一些状态,构造 raftNode 的配置,重启 raft Node。
restartNode
在已有 WAL 数据的情况中,除了restartAsStandaloneNode
场景,当--force-new-cluster
为默认的 false 时,直接重启 raftNode。这种操作相对来说比较简单,减少了丢弃本地未提交的 entries 以及强制追加新提交的 entries 的步骤。接下来要做的就是直接重启 raftNode 还原之前集群节点的状态,读取 WAL 和快照数据,最后启动并更新 raftStatus。
rafthttp 启动
分析完 raft Node 的启动,接下来我们看 rafthttp 的启动。Transport 实现了 Transporter 接口,它提供了将 raft 消息发送到 peer 并从 peer 接收 raft 消息的功能。我们需要调用 Handler 方法来获取处理程序,以处理从 peerURLs 接收到的请求。用户需要先调用 Start 才能调用其他功能,并在停止使用 Transport 时调用 Stop。
rafthttp 的启动过程中首先要构建 Transport,并将 m.PeerURLs 分别赋值到 Transport 中的 Remote 和 Peer 中,之后将 srv.r.transport 指向构建好的 Transport 即可。
启动 etcd 服务端
接下来就是 etcd 的真正启动了,我们来看主要调用步骤:
go
// 位于 embed/etcd.go:220
e.Server.Start()
// 接收 peer 消息
if err = e.servePeers(); err != nil {
return e, err
}
// 接收客户端请求
if err = e.serveClients(); err != nil {
return e, err
}
// 提供导出 metrics
if err = e.serveMetrics(); err != nil {
return e, err
}
serving = true
启动 etcd Server,包括三个主要的步骤:首先e.Server.Start
初始化 Server 启动的必要信息;接着实现集群内部通讯;最后开始接收 peer 和客户端的请求,包括 range、put 等请求。
e.Server.Start
在处理请求之前,Start
方法初始化 Server 的必要信息,需要在Do
和Process
之前调用,且必须是非阻塞 的,任何耗时的函数都必须在单独的协程中运行。Start
方法的实现中还启动了多个 goroutine,这些协程用于选举时钟设置以及注册自身信息到服务器等异步操作。
集群内部通信
集群内部的通信主要由 Etcd.servePeers 实现,在 rafthttp.Transport 启动之后,配置集群成员的处理器。首先生成 http.Handler 来处理 etcd 集群成员的请求,并做一些配置校验。goroutine 读取 gRPC 请求,然后调用 srv.Handler 处理这些请求。srv.Serve
总是返回非空的错误,当 Shutdown 或者 Close 时,返回的错误则是 ErrServerClosed。最后srv.Serve
在独立协程启动对集群成员的监听。
处理客户端请求
Etcd.serveClients
主要用来处理客户端请求,比如我们常见的 range、put 等请求。etcd 处理客户端的请求,每个客户端的请求对应一个 goroutine 协程,这也是 etcd 高性能的支撑,etcd Server 为每个监听的地址启动一个客户端服务协程,根据 v2、v3 版本进行不同的处理。在serveClients
中,还设置了 gRPC 的属性,包括 GRPCKeepAliveMinTime 、GRPCKeepAliveInterval 以及 GRPCKeepAliveTimeout 等。
小结
这一讲我们主要介绍了 etcd 服务端启动涉及的一些细节。
etcd 服务端的启动包括两大块:
etcdServer 主进程,直接或者间接包含了 raftNode、WAL、Snapshotter 等多个核心组件,可以理解为一个容器;
另一块则是 raftNode,对内部 Raft 协议实现的封装,暴露简单的接口,用来保证写事务的集群一致性。
通过结合源码对 etcd 启动过程的分析,我们知道 etcd 启动时需要经历服务端配置的初始化、根据不同的场景选择合适的方式启动 raft 和 rafthttp,最后则是 etcd 服务器的启动,实现集群内部通信,之后才可以处理客户端的请求。
本讲内容总结如下:
学习完这一讲,我想给大家留一个问题,你经历过哪些 etcd 启动时的异常场景,又是如何解决的它们呢?欢迎你在留言区和我分享你的经历。下一讲,我们将介绍服务端处理客户端请求的步骤和原理。