Etcd源码raft设计

2018/05/05 etcd

Raft设计 Raft 概述 随着分布式技术的不断发展,单点故障场景下系统异常时有发生。为了解决单点问题,软件系统工程师引入了数据复制技术,实现多副本。而多副本间的数据复制就会出现一致性问题。所以需要共识算法来解决该问题。

共识算法(consensus algorithm)的祖师爷是 Paxos, 但是由于它过于复杂,难于理解,工程实践上也较难落地,导致在工程界落地较慢。 Raft 算法作为一种共识算法,正是为了可理解性、易实现而诞生的。

raft 会先选举出 leader,leader 完全负责 replicated log 的管理。leader 负责接受所有客户端更新请求,然后复制到 follower 节点,并在“安全”的时候执行这些请求。如果 leader 故障,followes 会重新选举出新的 leader。

通过 leader,raft 将一致性问题分解成三个相当独立的子问题:

Leader Election:当集群启动或者 leader 失效时必须选出一个新的leader。 Log Replication:leader 必须接收客户端提交的日志,并将其复制到集群中的其他节点,强制其他节点的日志与 leader 一样。 Safety:最关键的安全点就状态机安全属性(State Machine Safety Property)。如果任何一个 server 已经在它的状态机apply了一条日志,其他的 server 不可能在相同的 index 处 apply 其他不同的日志条目。在Log Replication的实现中有具体体现,就不单独赘述。 后面将会详细的讲述具体如何实现。

领导选举 在 RAFT 协议实现的代码中,node[raft/node.go] 是其核心的实现,也是整个分布式算法的核心所在。首先在 Raft 协议中它定义了集群中的如下节点状态,任何时刻,每个节点肯定处于其中一个状态:

Follower,跟随者, 同步从 Leader 收到的日志,etcd 启动的时候默认为此状态; Candidate,竞选者,可以发起 Leader 选举; Leader,集群领导者, 唯一性,拥有同步日志的特权,需定时广播心跳给 Follower 节点,以维持领导者身份。 Raft 协议将时间划分成一个个任期(Term),任期用连续的整数表示,每个任期从一次选举开始,赢得选举的节点在该任期内充当 Leader 的职责,随着时间的消逝,集群可能会发生新的选举,任期号也会单调递增。通过任期号,可以比较各个节点的数据新旧、识别过期的 Leader 等,它在 Raft 算法中充当逻辑时钟,发挥着重要作用。

另外,通过 raftNode[etcdserver/raft.go] 对 node 进一步封装,只对 EtcdServer 暴露了 startNode()、start()、apply()、processMessages() 等少数几个接口。就如3.4.3处理流程提到的,其中核心部分是通过 start() 方法启动的一个协程,这里会等待从 readyc 通道上报的数据。

