面试官:聊聊 etcd 中的 Raft 吧( 五 )

应用程序得到这个Ready之后 , 需要:

  1. 将 HardState, Entries, Snapshot 持久化到 storage 。
  2. 将 Messages 广播给其他节点 。
  3. 将 CommittedEntries(已经 commit 还没有 apply)应用到状态机 。
  4. 如果发现 CommittedEntries 中有成员变更类型的 entry , 调用node.ApplyConfChange()方法让node知道 。
  5. 最后再调用node.Advance()告诉 raft , 这批状态更新处理完了 , 状态已经演进了 , 可以给我下一批 Ready 让我处理 。
Life of a Request前面我们把整个包的结构过了一遍 , 下面来结合具体的代码看看 raft 对一个请求的处理过程是怎样的 。 我一直觉得 , 如果能从代码的层面追踪到一个请求的处理过程 , 那无论是从宏观还是微观的角度 , 对理解整个系统都是非常有帮助的 。
Life of a Vote Request
  1. 首先 , 在node的大循环里 , 有一个会定时输出的tick channel , 它来触发raft.tick()函数 , 根据上面的介绍可知 , 如果当前节点是 follower , 那它的tick函数会指向tickElection 。 tickElection的处理逻辑是给自己发送一个MsgHup的内部消息 , Step函数看到这个消息后会调用campaign函数 , 进入竞选状态 。
// tickElection is run by followers and candidates after r.electionTimeout.func (r *raft) tickElection() {r.electionElapsed++if r.promotable()--tt-darkmode-color: #EF7060;">campaign则会调用becomeCandidate把自己切换到 candidate 模式 , 并递增Term值 。 然后再将自己的Term及日志信息发送给其他的节点 , 请求投票 。 func (r *raft) campaign(t CampaignType) {//...r.becomeCandidate()// Get peer id from progressfor id := range r.prs {//...r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})}}
  1. 另一方面 , 其他节点在接受到这个请求后 , 会首先比较接收到的Term是不是比自己的大 , 以及接受到的日志信息是不是比自己的要新 , 从而决定是否投票 。 这个逻辑我们还是可以从Step函数中找到:
func (r *raft) Step(m pb.Message) error {//...switch m.Type {case pb.MsgVote, pb.MsgPreVote:// We can vote if this is a repeat of a vote we've already cast...canVote := r.Vote == m.From ||// ...we haven't voted and we don't think there's a leader yet in this term...(r.Vote == None--tt-darkmode-color: #EF7060;">node.Propose开始 , Propose方法将这个写请求封装到一个MsgProp消息里面 , 发送给自己处理 。
  • 消息处理函数Step无法直接处理这个消息 , 它会调用那个小写的step函数 , 来根据当前的状态进行处理 。
  • 如果当前是 follower , 那它会把这个消息转发给 leader 。
  • func stepFollower(r *raft, m pb.Message) error {switch m.Type {case pb.MsgProp://...m.To = r.leadr.send(m)}}
    1. Leader 收到这个消息后(不管是 follower 转发过来的还是自己内部产生的)会有两步操作:
      1. 将这个消息添加到自己的 log 里
      2. 向其他 follower 广播这个消息
    func stepLeader(r *raft, m pb.Message) error {switch m.Type {case pb.MsgProp://...if !r.appendEntry(m.Entries...) {return ErrProposalDropped}r.bcastAppend()return nil}}
    1. 在 follower 接受完这个 log 后 , 会返回一个MsgAppResp消息 。
    2. 当 leader 确认已经有足够多的 follower 接受了这个 log 后 , 它首先会 commit 这个 log , 然后再广播一次 , 告诉别人它的 commit 状态 。 这里的实现就有点像两阶段提交了 。


      推荐阅读