服务端设计 作为etcd的最核心模块,etcd 服务端模块定义了主要的接口以及数据交互格式,并解决以下问题:接收客户端的请求并对节点进行分发;感知集群成员变动,对成员通知;同步或者启动恢复快照等。
Etcd 服务端主要由几大组件构成,各部分介绍如下:
(1) EtcdServer[etcdserver/server.go] 主进程,直接或者间接包含了raftNode、WAL、snapshotter 等多个核心组件,可以理解为一个容器。
(2) raftNode[etcdserver/raft.go] 对内部 RAFT 协议实现的封装,暴露简单的接口,用来保证写事务的集群一致性。
接下来,将以EtcdServer为核心进行源码分析,有关具体的Raft协议内容,将会在下一节具体给出。
Server对象 ETCD 服务器是通过 EtcdServer 结构抽象,对应了 etcdserver/server.go 中的代码,包含属性 r raftNode,代表 RAFT 集群中的一个节点,启动入口在 etcdmain/main.go 文件中。具体代码如下:
// etcdmain/main.go
func Main(args []string) {
checkSupportArch()
if len(args) > 1 {
cmd := args[1]
switch cmd {
case “gateway”, “grpc-proxy”:
if err := rootCmd.Execute(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
return
}
}
startEtcdOrProxyV2(args)
}
// embed/etcd.go 此处列出部分字段
// Etcd contains a running etcd server and its listeners.
type Etcd struct {
Peers []peerListener
Clients []net.Listener
// a map of contexts for the servers that serves client requests.
sctxs map[string]serveCtx
metricsListeners []net.Listener
…
// 核心对象服务器接口的生产实现
Server *etcdserver.EtcdServer
…
}
// 启动调用链过程(考虑到服务器的分析的复杂性,此处通过函数分支的方式进行介绍)
main() etcdmain/main.go
|-checkSupportArch()
|-startEtcdOrProxyV2() etcdmain/etcd.go
|-newConfig()
|-setupLogging()
|-startEtcd()
// 为客户端/服务器通信启动etcd服务器和HTTP处理程序。
| |-embed.StartEtcd() embed/etcd.go
| |-configurePeerListeners()
| |-configureClientListeners()
| |-EtcdServer.ServerConfig() 生成新的配置
| |-EtcdServer.NewServer() etcdserver/server.go正式启动RAFT服务«<1»>
| |-EtcdServer.Start() 开始启动服务
| | |-EtcdServer.start() 执行服务器开始服务请求所需的任何初始化
| | |-wait.New() 新建WaitGroup组以及一些管道服务
| | |-EtcdServer.run() etcdserver/raft.go 启动应用层的处理协程«<2»>
| | | |-raft.start() 启动处理协程
| |-Etcd.servePeers() 启动集群内部通讯
| | |-etcdhttp.NewPeerHandler() 新建http.Handler来处理etcd中其他成员请求
| | |-srv.Serve() 配置http服务
| | |-e.errHandler(l.serve()) 启动集群间http协程,开始多路传输监听
| |-Etcd.serveClients() 启动协程处理客户请求
| | |-http.NewServeMux() 启动监听客户端的协程
| | |-sctxs.serve() 启动服务器
| | |-v3rpc.Server() 启动gRPC服务 /etcdserver/api/v3rpc/grpc.go
| | | |-grpc.NewServer() 调用gRPC的接口创建
| | | |-pb.RegisterKVServer() 注册各种的服务,这里包含了多个
| | | |-pb.RegisterWatchServer()
| |-Etcd.serveMetrics()
|-notifySystemd()
|-select() 等待stopped
|-osutil.Exit()
其中,
在标记 1 处会启动 RAFT 协议的核心部分,也就是node.run()[raft/node.go] 。 在标记 2 处启动的是 ETCD 应用层的处理协程,对应了 raftNode.start()[etcdserver/raft.go] 。 这里基本上是大致的启动流程,主要是解析参数,设置日志,启动监听端口等,接下来就是其核心部分 etcdserver.NewServer() 。
应用通过 raft.StartNode() 来启动 raft 中的一个副本,函数内部会通过启动一个 goroutine 运行。
//etcdserver/server.go
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg config.ServerConfig) (srv EtcdServer, err error) {
b, err := bootstrap(cfg)
//…
sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String())
lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
srv = &EtcdServer{
…
r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
…
}
serverID.With(prometheus.Labels{“server_id”: b.cluster.nodeID.String()}).Set(1)
srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
srv.be = b.storage.backend.be
srv.beHooks = b.storage.backend.beHooks
minTTL := time.Duration((3cfg.ElectionTicks)/2) * heartbeat
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(…
})
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
},
time.Duration(cfg.TokenTTL)*time.Second,
)
//…
mvccStoreConfig := mvcc.StoreConfig{
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))
newSrv := srv // since srv == nil in defer if srv is returned as nil
//…
if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
if err != nil {
return nil, err
}
srv.compactor.Run()
}
if err = srv.restoreAlarms(); err != nil {
return nil, err
}
srv.uberApply = srv.NewUberApplier()
if srv.Cfg.EnableLeaseCheckpoint {
// setting checkpointer enables lease checkpoint feature.
srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
})
}
// Set the hook after EtcdServer finishes the initialization to avoid
// the hook being called during the initialization process.
srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{…
}
if err = tr.Start(); err != nil {
return nil, err
}
// add all remotes into transport
for _, m := range b.cluster.remotes {
if m.ID != b.cluster.nodeID {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range b.cluster.cl.Members() {
if m.ID != b.cluster.nodeID {
tr.AddPeer(m.ID, m.PeerURLs)
}
}
srv.r.transport = tr
return srv, nil
}
// 函数调用关系链
NewServer() etcdserver/server.go 通过配置创建一个新的EtcdServer对象,不同场景不同
|-bootstrap(cfg)
| |-bootstrapSnapshot(cfg) 新建snap对象,等待存储或恢复
| |-wal.Exist()
| |-v2store.New() 初始化内存B+树
| |-bootstrapBackend() 初始化操作treeIndex以及BoltDB持久化的对象
| |-recoverSnapshot() 从快照中恢复
| |-bootstrapWALFromSnapshot() 已有WAL,在原有数据中恢复未提交数据
| |-bootstrapCluster() 对于每一个集群成员进行存储配置
| |-bootstrapStorage() 对于没有WAL情况,进行统一分配存储
| |-cluster.Finalize() 对于每一个集群成员进行最后配置
| |-bootstrapRaft() 为当前服务器配置raft 对象
|-stats.NewServerStats() 配置server状态
|-stats.NewLeaderStats() 成为Leader状态
|-srv=&EtcdServer{raft.newRaftNode() …}新建etcdserver对象,并初始化raft对象运行
| |-raft.RestartNode() 若无其他成员则重启节点
| |-raft.StartNode() 若有则新启节点
| |-setupNode() 配置节点
| | |-rn = NewRawNode() raft/node.go 新建一个type node struct对象
| | |-rn.Bootstrap(peers) 通过追加配置来初始化RawNode
| | |-raft.becomeFollower() 成为Follower状态
| | |-n := newNode(rn) 封装rawNode
| |-go node.run() 循环运行节点监听任务
|-lease.NewLessor() 恢复Lessor状态 mvcc.New
|- mvcc.New() 新建mvcc存储管理对象
|-auth.NewAuthStore()
raft.StartNode() <====会根据不同的启动场景执行相关任务
|-setupNode() 新建一个节点
| |-rn = NewRawNode() raft/node.go 新建一个type node struct对象
| |- rn.Bootstrap(peers) 通过追加配置来初始化RawNode
//这里会对关键对象初始化以及赋值,包括step=stepFollower r.tick=r.tickElection函数
| |-raft.becomeFollower()
| | |-raft.reset() 开始启动时设置term为1
| | |-raft.resetRandomizedElectionTimeout() 更新选举的随机超时时间
| |-raftLog.append() 将配置更新日志添加
|-node.run() raft/node.go 节点运行,会启动一个协程运行 «
在etcd中,mvcc底层使用 bolt 实现,bolt是一个基于B+树的KV存储。不同于以往认知,实际在存储过程中,Key是revision,而Value是用户给出的KV。为此,内存里维护了一个treeIndex的B+树,它存储了所有KV的所有版本记录。在查询数据时,先要通过内存btree在keyIndex.generations[0].revs中找到最后一条revision,即可去bbolt中读取对应的数据。相应的,etcd支持按key前缀查询,其实也就是遍历btree的同时根据revision去bbolt中获取用户的value。
与此同时,etcd封装了backend接口用来操作treeIndex以及BoltDB的持久化,使其能够从内存到磁盘BoltDB持久化流程。使用*bolt.DB来操作boltDB数据库。这里不展开各个操作的细节。
gRPC 服务 与客户端gRPC服务相似,服务器 RPC 接口的定义在 etcdserver/etcdserverpb/rpc.proto 文件中,对应了 service KV 中的定义,而真正的启动对应了 api/v3rpc/grpc.go 中的实现。
以 KV 存储为例,其对应了 NewQuotaKVServer() 中的实现,这里实际上是封装了一层,用来检查是否有足够的空间。
quotaKVServer.Put() api/v3rpc/quota.go 首先检查是否满足需求 |-quotoAlarm.check() 检查 |-KVServer.Put() api/v3rpc/key.go 真正的处理请求 |-checkPutRequest() 校验请求参数是否合法 |-RaftKV.Put() etcdserver/v3_server.go 处理请求 |=EtcdServer.Put() 实际调用的是该函数 | |-raftRequest() | |-raftRequestOnce() | |-processInternalRaftRequestOnce() 真正开始处理请求 | |-context.WithTimeout() 创建超时的上下文信息 | |-raftNode.Propose() raft/node.go | |-raftNode.step() 对于类型为MsgProp类型消息,向propc通道中传入数据 |-header.fill() etcdserver/api/v3rpc/header.go填充响应的头部信息 处理流程 在运行处理数据过程中,etcd 服务端采用的是异步状态机,基于 GoLang 的 Channel 机制,RAFT 状态机作为一个 Background Thread/Routine 运行,会通过 Channel 接收上层传来的消息,状态机处理完成之后,再通过 Ready() 接口返回给上层。
其中 type Ready struct 结构体封装了一批更新操作,具体代码如下所示:
/ Ready encapsulates the entries and messages that are ready to read, // be saved to stable storage, committed or sent to other peers. // All fields in Ready are read-only. type Ready struct { *SoftState pb.HardState ReadStates []ReadState Entries []pb.Entry Snapshot pb.Snapshot CommittedEntries []pb.Entry Messages []pb.Message MustSync bool } 其中包括了:
(1) pb.HardState 需要在发送消息前持久化的消息,包含当前节点见过的最大的 term,在这个 term 给谁投过票,已经当前节点知道的 commit index;
(2) Messages 需要广播给所有 peers 的消息;
(3) CommittedEntries 已经提交但是还没有apply到状态机的日志;
(4) Snapshot 需要持久化的快照。
在 raft/node.go 中定义了 type node struct 对应的结构,一个 RAFT 结构通过 Node 表示各结点信息,该结构体内定义了各个管道,用于同步信息,下面会逐一遇到。
type node struct { propc chan pb.Message recvc chan pb.Message confc chan pb.ConfChange confstatec chan pb.ConfState readyc chan Ready advancec chan struct{} tickc chan struct{} done chan struct{} stop chan struct{} status chan chan Status } 库的使用者从 type node struct 结构体提供的 ready channel 中不断 pop 出一个个 Ready 进行处理,库使用者通过如下方法拿到 Ready channel 。
// raft/node.go func (n *node) Ready() <-chan Ready { return n.readyc } 应用需要对 Ready 的处理,在etcdserver/raft.go的start函数有明确指出,包括如下内容:
(1) 将 HardState、Entries、Snapshot 持久化到 storage;
(2) 将 Messages 非阻塞的广播给其他 peers;
(3) 将 CommittedEntries (已经提交但是还没有应用的日志) 应用到状态机;
(4) 如果发现 CommittedEntries 中有成员变更类型的 entry,则调用 node 的 ApplyConfChange() 方法让 node 知道;
(5) 调用 Node.Advance() 告诉 raft node 这批状态更新处理完,状态已经演进了,可以给我下一批 Ready 让我处理了。