Etcd源码结构及功能
相较于以往的版本有所不同,etcd 项目(从 3.5 版开始)被组织成多个 golang 模块。具体目录结构如下:
├─api 定义 etcd 客户端和服务器之间通信协议的 API 定义
├─CHANGELOG
├─client Go语言客户端SDK
│ ├─pkg
│ ├─v2
│ └─v3
├─contrib raftexample实现
├─Documentation
├─etcdctl 命令行客户端实现,用于访问和管理 etcd 的命令行工具
├─etcdutl 命令行管理实用程序,可直接操作etcd数据文件
├─hack 为开发人员提供的扩展
├─pkg ectd使用的工具集合
├─raft raft算法模块。分布式共识协议的实现
├─scripts 脚本、测试等相关内容
├─security 脚本、测试等相关内容
├─server etcd server模块,etcd 内部实现
├─tests 一个包含所有 etcd 集成测试的模块
└─tools 脚本、测试等相关内容
客户端设计 作为etcd的基础模块,etcd 客户端包括 client v2 和 v3 两个大版本 API 客户端库,提供了简洁易用的 API,同时支持负载均衡、节点间故障自动转移,可极大降低业务使用 etcd 复杂度,提升开发效率、服务可用性。接下来,我们将以client v3版本为例,介绍客户端相关设计内容。
Client对象 要访问etcd第一件事就是创建client对象,而client对象需要传入一个Config配置,具体初始化方法如下:
cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{“localhost:2379”}, DialTimeout: 5 * time.Second, } ) 这里传了2个选项:
Endpoints:etcd的多个节点服务地址,因为我是单点测试,所以只传1个; DialTimeout:创建client的首次连接超时,这里传了5秒,如果5秒都没有连接成功就会返回err;值得注意的是,一旦client创建成功,我们就不用再关心后续底层连接的状态了,client内部会重连。 当然,如果上述err != nil,那么一般情况下我们可以选择重试几次,或者退出程序。
这里我们重点了解一下Client 定义:
client/client.go: // Client provides and manages an etcd v3 client session. type Client struct { Cluster KV Lease Watcher Auth Maintenance // Username is a user name for authentication. Username string // Password is a password for authentication. Password string // contains filtered or unexported fields } 这里显示的都是可导出的模块结构字段,代表了客户端能够使用的几大核心模块,具体功能如下:
(1) Cluster:向集群里增加 etcd 服务端节点之类,属于管理员操作;
(2) KV:实际操作中主要使用的功能,即操作 K-V对象。实际上client.KV是一个interface,提供了关于k-v操作的所有方法,在使用过程中,常需要使用装饰者模式包装后的KV对象,对象中内置错误重试机制。具体代码如下:
client/kv.go:
type KV interface {
Put(…) (PutResponse, error) // 存储键值对
Get(…) (GetResponse, error) // 检索键值对
Delete(…) (DeleteResponse, error) // 删除键值对
Compact(…) (CompactResponse, error) // 压缩给定版本之前的 KV 历史
Do(…) (OpResponse, error) // 指定某种没有事务的操作
Txn(…) Txn // Txn 创建一个事务
}
client/kv.go:
func NewKV(c *Client) KV {
api := &kv{remote: RetryKVClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
client/kv.go:
// 需要注意的是此处具体构建装饰的对象是KVClient
// KVClient 对象是实际是KVService的客户端API。
type kv struct {
remote pb.KVClient
callOpts []grpc.CallOption
}
client/retry.go:
// RetryKVClient implements a KVClient.
func RetryKVClient(c *Client) pb.KVClient {
return &retryKVClient{
kc: pb.NewKVClient(c.conn),
}
}
(3) Lease:租约相关操作,比如申请一个 TTL=10 秒的租约。和获取KV对象一样,通过下面代码获取它:
type Lease interface { Grant(…) (LeaseGrantResponse, error) 分配租约 Revoke(…) (LeaseRevokeResponse, error) 释放租约 TimeToLive(…) (LeaseTimeToLiveResponse, error) 获取剩余TTL时间 Leases(…) (LeaseLeasesResponse, error) 列举所有etcd中的租约 KeepAlive(…) (<-chan LeaseKeepAliveResponse, error) 自动定时的续约某个租约 KeepAliveOnce(…) (LeaseKeepAliveResponse, error) 为某个租约续约一次 Close() error 关闭客户端建立的所有租约 } func NewLease(c *Client) Lease { return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second) } (4) Watcher:观察订阅,从而监听最新的数据变化;
(5) Auth:管理 etcd 的用户和权限,属于管理员操作;
(6) Maintenance:维护 etcd,比如主动迁移 etcd 的 leader 节点,属于管理员操作。
gRPC服务 etcd client v3库针对gRPC服务的实现采用的负载均衡算法为 Round-robin。即针对每一个请求,Round-robin 算法通过轮询的方式依次从 endpoint 列表中选择一个 endpoint 访问(长连接),使 etcd server 负载尽量均衡。
client/client.go func newClient: conn, err := client.dialWithBalancer() client/client.go 建立了到etcd的服务端链接 func (c Client) dialWithBalancer(dopts …grpc.DialOption) (grpc.ClientConn, error) { creds := c.credentialsForEndpoint(c.Endpoints()[0]) opts := append(dopts, grpc.WithResolvers(c.resolver)) return c.dial(creds, opts…) } client/client.go func (c Client) dial(creds grpccredentials.TransportCredentials, dopts …grpc.DialOption) (grpc.ClientConn, error) { // 首先,ETCD通过这行代码,向GRPC框架加入了一些自己的 // 配置,比如:KeepAlive特性(配置里提到的配置项)、 // TLS证书配置、还有最重要的重试策略。 opts, err := c.dialSetupOpts(creds, dopts…) …
// context 的一段经典样例代码
// 问:如果我同时把非零的DialTimeout和带超时的 context 传给客户端,
// 到底以哪个超时为准?
// 答:这里新建了子context(dctx),父context和DialTimeout
// 哪个先到deadline,就以哪个为准。
dctx := c.ctx
if c.cfg.DialTimeout > 0 {
var cancel context.CancelFunc
// 同时包含父context和DialTimeout
// 哪个先倒时间就以哪个为准。
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
defer cancel()
} // 最终调用grpc.DialContext()建立连接 conn, err := grpc.DialContext(dctx, target, opts...) ... return conn, nil } 重试策略分析 自动重试是ETCD能提供高可用特性的重要保证,此处需要注意的是:
自动重试不会在etcd集群的同一节点上进行,这跟我们平常做的重试不同,就如前面提到的 etcd是通过grpc框架提供对集群访问的负载均衡策略的,所以此时client端会轮询的重试集群的每个节点,此外,自动重试只会重试一些特定的错误。
具体代码如下:
//client/client.go 这段代码在dialWithBalancer->dial->dialSetupOpts中 // dialSetupOpts gives the dial opts prior to any authentication. func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts …grpc.DialOption) (opts []grpc.DialOption, err error) { if c.cfg.DialKeepAliveTime > 0 { params := keepalive.ClientParameters{ Time: c.cfg.DialKeepAliveTime, Timeout: c.cfg.DialKeepAliveTimeout, PermitWithoutStream: c.cfg.PermitWithoutStream, } opts = append(opts, grpc.WithKeepaliveParams(params)) } opts = append(opts, dopts…) if creds != nil { opts = append(opts, grpc.WithTransportCredentials(creds)) } else { opts = append(opts, grpc.WithInsecure()) } // Interceptor retry and backoff. // TODO: Replace all of clientv3/retry.go with RetryPolicy: // https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130 rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction)) opts = append(opts, // Disable stream retry by default since go-grpc-middleware/retry does not support client streams. // Streams that are safe to retry are enabled individually. grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)), grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)), ) return opts, nil } 看以上的代码,要自动重试只需两步:
(1) 创建backoff函数,也就是计算重试等待时间的函数。
(2) 通过WithXXXInterceptor(),注册重试拦截器,这样每次GRPC有请求都会回调该拦截器。
值得注意的是,这里我们看到Stream的重试拦截器的注册,其最大重试次数设置为了0(withMax()),也就是不重试,这其实是故意为之,因为Client端的Stream重试不被支持,即Client端需要重试Stream,需要自己做单独处理,不能通过拦截器。
首先看看如何计算等待时间:
//client/client.go // waitBetween 重试间隔时长,jitterFraction 随机抖动率, // 比如:默认重试间隔为25ms,抖动率:0.1, // 那么实际重试间隔就在 25土2.5ms 之间,attempt 实际重试了多少次 // roundRobinQuorumBackoff retries against quorum between each backoff. // This is intended for use with a round robin load balancer. func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc { return func(attempt uint) time.Duration { // after each round robin across quorum, backoff for our wait between duration n := uint(len(c.Endpoints())) quorum := (n/2 + 1) if attempt%quorum == 0 { c.lg.Debug(“backoff”, zap.Uint(“attempt”, attempt), zap.Uint(“quorum”, quorum), zap.Duration(“waitBetween”, waitBetween), zap.Float64(“jitterFraction”, jitterFraction)) return jitterUp(waitBetween, jitterFraction) } c.lg.Debug(“backoff skipped”, zap.Uint(“attempt”, attempt), zap.Uint(“quorum”, quorum)) return 0 } } 可以发现的是roundRobinQuorumBackoff返回了一个闭包,内部是重试间隔时长计算逻辑,这个逻辑说来也简单:
(1) 若重试次数已经达到集群的法定人数(quorum),则真正的计算间隔时长,间隔时长到期后,才进行重试。
(2) 否则,直接返回0,也就是马上重试。
就如前面所提到的,负载均衡策略是轮询,而这个重试逻辑一定要配合负载均衡是轮询策略才能达到的效果:假如你访问集群中的一台节点失败,可能是那台节点出问题了,但如果整个集群是好的,这时候马上重试,轮询到下台节点就行。
但是,如果重试多次,集群大多数节点(法定人数)都失败了,那应该是集群出问题了,这时候就需要计算间隔时间,等会儿再重试看看问题能不能解决。
这里也可以看到etcd的Client端,考虑的细节问题是非常多的,一个简单的重试时间计算,也能进行逻辑上的小小优化。
重试拦截器相关代码实现如下:
func (c *Client) unaryClientInterceptor(optFuncs …retryOption) grpc.UnaryClientInterceptor { … // 如果最大重试次数设置为0,那就不重试。 if callOpts.max == 0 { return invoker(ctx, method, req, reply, cc, grpcOpts…) } var lastErr error // 开始重试计数 for attempt := uint(0); attempt < callOpts.max; attempt++ { // 计算重试间隔时间,并阻塞代码,等待 // 这里最终会调用到 roundRobinQuorumBackoff 来计算时间 if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil { return err }
// 再次重新执行GRPC请求
lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
if lastErr == nil {
// 重试成功,退出
return nil
}
// 这段代码分析了两种情况
// 1. 服务端返回了 Context Error(超时、被取消),直接重试
// 2. 客户端的 Context 也出现了Error
if isContextError(lastErr) {
if ctx.Err() != nil {
// 客户端本身的ctx也报错了,不重试了,退出。
return lastErr
}
// 服务端返回,直接重试
continue
}
if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
// 是AuthToken不正确,重新获取Token
gterr := c.getToken(ctx)
...
continue
}
// 只有在特定错误才重试(code.Unavailable)
// 否则返回Err,不重试。
if !isSafeRetry(c.lg, lastErr, callOpts) {
return lastErr
}
}
return lastErr
} } API 接口层设计 通信方式概述 时至今日,etcd通信方式经过了两次较为明显的版本更替。早期的etcd通信版本是v2,通过HTTP+JSON暴露外部API,由于经常需要服务感知节点配置变动或者监听推送,客户端和服务端需要频繁通信,早期的v2采用HTTP1.1的长连接来维持,经过如此通信效率依然不高。
因此,etcd v3版本引进了grpc+protobuf的通信方式(基于HTTP/2.0),其优势在于:
(1) 采用二进制压缩传输,序列/反序列化更加高效
(2) 基于HTTP2.0的服务端主动推送特性,对事件多路复用做了优化
(3) Watch功能使用grpc的stream流进行感知,同一个客户端和服务端采用单一连接,替代v2的HTTP长轮询检测,直接减少连接数和内存开销
etcd v3版本为了向上兼容v2 API版本,提供了grpc网关来支持原HTTP JSON接口支持。考虑到etcd v3版本提供了丰富的功能优化,后续的代码分析将以v3版本为主体进行介绍。
etcd v3 etcd v3在设计之初的通信基于 gRPC,proto 文件是定义服务端和客户端通讯接口的标准。即客户端该传什么样的参数,服务端该返回什么样子的参数,客户端该怎么调用,是阻塞还是非阻塞,是同步还是异步。
在进行核心API接口层设计的学习之前,gRPC 推荐使用 proto3,我们需要对 proto3 的基本语法有初步的了解。proto3 是原有 Protocol Buffer 2(被称为 proto2)的升级版本,删除了一部分特性,优化了对移动设备的支持,另外增加了对android和ios的支持,使得 gRPC 可以顺利的在移动设备上使用。
一个 .proto 文件的编译之后,编译器会为你选择的语言生成代码。你在文件中描述的消息类型,包括获取和设置字段的值,序列化你的消息到一个输出流,以及从一个输入流中转换出你的消息。
(1) 对于 C++,编译器会为每个 .proto 文件生成一个 .h 和一个 .cc 的文件,为每一个给出的消息类型生成一个类。
(2) 对于 Java,编译器会生成一个java文件,其中为每一个消息类型生成一个类,还有特殊的用来创建这些消息类实例的Builder类,
(3) Python编译器生成一个模块,其中为每一个消息类型生成一个静态的描述器,在运行时,和一个 metaclass 一起使用来创建必要的 Python 数据访问类。
(4) 对于 Go,编译器为每个消息类型生成一个 .pb.go 文件。
gRPC服务 在proto文件(api/etcdserverpb/rpc.proto)中,etcd 的 RPC 接口定义根据功能分类到服务中。
其中,处理 etcd 键值的重要服务包括:
KV Service:创建、更新、获取和删除键值对; Watch Service:监视键的更改; Lease Service:实现键值对过期,客户端用来续租、保持心跳; Lock Service:etcd 提供分布式共享锁的支持; Election Service:暴露客户端选举机制; 这些服务统一在api/etcdserverpb/*.proto文件中进行了相应的声明。例如,这里是Range RPC描述:
service KV { // Range gets the keys in the range from the key-value store. rpc Range(RangeRequest) returns (RangeResponse) { option (google.api.http) = { post: “/v3/kv/range” body: “*” }; } …. } 此外,etcd API 的所有响应都有一个附加的响应标头,其中包括响应的群集元数据:
message ResponseHeader { uint64 cluster_id = 1; // 产生响应的集群的 ID uint64 member_id = 2; // 产生响应的成员的 ID int64 revision = 3; // 产生响应时键值存储的修订版本号 uint64 raft_term = 4; //产生响应时,成员的 Raft 称谓 } 这些元数据在实际使用过程中,都提供了如下功能:
(1) 应用服务可以通过 Cluster_ID 和 Member_ID 字段来确保,当前与之通信的正是预期的那个集群或者成员;
(2) 应用服务可以使用修订号字段来知悉当前键值存储库最新的修订号。当应用程序指定历史修订版以进行时程查询并希望在请求时知道最新修订版时,此功能特别有用;
(3) 应用服务可以使用 Raft_Term 来检测集群何时完成一个新的 leader 选举;