// etcdserver/raft.go // start prepares and starts raftNode in a new goroutine. It is no longer safe // to modify the fields after it has been started. func (r *raftNode) start(rh *raftReadyHandler) { internalTimeout := time.Second ​ go func() { defer r.onStop() islead := false ​ for { select { case <-r.ticker.C: r.tick() case rd := <-r.Ready(): if rd.SoftState != nil { //申请Leader newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead if newLeader { leaderChanges.Inc() } ​ if rd.SoftState.Lead == raft.None { hasLeader.Set(0) } else { hasLeader.Set(1) } ​ rh.updateLead(rd.SoftState.Lead) islead = rd.RaftState == raft.StateLeader if islead { isLeader.Set(1) } else { isLeader.Set(0) } rh.updateLeadership(newLeader) r.td.Reset() } ​ if len(rd.ReadStates) != 0 { select { case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]: case <-time.After(internalTimeout): r.lg.Warn(“timed out sending read state”, zap.Duration(“timeout”, internalTimeout)) case <-r.stopped: return } } ​ notifyc := make(chan struct{}, 1) ap := toApply{ entries: rd.CommittedEntries, snapshot: rd.Snapshot, notifyc: notifyc, } ​ updateCommittedIndex(&ap, rh) ​ select { case r.applyc <- ap: case <-r.stopped: return } ​ // the leader can write to its disk in parallel with replicating to the followers and them // writing to their disks. // For more details, check raft thesis 10.2.1 if islead { // gofail: var raftBeforeLeaderSend struct{} r.transport.Send(r.processMessages(rd.Messages)) } ​ // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to // ensure that recovery after a snapshot restore is possible. if !raft.IsEmptySnap(rd.Snapshot) { // gofail: var raftBeforeSaveSnap struct{} if err := r.storage.SaveSnap(rd.Snapshot); err != nil { r.lg.Fatal(“failed to save Raft snapshot”, zap.Error(err)) } // gofail: var raftAfterSaveSnap struct{} } ​ // gofail: var raftBeforeSave struct{} if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { r.lg.Fatal(“failed to save Raft hard state and entries”, zap.Error(err)) } if !raft.IsEmptyHardState(rd.HardState) { proposalsCommitted.Set(float64(rd.HardState.Commit)) } // gofail: var raftAfterSave struct{} ​ if !raft.IsEmptySnap(rd.Snapshot) { // Force WAL to fsync its hard state before Release() releases // old data from the WAL. Otherwise could get an error like: // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost? // See https://github.com/etcd-io/etcd/issues/10219 for more details. if err := r.storage.Sync(); err != nil { r.lg.Fatal(“failed to sync Raft snapshot”, zap.Error(err)) } ​ // etcdserver now claim the snapshot has been persisted onto the disk notifyc <- struct{}{} ​ // gofail: var raftBeforeApplySnap struct{} r.raftStorage.ApplySnapshot(rd.Snapshot) r.lg.Info(“applied incoming Raft snapshot”, zap.Uint64(“snapshot-index”, rd.Snapshot.Metadata.Index)) // gofail: var raftAfterApplySnap struct{} ​ if err := r.storage.Release(rd.Snapshot); err != nil { r.lg.Fatal(“failed to release Raft wal”, zap.Error(err)) } // gofail: var raftAfterWALRelease struct{} } ​ r.raftStorage.Append(rd.Entries) ​ if !islead { // finish processing incoming messages before we signal raftdone chan msgs := r.processMessages(rd.Messages) ​ // now unblocks ‘applyAll’ that waits on Raft log disk writes before triggering snapshots notifyc <- struct{}{} ​ // Candidate or follower needs to wait for all pending configuration // changes to be applied before sending messages. // Otherwise we might incorrectly count votes (e.g. votes from removed members). // Also slow machine’s follower raft-layer could proceed to become the leader // on its own single-node cluster, before toApply-layer applies the config change. // We simply wait for ALL pending entries to be applied for now. // We might improve this later on if it causes unnecessary long blocking issues. waitApply := false for _, ent := range rd.CommittedEntries { if ent.Type == raftpb.EntryConfChange { waitApply = true break } } if waitApply { // blocks until ‘applyAll’ calls ‘applyWait.Trigger’ // to be in sync with scheduled config-change job // (assume notifyc has cap of 1) select { case notifyc <- struct{}{}: case <-r.stopped: return } } ​ // gofail: var raftBeforeFollowerSend struct{} r.transport.Send(msgs) } else { // leader already processed ‘MsgSnap’ and signaled notifyc <- struct{}{} } ​ r.Advance() case <-r.stopped: return } } }() } 接下来,我们再回过头来看一下后台启动程序node.run(),具体代码如下:

// raft/node.go func (n *node) run() { … for { … select { case pm := <-propc: … r.Step(m) case m := <-n.recvc: … r.Step(m) case cc := <-n.confc: … case <-n.tickc: n.rn.Tick() case readyc <- rd: n.rn.acceptReady(rd) advancec = n.advancec case <-advancec: n.rn.Advance(rd) rd = Ready{} advancec = nil case c := <-n.status: c <- getStatus(r) case <-n.stop: close(n.done) return } } } ​ 主要是通过for-select-channel监听channel信息,来处理不同的请求。下面来看下几个主要的channel信息

propc和recvc中拿到的是从上层应用传进来的消息,这个消息会被交给raft层的Step函数处理。Step是etcd-raft模块负责各类信息的入口,其中default后面的step,被实现为一个状态机,它的step属性是一个函数指针,根据当前节点的不同角色,指向不同的消息处理函数:stepLeader/stepFollower/stepCandidate。与它类似的还有一个tick函数指针,根据角色的不同,也会在tickHeartbeat和tickElection之间来回切换,分别用来触发定时心跳和选举检测。

