Raft原理及其实现 Part 1

Raft是一个共识算法。本文主要参考论文In Search of an Understandable Consensus Algorithm以及MIT 6.824实验。在实现上,也主要参考了ToniXWD Github的实验,并做了一些小改动,使得代码拆分更加合理,逻辑更加清晰。如果有时间,我更推荐听一听MIT的录课Youtube 6.824

基本结构

Raft的最终实现,是让所有节点都拥有了以相同顺序写入的日志。实际中Raft要怎么应用呢?观察下图,我们有一个KV数据库,理解成Redis问题也不大。客户端接触到的是KV数据库,可以进行一些读写操作。在实际完成写操作之前,KV数据库会将这个Command传递给下层的Raft层,而Raft层就会记录这个操作Operation,然后将这个Operation同步给其他节点,得到过半确认后再通知KV数据库,KV数据库就可以通知用户操作已经完成。

这样的设计有什么好处,从KV数据库的角度来看,它不需要关心Raft具体是怎么实现的,对它来说只有发送指令,等待指令完成回调这两步。从Raft角度来看,也无需关注业务,只需要关注自己的同步算法即可,而上层也不一定要是KV数据库,能拓展出更多的用法。

layer.png

Raft实现中主要有4个内容,选举,同步,持久化和快照。Raft是有Leader的,所有请求都必须要有Leader进行处理,并同步给其他节点。持久化是为了不丢数据,快照则是为了压缩日志。

状态转换

status_convert.png

基本算法

algorithm.png

快照

snapshot.png

选举

心跳 Heartbeats

领导者选举 Leader Election

  1. 选举时机
    • 当未从 Leader 接收心跳且心跳超时时,Follower 转换为 Candidate 并启动选举。
    • Follower 给自己投票并增加其任期term
  2. 投票机制
    • 每个节点(包括Follower、Candidate或Leader)在接收到 RequestVote RPC 请求时,都会检查该请求是否符合 Raft 论文中图 2 所述的条件,简单来说就是看日志和任期够不够新。
    • 如果条件满足,节点将投票给请求的节点。
    • 节点在同一任期不会重复投票,除非它收到一个更高任期的 RequestVote 请求。
  3. Leader选举
    • 一旦Candidate获得集群大多数选票,它就成为新的Leader。
    • 新Leader通过向所有其他节点发送带有空条目的 AppendEntries RPCs(作为心跳)来宣布其领导地位。
    • 其他节点接收心跳更新后相应地更新其状态,例如退回到Follower。
  4. 处理选举超时
    • 如果没有在超时期间选出Leader,选举过程将重新开始。
    • 为降低同时选举导致重复失败的可能性,每个节点应具有随机化的选举超时时间。

Raft 结构

type Raft struct {
    state         State      // current state of the server
    currentTerm   int        // latest term server has seen
    votedFor      int        // candidateId that received vote in current term
    lastHeartBeat time.Time  // last time server received a heartbeat
}
go

election.png

如图所示,ticker循环进行检查是否需要启动选举,一旦启动选举,则开始收集选票,成为Leader后发送心跳包。

代码实现

当 Raft 节点启动时,其状态被初始化为 Follower,并创建一个 goroutine 来持续检查是否需要触发新的选举,也就是下一个term。

// in raft.go

func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    rf.state = Follower
    go rf.ticker()
    return rf
}
go

ticker 函数负责检查心跳超时并在必要时触发选举。

// in raft.go

func (rf *Raft) ticker() {
    for rf.killed() == false {
        electionTimeout := GenRandomElectionTimeout()
        rf.mu.Lock()
        if rf.state != Leader && time.Since(rf.lastHeartBeat) > electionTimeout {
            go rf.startElection()
        }
        rf.mu.Unlock()
        ms := TickerBaseInterval + (rand.Int() % TickerRandomInterval)
        time.Sleep(time.Duration(ms) * time.Millisecond)
    }
}
go

在 startElection 函数中,Candidate增加其任期term,为自己投票,并向所有其他服务器发送 RequestVote RPC 请求。

// in raft.go

