Raft原理及其实现 Part 2

日志复制

在 MIT 6.5840 实验中, 客户端通过Start函数向所有Raft节点发送Command。

这符合我们之前的谈到的设计,Leader负责所有的写操作。Leader在得到了集群中多数节点通过后就会Commit这个Command。同样地,节点间的通信也是通过AppendEntries RPC实现的。

Leader逻辑

  1. 接收上层Client的Command,将其附加到自己的日志中。
  2. 向其他节点发送AppendEntries RPC,将这个Command同步给其他节点。
  3. 如果其他Follower节点确认了请求,则说明Follower的日志和Leader的日志一致,Leader维护并更新Follower对应的nextIndex和matchIndex。
    • 此处如果响应了False,则存在三种情况,具体逻辑可以先按住不表:
      • The Follower’s log is shorter than the Leader’s log, the Leader decrements the nextIndex to XLen.
      • The Follower’s log contains an entry with a different term (Xterm) at PrevLogIndex, it sets its nextIndex to the last index of the conflicting term in leader’s logs plus 1.
      • However, if the Leader doesn’t have the entry with the conflicting term, it sets the nextIndex to XIndex.

Follower逻辑

  1. 接收Leader的AppendEntries RPC,更新心跳时间,验证任期Term并在必要时更新任期。(注意改点很重要,同步也是在选举后进行的,此时必须保证接收到的RPC附带的任期和自己的任期一致,排除收到过期消息的可能)
  2. Folllower做日志一致性的检查,检查自己的log[PrevLogIndex]的日志是否和PrevLogTerm(Leader发过来的)一致,此处也会引出如下情况:
    • If the Follower’s log is shorter than the Leader’s log, it returns Success = False, Xterm = -1,and XLen = log length.
    • If the term of the log entry at PrevLogIndex does not match PrevLogTerm, the Follower identifies the conflicting term and the index of the first entry with that term.
  3. Follower需要处理冲突的日志数据,删除冲突日志和其后的所有日志。
  4. Follower会将Leader的日志中自己未同步的部分追加到自己的日志中。
  5. Follower更新自己的commitIndex, 取日志最大的index和Leader的commitIndex的最小值。

具体实现

注意下图中的State machine不是Raft的一部分,但Raft会保证所有节点的State machine都拿到相同的Command序列。

structure

Start函数由客户端调用,写入新Command。如果该节点是Leader,它会将这个Command追加到日志中。

// in raft.go

func (rf *Raft) Start(command interface{}) (int, int, bool) {
    index := -1
    term := -1
    isLeader := true

    // Your code here (2B).
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if rf.state != Leader {
        isLeader = false
    } else {
        entry := LogEntry{Term: rf.currentTerm, Command: command}
        rf.log = append(rf.log, entry)
        index = rf.globalLastLogIndex()
        term = rf.currentTerm
        logger.Printf(rf, Start, "Received command %v,index %v, current log %v", command, index, rf.log)
        rf.persist()
    }
    return index, term, isLeader
}

ApplyWorker()将已经Committed的Command应用到状态机,通过go的channel发送消息。

// in apply.go

func (rf *Raft) ApplyWorker() {
    // Check if there are any new logs to commit
    for !rf.killed() {
        // avoid get lock too frequently, otherwise it will slow down the system
        if rf.commitIndex <= rf.lastApplied {
            time.Sleep(time.Duration(CommitCheckInterval) * time.Millisecond)
            continue
        }
        rf.mu.Lock()
        startLastApplied := rf.lastApplied
        // If commitIndex > lastApplied: increment lastApplied, apply log[lastApplied] to state machine(§5.3)
        for rf.commitIndex > rf.lastApplied {
            if rf.lastApplied != startLastApplied {
                break
            }
            rf.lastApplied += 1
            msg := &ApplyMsg{
                CommandValid: true,
                Command:      rf.log[rf.toLocalIndex(rf.lastApplied)].Command,
                CommandIndex: rf.lastApplied,
            }
            logger.Printf(rf, Apply, "apply log index %v command %v, current log %v", msg.CommandIndex, msg.Command, rf.log)
            rf.mu.Unlock()
            rf.applyCh <- *msg
            rf.mu.Lock()
        }
        rf.mu.Unlock()
    }
}

日志复制