func (r *raft) Step(m pb.Message) error { //… switch m.Type { case pb.MsgHup: //… case pb.MsgVote, pb.MsgPreVote: //… default: r.step(r, m) } } (1) 作为leader

当一个节点成为leader的时候,会将节点的定时器设置为tickHeartbeat,然后周期性的调用,维持leader的地位。

// raft/raft.go func (r *raft) becomeLeader() { // 检测当 前节点的状态,禁止从 follower 状态切换成 leader 状态 if r.state == StateFollower { panic(“invalid transition [follower -> leader]”) } // 将step 字段设置成 stepLeader r.step = stepLeader r.reset(r.Term) // 设置心跳的函数 r.tick = r.tickHeartbeat // 设置lead的id值 r.lead = r.id // 更新当前的角色 r.state = StateLeader … } // raft/raft.go func (r *raft) tickHeartbeat() { // 递增心跳计数器 r.heartbeatElapsed++ // 递增选举计数器 r.electionElapsed++ … ​ if r.electionElapsed >= r.electionTimeout { r.electionElapsed = 0 // 检测当前节点时候大多数节点保持连通 if r.checkQuorum { r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}) } // If current leader cannot transfer leadership in electionTimeout, it becomes leader again. if r.state == StateLeader && r.leadTransferee != None { r.abortLeaderTransfer() } } ​ if r.heartbeatElapsed >= r.heartbeatTimeout { r.heartbeatElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) } } becomeLeader中的step被设置成stepLeader,所以将会调用stepLeader来处理leader中对应的消息,并通过调用raft.bcastHeartbeat()向所有的节点发送心跳。

func stepLeader(r *raft, m pb.Message) error { // These message types do not require any progress for m.From. switch m.Type { case pb.MsgBeat: // 向所有节点发送心跳 r.bcastHeartbeat() return nil case pb.MsgCheckQuorum: // 检测是否和大部分节点保持连通 // 如果不连通切换到follower状态 if !r.prs.QuorumActive() { r.logger.Warningf(“%x stepped down to follower since quorum is not active”, r.id) r.becomeFollower(r.Term, None) } return nil … } } ​ // bcastHeartbeat sends RPC, without entries to all the peers. func (r *raft) bcastHeartbeat() { lastCtx := r.readOnly.lastPendingRequestCtx() // 这两个函数最终都将调用sendHeartbeat if len(lastCtx) == 0 { r.bcastHeartbeatWithCtx(nil) } else { r.bcastHeartbeatWithCtx([]byte(lastCtx)) } } ​ // 向指定的节点发送信息 func (r *raft) sendHeartbeat(to uint64, ctx []byte) { commit := min(r.prs.Progress[to].Match, r.raftLog.committed) m := pb.Message{ To: to, // 发送MsgHeartbeat类型的数据 Type: pb.MsgHeartbeat, Commit: commit, Context: ctx, } ​ r.send(m) } 最终的心跳通过MsgHeartbeat的消息类型进行发送,通知它们目前Leader的存活状态,重置所有Follower持有的超时计时器

(2) 作为follower

在step函数中将实现如下功能:接收到来自leader的RPC消息MsgHeartbeat,然后重置当前节点的选举超时时间,回复leader自己的存活。

func stepFollower(r *raft, m pb.Message) error { switch m.Type { case pb.MsgProp: … case pb.MsgHeartbeat: r.electionElapsed = 0 r.lead = m.From r.handleHeartbeat(m) … } return nil } ​ func (r *raft) handleHeartbeat(m pb.Message) { r.raftLog.commitTo(m.Commit) r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) } (3) 作为Candidate

candidate来处理MsgHeartbeat的信息,是先把自己变成follower,然后和上面的follower一样,回复leader自己的存活。

func stepCandidate(r *raft, m pb.Message) error { … switch m.Type { … case pb.MsgHeartbeat: r.becomeFollower(m.Term, m.From) // always m.Term == r.Term r.handleHeartbeat(m) } … return nil } ​ func (r *raft) handleHeartbeat(m pb.Message) { r.raftLog.commitTo(m.Commit) r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) } 当leader收到返回的信息的时候,会将对应的节点设置为RecentActive,表示该节点目前存活。

func stepLeader(r *raft, m pb.Message) error { … // 根据from,取出当前的follower的Progress pr := r.prs.Progress[m.From] if pr == nil { r.logger.Debugf(“%x no progress available for %x”, r.id, m.From) return nil } switch m.Type { case pb.MsgHeartbeatResp: pr.RecentActive = true … } return nil } 当 Follower 节点接收 Leader 节点心跳消息超时后,它会转变成 Candidate 节点,并可发起新一轮的竞选 Leader 投票,若获得集群多数节点的支持后,它就可转变成 Leader 节点。

在Step函数中,我们可以看到MsgHup这个消息后会调用campaign函数,进入竞选状态。

// raft/raft.go func (r *raft) Step(m pb.Message) error { //… switch m.Type { case pb.MsgHup: if r.preVote { r.hup(campaignPreElection) } else { r.hup(campaignElection) } } } ​ func (r *raft) hup(t CampaignType) { … r.campaign(t) } ​ func (r *raft) campaign(t CampaignType) { … if t == campaignPreElection { r.becomePreCandidate() voteMsg = pb.MsgPreVote // PreVote RPCs are sent for the next term before we’ve incremented r.Term. term = r.Term + 1 } else { // 切换到Candidate状态 r.becomeCandidate() voteMsg = pb.MsgVote term = r.Term } // 统计当前节点收到的选票 并统计其得票数是否超过半数,这次检测主要是为单节点设置的 // 判断是否是单节点 if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon { if t == campaignPreElection { r.campaign(campaignElection) } else { // 是单节点直接,变成leader r.becomeLeader() } return } … // 向集群中的所有节点发送信息,请求投票 for _, id := range ids { // 跳过自身的节点 if id == r.id { continue } r.logger.Infof(“%x [logterm: %d, index: %d] sent %s request to %x at term %d”, r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term) ​ var ctx []byte if t == campaignTransfer { ctx = []byte(t) } r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) } } 竞选者切换到campaign状态,然后将自己的term信息发送出去,请求投票。这里,我们能看到对于Candidate会有一个PreCandidate,reCandidate这个状态的作用的是:

当系统曾出现分区,分区消失后恢复的时候,可能会造成某个被split的Follower的Term数值很大,对服务器进行分区时,它将不会收到heartbeat包,每次electionTimeout后成为Candidate都会递增Term,当服务器在一段时间后恢复连接时,Term的值将会变得很大,然后引入的重新选举会导致临时的延迟与可用性问题。而PreElection阶段并不会真正增加当前节点的Term,它的主要作用是得到当前集群能否成功选举出一个Leader的答案,避免上面这种情况的发生。

我们接着Candidate的状态来分析:对于能够投票的成员需要满足下面条件:

1、当前节点没有给任何节点投票或投票的节点term大于本节点的或者是之前已经投票的节点;

2、该节点的消息是最新的;

func (r *raft) Step(m pb.Message) error { … switch m.Type { case pb.MsgVote, pb.MsgPreVote: // We can vote if this is a repeat of a vote we’ve already cast… canVote := r.Vote == m.From || // …we haven’t voted and we don’t think there’s a leader yet in this term… (r.Vote == None && r.lead == None) || // …or this is a PreVote for a future term… (m.Type == pb.MsgPreVote && m.Term > r.Term) // …and we believe the candidate is up to date. if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) { // 如果当前没有给任何节点投票(r.Vote == None)或者投票的节点term大于本节点的(m.Term > r.Term) // 或者是之前已经投票的节点(r.Vote == m.From) // 同时还满足该节点的消息是最新的(r.raftLog.isUpToDate(m.Index, m.LogTerm)),那么就接收这个节点的投票 r.logger.Infof(“%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d”, r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) if m.Type == pb.MsgVote { // 保存下来给哪个节点投票了 r.electionElapsed = 0 r.Vote = m.From } } else { r.logger.Infof(“%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d”, r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) } ​ … } return nil } 然后,candidate节点接收到投票的信息,并统计投票的数量:

1、如果投票数大于节点数的一半的预置,成为leader;

2、如果达不到设定预置,变成follower;

func stepCandidate(r *raft, m pb.Message) error { // Only handle vote responses corresponding to our candidacy (while in // StateCandidate, we may get stale MsgPreVoteResp messages in this term from // our pre-candidate state). var myVoteRespType pb.MessageType if r.state == StatePreCandidate { myVoteRespType = pb.MsgPreVoteResp } else { myVoteRespType = pb.MsgVoteResp } switch m.Type { case myVoteRespType: // 计算当前集群中有多少节点给自己投了票 gr, rj, res := r.poll(m.From, m.Type, !m.Reject) r.logger.Infof(“%x has received %d %s votes and %d vote rejections”, r.id, gr, m.Type, rj) switch res { // 大多数投票了 case quorum.VoteWon: if r.state == StatePreCandidate { r.campaign(campaignElection) } else { // 如果进行投票的节点数量正好是半数以上节点数量 r.becomeLeader() // 向集群中其他节点广 MsgApp 消息 r.bcastAppend() } // 票数不够 case quorum.VoteLost: // pb.MsgPreVoteResp contains future term of pre-candidate // m.Term > r.Term; reuse r.Term // 切换到follower r.becomeFollower(r.Term, None) } … } return nil } 每当收到一个MsgVoteResp类型的消息时,就会设置当前节点持有的votes数组,更新其中存储的节点投票状态,如果收到大多数的节点票数,切换成leader,向其他的节点发送当前节点当选的消息,通知其余节点更新Raft结构体中的Term等信息。

日志复制 在etcd中,所有数据的修改在提交前,都要先写入到WAL中。WAL(Write Ahead Log)是数据库中保证数据持久化的常用技术,即每次真正操作数据之前,先往磁盘上追加一条日志,由于日志是追加的,也就是顺序写,而不是随机写,所以写入性能还是很高的。这样做的目的是,记录了整个数据变化的全部历程

并且使用WAL进行数据的存储使得etcd拥有两个重要功能:

(1) 故障快速恢复: 当你的数据遭到破坏时,就可以通过执行所有WAL中记录的修改操作,快速从最原始的数据恢复到数据损坏前的状态。

(2) 数据回滚(undo)/重做(redo):因为所有的修改操作都被记录在WAL中,需要回滚或重做,只需要反向或正向执行日志中的操作即可。

在介绍Raft实现之前,我们先来简单概述一下WAL的基础使用。WAL对象定义如下:

// server/storage/wal/wal.go // WAL这个抽象的结构体是由一堆的文件组成的 // 每个WAL文件的头部有一部分数据,是metadata,使用 w.Save 保存数据 // 使用完成之后,使用 w.Close 关闭 // WAL中的每一条记录,都有一个循环冗余校验码(CRC) // WAL是只能打开来用于读,或者写,但是不能既读又写 type WAL struct { lg zap.Logger ​ dir string // the living directory of the underlay files ​ // dirFile is a fd for the wal directory for syncing on Rename dirFile *os.File ​ metadata []byte // metadata recorded at the head of each WAL state raftpb.HardState // hardstate recorded at the head of WAL ​ start walpb.Snapshot // snapshot to start reading decoder *decoder // decoder to decode records readClose func() error // closer for decode reader ​ unsafeNoSync bool // if set, do not fsync ​ mu sync.Mutex enti uint64 // index of the last entry saved to the wal encoder *encoder // encoder to encode records ​ locks []fileutil.LockedFile // the locked files the WAL holds (the name is increasing) fp *figolePipeline } 在Save函数中,就是写入一条记录,然后调用 w.sync,而 w.sync 做的事情就是:调用了 fileutil.Fdatasync,而 fileutil.Fdatasync 就是调用了 fsync 这个系统调用保证数据会被写到磁盘。而快照也是类似的,写入一条记录,然后同步。

回归正题,在 RAFT 协议中,整个集群所有变更都必须通过 Leader 发起,如下其入口为 node.Propose (),而日志的保存总体流程如下:

(1) 集群某个节点收到client的put请求要求修改数据。节点会生成一个Type为MsgProp的Message,发送给leader。

// 生成MsgProp消息 func (n *node) Propose(ctx context.Context, data []byte) error { return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry}) } ​ func stepFollower(r *raft, m pb.Message) error { switch m.Type { case pb.MsgProp: if r.lead == None { r.logger.Infof(“%x no leader at term %d; dropping proposal”, r.id, r.Term) return ErrProposalDropped } else if r.disableProposalForwarding { r.logger.Infof(“%x not forwarding to leader %x at term %d; dropping proposal”, r.id, r.lead, r.Term) return ErrProposalDropped } // 设置发送的目标为leader // 将信息发送给leader m.To = r.lead r.send(m) } return nil } (2) leader收到Message以后,会处理Message中的日志条目,将其append到raftLog的unstable的日志中,并且调用bcastAppend()广播append日志的消息。

func stepLeader(r *raft, m pb.Message) error { // These message types do not require any progress for m.From. switch m.Type { … case pb.MsgProp: … // 将Entry记录追加到当前节点的raftlog中 if !r.appendEntry(m.Entries…) { return ErrProposalDropped } // 向其他节点复制Entry记录 r.bcastAppend() return nil … } return nil } ​ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { pr := r.prs.Progress[to] if pr.IsPaused() { return false } m := pb.Message{} m.To = to … m.Type = pb.MsgApp m.Index = pr.Next - 1 m.LogTerm = term m.Entries = ents m.Commit = r.raftLog.committed if n := len(m.Entries); n != 0 { switch pr.State { // optimistically increase the next when in StateReplicate case tracker.StateReplicate: last := m.Entries[n-1].Index pr.OptimisticUpdate(last) pr.Inflights.Add(last) case tracker.StateProbe: pr.ProbeSent = true default: r.logger.Panicf(“%x is sending append in unhandled state %s”, r.id, pr.State) } } r.send(m) return true } (3) leader中的消息最终会以MsgApp类型的消息通知follower,follower收到这些信息之后,同leader一样,先将缓存中的日志条目持久化到磁盘中并将当前已经持久化的最新日志index返回给leader。

func stepFollower(r *raft, m pb.Message) error { switch m.Type { case pb.MsgApp: r.electionElapsed = 0 r.lead = m.From r.handleAppendEntries(m) } return nil } ​ func (r *raft) handleAppendEntries(m pb.Message) { …. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries…); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } … } ​ // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents …pb.Entry) (lastnewi uint64, ok bool) { … l.commitTo(min(committed, lastnewi)) … return 0, false } ​ func (l *raftLog) commitTo(tocommit uint64) { // never decrease commit if l.committed < tocommit { if l.lastIndex() < tocommit { l.logger.Panicf(“tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?”, tocommit, l.lastIndex()) } l.committed = tocommit } } (4) 最后leader收到大多数的follower的确认,commit自己的log,同时再次广播通知follower自己已经提交了。

func stepLeader(r *raft, m pb.Message) error { // These message types do not require any progress for m.From. switch m.Type { … case pb.MsgAppResp: pr.RecentActive = true if r.maybeCommit() { releasePendingReadIndexMessages(r) // 如果可以commit日志,那么广播append消息 r.bcastAppend() } else if oldPaused { // 如果该节点之前状态是暂停,继续发送append消息给它 r.sendAppend(m.From) } … } return nil } ​ // 尝试提交索引,如果已经提交返回true // 然后应该调用bcastAppend通知所有的follower func (r *raft) maybeCommit() bool { mci := r.prs.Committed() return r.raftLog.maybeCommit(mci, r.Term) } ​ // 提交修改committed就可以了 func (l *raftLog) commitTo(tocommit uint64) { // never decrease commit if l.committed < tocommit { if l.lastIndex() < tocommit { l.logger.Panicf(“tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?”, tocommit, l.lastIndex()) } l.committed = tocommit } } 总结与感悟 综上所述,etcd的源码分析分析在此告一段落,总的来说本次的源码分析过程对于我来说受益匪浅,通过现象看本质,从刚开始的功能介绍及应用场景为切入点,主要是回答了是什么和怎么用的话题,后续从组成模型及源码介绍出发,深入浅出的回答了为什么的话题。

每一个云原生开源项目的出现,都针对实实在在的问题与痛点,就如天上飞的理念,落地的实现,接口与实现类的关系。就我个人而言,起初从简单理解Raft算法过程,到如今慢慢理解Raft算法如何在真实场景的应用。虽然阅读源码很难,但我自己也渐渐积累了源码阅读的方法论,总的来说有提升也有所获。

Search

    微信好友

    博士的沙漏

    Table of Contents