func (rf *Raft) startElection() {
    rf.mu.Lock()

    rf.currentTerm += 1
    rf.state = Candidate
    rf.votedFor = rf.me
    rf.lastHeartBeat = time.Now()
    rf.persist()

    args := &RequestVoteArgs{
        Term:         rf.currentTerm,
        CandidateId:  rf.me,
        LastLogIndex: rf.globalLastLogIndex(),
        LastLogTerm:  rf.lastLogTerm(),
    }
    rf.mu.Unlock()

    var (
        voteCount = 1
        muVote    sync.Mutex
    )

    for serverId := 0; serverId < len(rf.peers); serverId++ {
        if serverId == rf.me {
            continue
        }
        go rf.gatherVote(serverId, args, &voteCount, &muVote)
    }
}
go

gatherVote 函数将从其他节点收集选票并检查Candidate是否获得了多数选票。如果是,Candidate将成为Leader并开始发送心跳。

// in vote.go

func (rf *Raft) gatherVote(serverId int, args *RequestVoteArgs, voteCount *int, muVote *sync.Mutex) {
    voteGranted := rf.handleVoteReply(serverId, args)
    if !voteGranted {
        return
    }
    muVote.Lock()
    if rf.isMajorityAgree(*voteCount) {
        muVote.Unlock()
        return
    }

    *voteCount += 1
    if rf.isMajorityAgree(*voteCount) {
        rf.mu.Lock()
        if rf.state == Follower {
            rf.mu.Unlock()
            muVote.Unlock()
            return
        } else {
            rf.state = Leader
            logger.Printf(rf, StateChange, "becomes leader")
            rf.mu.Unlock()
            go rf.SyncLogHeartbeat()
        }
    }

    muVote.Unlock()
}
go

RequestVote 函数检查 Raft 论文中列出的条件,并在条件满足时进行投票。

// in vote.go

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // Your code here (2A, 2B).
    rf.mu.Lock()

    // 1. Reply false if term < currentTerm (§5.1)
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        rf.mu.Unlock()
        reply.VoteGranted = false
        return
    }

    // If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
    if args.Term > rf.currentTerm {
        rf.votedFor = -1
        rf.currentTerm = args.Term
        rf.state = Follower
        rf.persist()
    }

    // 2. If votedFor is null or candidateId, and candidate’s log is least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
    if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
        if rf.isCandidateLogUpToDate(args) {
            rf.currentTerm = args.Term
            reply.Term = rf.currentTerm
            rf.votedFor = args.CandidateId
            rf.state = Follower
            rf.lastHeartBeat = time.Now()

            rf.persist()
            rf.mu.Unlock()
            reply.VoteGranted = true
            return
        }
    }

    reply.Term = rf.currentTerm
    rf.mu.Unlock()
    reply.VoteGranted = false
}
go

论文5.4.1详细提到了条件:

Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs.
If the logs have last entries with different terms, then the log with the later term is more up-to-date.
If the logs end with the same term, then whichever log is longer is more up-to-date.

func (rf *Raft) isCandidateLogUpToDate(args *RequestVoteArgs) bool {
    if args.LastLogTerm > rf.lastLogTerm() {
        return true
    }
    return args.LastLogTerm == rf.lastLogTerm() && args.LastLogIndex >= rf.globalLastLogIndex()
}
go

这个条件至关重要,因为整个系统中需要保证日志的一致性,日志都是以Leader的日志为准的,如果选错了Leader,那么意味着同步出来的日志也是错误的。

发送心跳

如果Leader节点状态已更改,它将停止发送心跳。

// in heartbeat.go

func (rf *Raft) SyncLogHeartbeat() {
    for !rf.killed() {
        rf.mu.Lock()
        // if the server is dead or is not the leader, end the loop
        if rf.state != Leader {
            rf.mu.Unlock()
            return
        }

        for serverId := 0; serverId < len(rf.peers); serverId++ {
            if serverId == rf.me {
                continue
            }
            prevLogIndex := rf.nextIndex[serverId] - 1
            args := &AppendEntriesArgs{
                Term:         rf.currentTerm,
                LeaderId:     rf.me,
                PrevLogIndex: prevLogIndex,
                LeaderCommit: rf.commitIndex,
            }
            // codes ...
            args.PrevLogTerm = rf.log[rf.toLocalIndex(prevLogIndex)].Term
            args.Entries = []LogEntry{}
            go rf.handleAppendEntries(serverId, args)
        }

        rf.mu.Unlock()

        time.Sleep(time.Duration(HeartbeatTimeout) * time.Millisecond)
    }
}
go