一些关键字段,主要是Leader的视角:

nextIndex[] 和 matchIndex[] 在选举结束后重新初始化:

// a piece of rf.gatherVote() in vote.go

rf.state = Leader
// nextIndex: initialized to leader last log index + 1
// matchIndex: initialized to 0, increases monotonically
for i := 0; i < len(rf.nextIndex); i++ {
    rf.nextIndex[i] = rf.globalLastLogIndex() + 1
    rf.matchIndex[i] = rf.lastIncludedIndex
}
logger.Printf(rf, StateChange, "becomes leader")
rf.mu.Unlock()
go rf.SyncLogHeartbeat()

AppendEntriesArgs

// in appendEntries.go

type AppendEntriesArgs struct {
    Term         int        // leader’s term
    LeaderId     int        // so follower can redirect clients
    PrevLogIndex int        // index of log entry immediately preceding new ones
    PrevLogTerm  int        // term of prevLogIndex entry
    Entries      []LogEntry // log entries to store (empty for heartbeat; may send more than one for efficiency)
    LeaderCommit int        // leader’s commitIndex
}

如果有新的日志需要同步到Follower,Leader将通过心跳机制在 SyncLogHeartbeat() 函数中将新日志发送给Follower。

// in heartbeat.go

func (rf *Raft) SyncLogHeartbeat() {
    for !rf.killed() {
        rf.mu.Lock()
        // if the server is dead or is not the leader, end the loop
        if rf.state != Leader {
            rf.mu.Unlock()
            return
        }

        for serverId := 0; serverId < len(rf.peers); serverId++ {
            if serverId == rf.me {
                continue
            }
            prevLogIndex := rf.nextIndex[serverId] - 1
            args := &AppendEntriesArgs{
                Term:         rf.currentTerm,
                LeaderId:     rf.me,
                PrevLogIndex: prevLogIndex,
                LeaderCommit: rf.commitIndex,
            }
            // codes ...
            if rf.isNewLogToReplicate(serverId) {
                args.PrevLogTerm = rf.log[rf.toLocalIndex(prevLogIndex)].Term
                logger.Printf(rf, Replicate, "sending new log to server %v, args=%v", serverId, args)
                args.Entries = rf.log[rf.toLocalIndex(rf.nextIndex[serverId]):]
                go rf.handleAppendEntries(serverId, args)
            }

        rf.mu.Unlock()

        time.Sleep(time.Duration(HeartbeatTimeout) * time.Millisecond)
    }
}

AppendEntriesReply

// in appendEntries.go

type AppendEntriesReply struct {
   Term    int  // currentTerm, for leader to update itself
   Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
   XTerm   int  // term of the conflicting entry
   XIndex  int  // index of the first entry with the conflicting term
   XLen    int  // length of raft.log
}

Follower 接收新的日志并在 AppendEntries() 函数中更新他们的日志。 回顾一下论文的要求:

  1. Reply false if term < currentTerm (§5.1)
  2. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
  3. If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3)
  4. Append any new entries not already in the log
  5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)

逻辑总的来说可以概括为,Follower以Leader的日志为准,删除自己和Leader不一样的,并将Leader的日志追加到自己的日志中。问题啥,如果Follower的日志和Leader的日志不一致怎么办,最简单的方法就是逐步回退nextIndex,直到找到和Leader相同的地方,然后Follower从这个位置开始删除冲突数据,并追加Leader的日志。简单来说可以这样实现:

if reply.Term == rf.currentTerm && rf.state == Leader {
    // If AppendEntries fails because of log inconsistency: decrement nextIndex and retry
    rf.nextIndex[serverId] -= 1
    rf.mu.Unlock()
    return
}

实际上,这样做很慢,所以我们需要使用快速回退(Fast backup)。这里引入三个字段:

XTerm   int  // term of the conflicting entry
XIndex  int  // index of the first entry with the conflicting term
XLen    int  // length of raft.log

从Follower角度来看,整体逻辑如下:

// in appendEntries.go

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    // Your code here (2A, 2B).
    rf.mu.Lock()
    defer rf.mu.Unlock()

    rf.lastHeartBeat = time.Now()

    // 1. Reply false if term < currentTerm (§5.1)
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.Success = false
        return
    }

    // If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
    if args.Term > rf.currentTerm {
        rf.currentTerm = args.Term
        rf.votedFor = -1
        rf.state = Follower
        rf.persist()
    }

    reply.Term = rf.currentTerm

    logger.Printf(rf, Replicate, "same term, receive append entries from leader %v, args %v", args.LeaderId, args)
    // 2. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
    if args.PrevLogIndex > rf.globalLastLogIndex() {
        reply.XTerm = -1
        // node's log is too short, reply its log length to leader
        reply.XLen = rf.toGlobalIndex(len(rf.log))
        reply.Success = false
        return
    } else if rf.log[rf.toLocalIndex(args.PrevLogIndex)].Term != args.PrevLogTerm {
        reply.XTerm = rf.log[rf.toLocalIndex(args.PrevLogIndex)].Term
        _, reply.XIndex = rf.firstIndexOfTerm(reply.XTerm, rf.toLocalIndex(args.PrevLogIndex))
        reply.Success = false
        return
    }

    // 3. If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3)
    entriesOffset := rf.toLocalIndex(args.PrevLogIndex) + 1
    entriesIndex := 0
    for logIndex := entriesOffset + entriesIndex; logIndex < len(rf.log) && entriesIndex < len(args.Entries); {
        if rf.log[logIndex].Term != args.Entries[entriesIndex].Term {
            // delete the conflicting entry and all that follow it
            rf.log = rf.log[:logIndex]
            break
        }
        entriesIndex++
        logIndex++
    }

    // 4. Append any new entries not already in the log
    rf.log = append(rf.log, args.Entries[entriesIndex:]...)
    rf.persist()

    reply.Success = true

    // 5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
    if args.LeaderCommit > rf.commitIndex {
        rf.commitIndex = Min(args.LeaderCommit, rf.globalLastLogIndex())
        logger.Printf(rf, Replicate, "accept log, commitIndex updated to %v, current log %v", rf.commitIndex, rf.log)
    }
    rf.persist()
}

我们逐步解释这个逻辑,首先,我们需要确保请求中PrevLogIndex字段比于Follower的日志最大的索引要大: if args.PrevLogIndex > rf.globalLastLogIndex()

这是Follower的逻辑:

// a piece of rf.AppendEntries() in appendEntries.go

// 2. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
if args.PrevLogIndex > rf.globalLastLogIndex() {
    reply.XTerm = -1
    // node's log is too short, reply its log length to leader
    reply.XLen = rf.toGlobalIndex(len(rf.log))
    reply.Success = false
    return
} else if rf.log[rf.toLocalIndex(args.PrevLogIndex)].Term != args.PrevLogTerm {
    reply.XTerm = rf.log[rf.toLocalIndex(args.PrevLogIndex)].Term
    _, reply.XIndex = rf.firstIndexOfTerm(reply.XTerm, rf.toLocalIndex(args.PrevLogIndex))
    reply.Success = false
    return
}

这是Leader的逻辑:

if reply.Term == rf.currentTerm && rf.state == Leader {
    logger.Printf(rf, Replicate, "handle conflict, current log %v,lastInclude Index %v", rf.log, rf.lastIncludedIndex)
    // If AppendEntries fails because of log inconsistency: decrement nextIndex and retry
    if reply.XTerm == -1 {
        rf.nextIndex[serverId] = reply.XLen
    } else {
        found, lastIndex := rf.lastIndexOfTerm(reply.XTerm, rf.toLocalIndex(rf.nextIndex[serverId])-1)
        if found {
            rf.nextIndex[serverId] = rf.toGlobalIndex(lastIndex) + 1
        } else {
            rf.nextIndex[serverId] = reply.XIndex
        }
    }
    rf.mu.Unlock()
    return
}

总结

在Raft中,只有Leader可以接收到Command,然后将其附加到自己的日志中。Leader通过AppendEntries RPC将这个Command同步给其他节点。Follower接收到Leader的日志后,会根据Leader的日志进行日志一致性检查,如果发现不一致,Follower会删除冲突的日志,并将Leader的日志追加到自己的日志中。使用快速回退,Leader会根据Follower的返回结果来更新自己的nextIndex,以便更快地同步日志。和选举类似,当Leader获得了过半节点的确认后,Leader会将这个Command Commit,然后将这个Command应用到State Machine。