type AppendEntriesArgs struct {
	Term         int        // leader's term
	LeaderID     int        // so follower can redirect clients
	PrevLogIndex int        // index of log entry immediately preceding new ones
	PrevLogTerm  int        // term of prevLogIndex entry
	Entries      []LogEntry // log entries to store (empty for heartbeat; may send more than one for efficiency)
	LeaderCommit int        // leader's commitIndex
}
type AppendEntriesReply struct {
	Term int  // currentTerm, for leader to update itself
	Succ bool // true if follower contained entry matching prevLogIndex and prevLogTerm
}
Callee (servers)
Following Figure 2 of the paper, a node that receives this call performs five checks in order:
Reply false if term < currentTerm
Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm
If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it
Append any new entries not already in the log
If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
	if args.Term < rf.curTerm {
		reply.Term = rf.curTerm // let the stale leader update itself
		return                  // leader expired
	}
	if args.Term > rf.curTerm {
		rf.curTerm = args.Term
		rf.back2Follower(args.Term, VOTE_NIL)
	}
	// now the terms are the same
	reply.Term = rf.curTerm
	rf.resetElectTimer()
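	// consistency check (Figure 2, rule 2)
	last := len(rf.logs) - 1
	prevIdx := args.PrevLogIndex
	if last < prevIdx || rf.logs[prevIdx].Term != args.PrevLogTerm {
		// missing entry, or the entry at prevIdx comes from a different term
		return
	}
	// now peer and leader agree on the entry at prevIdx (same index, same term),
	// and therefore on every entry before it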
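	// check conflicts and append new logs (Figure 2, rules 3 and 4)
	for i, e := range args.Entries {
		cur := args.PrevLogIndex + 1 + i
		if cur < len(rf.logs) && rf.logs[cur].Term != e.Term {
			// term conflict: delete this entry and all that follow it
			rf.logs = rf.logs[:cur]
		}
		if cur >= len(rf.logs) {
			// new logs: append the remaining entries in one go
			rf.logs = append(rf.logs, args.Entries[i:]...)
			break
		}
	}
	// index of the last new entry carried by this RPC
	committed := args.PrevLogIndex + len(args.Entries)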
	// if leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry);
	// only ever move it forward, since a delayed RPC may carry a shorter prefix
	if n := min(args.LeaderCommit, committed); n > rf.commitIndex {
		rf.commitIndex = n
		rf.applyCond.Broadcast() // trigger apply
	}
	reply.Succ = true
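The five receiver rules are easier to test in isolation from RPC plumbing and locking. Below is a minimal, lock-free sketch of them. `Entry`, `Follower` and `HandleAppend` are hypothetical names, not the author's types. The sketch also assumes a sentinel entry with term 0 at index 0, so that `prevIdx == 0` always passes rule 2.

```go
package main

// Entry is a hypothetical minimal log entry; only Term matters to these rules.
type Entry struct {
	Term    int
	Command interface{}
}

// Follower is a hypothetical, lock-free model of the state AppendEntries touches.
// Logs[0] is assumed to be a sentinel with Term 0.
type Follower struct {
	CurrentTerm int
	CommitIndex int
	Logs        []Entry
}

// HandleAppend applies the five receiver rules of Figure 2 in order and
// returns (currentTerm, success).
func (f *Follower) HandleAppend(term, prevIdx, prevTerm int, entries []Entry, leaderCommit int) (int, bool) {
	// Rule 1: reject a leader from an older term.
	if term < f.CurrentTerm {
		return f.CurrentTerm, false
	}
	f.CurrentTerm = term

	// Rule 2: there must be an entry at prevIdx whose term is prevTerm.
	if prevIdx >= len(f.Logs) || f.Logs[prevIdx].Term != prevTerm {
		return f.CurrentTerm, false
	}

	// Rules 3 and 4: truncate at the first conflicting entry, then append the rest.
	for i, e := range entries {
		idx := prevIdx + 1 + i
		if idx < len(f.Logs) && f.Logs[idx].Term != e.Term {
			f.Logs = f.Logs[:idx] // delete the conflicting entry and all that follow
		}
		if idx >= len(f.Logs) {
			f.Logs = append(f.Logs, entries[i:]...)
			break
		}
	}

	// Rule 5: commitIndex = min(leaderCommit, index of last new entry),
	// only ever moving forward so that a delayed RPC cannot lower it.
	lastNew := prevIdx + len(entries)
	if leaderCommit > f.CommitIndex {
		n := leaderCommit
		if lastNew < n {
			n = lastNew
		}
		if n > f.CommitIndex {
			f.CommitIndex = n
		}
	}
	return f.CurrentTerm, true
}
```

Truncating only when a term actually conflicts is deliberate. If the entries already match, a reordered or duplicated RPC leaves the follower's log untouched instead of discarding entries that a newer RPC already appended.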
// leader replicates logs or sends heartbeats to the other nodes
func (rf *Raft) sync() {
	for i := range rf.peers {
		if i == rf.me {
			rf.resetElectTimer()
			continue
		}
		go func(server int) {
			for {
				if !rf.isLeader() {
					return
				}
				rf.mu.Lock()
				rf.syncConds[server].Wait() // wait for a heartbeat tick or Start() to trigger
				// sync new or missing logs to server; empty Entries means a heartbeat
				next := rf.nextIndex[server]
				args := AppendEntriesArgs{
					Term:         rf.curTerm,
					LeaderID:     rf.me,
					PrevLogIndex: next - 1, // heartbeats carry the consistency check too
					PrevLogTerm:  rf.logs[next-1].Term,
					Entries:      nil,
					LeaderCommit: rf.commitIndex,
				}
				if next < len(rf.logs) { // logs need sync
					// copy the entries so the RPC never aliases rf.logs after Unlock
					args.Entries = append(args.Entries, rf.logs[next:]...)
				}
				rf.mu.Unlock()
				// do not rely on labrpc's own timeout (it can be much longer than a heartbeat
				// interval), so enforce one manually
				var reply AppendEntriesReply
				respCh := make(chan struct{}, 1) // buffered: a late reply must not block the sender goroutine forever
				go func() {
					rf.sendAppendEntries(server, &args, &reply)
					respCh <- struct{}{}
				}()
				select {
				case <-time.After(RPC_CALL_TIMEOUT): // each call allocates a timer, which may be inefficient under high concurrency
					continue
				case <-respCh:
				}
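				if !reply.Succ {
					if reply.Term > rf.curTerm { // higher term: step down
						rf.back2Follower(reply.Term, VOTE_NIL)
						return
					}
					// consistency check failed: back off one entry and retry
					rf.mu.Lock()
					if rf.nextIndex[server] > 1 {
						rf.nextIndex[server]--
					}
					rf.mu.Unlock()
					continue
				}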
// apply (lastApplied, commitIndex]
func (rf *Raft) waitApply() {
	for {
		rf.mu.Lock()
		// wait for newly committed logs; re-checking in a loop means a Broadcast
		// that fired while we were busy sending on applyCh is not lost
		for rf.lastApplied >= rf.commitIndex {
			rf.applyCond.Wait()
		}

		var logs []LogEntry // logs not yet applied
		applied := rf.lastApplied
		committed := rf.commitIndex
		for i := applied + 1; i <= committed; i++ {
			logs = append(logs, rf.logs[i])
		}
		rf.lastApplied = committed // update applied
		rf.mu.Unlock()

		// deliver to the state machine without holding rf.mu
		for i, l := range logs {
			msg := ApplyMsg{
				Command:      l.Command,
				CommandIndex: applied + 1 + i,
				CommandValid: true,
			}
			rf.applyCh <- msg
		}
	}
}
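The apply loop is a classic condition-variable consumer. Below is a standalone sketch that can be run and tested without the rest of Raft. `applier`, `newApplier`, `commit` and `run` are hypothetical names, and plain strings stand in for commands. Entries are copied out under the lock and sent after unlocking, so a slow consumer of `out` never blocks whoever is advancing `commitIndex`.

```go
package main

import "sync"

// applier is a hypothetical stand-in for the Raft fields the apply loop touches.
type applier struct {
	mu          sync.Mutex
	cond        *sync.Cond // broadcast whenever commitIndex advances
	commitIndex int
	lastApplied int
	log         []string // log[0] is a sentinel
	out         chan string
}

func newApplier(log []string, out chan string) *applier {
	a := &applier{log: log, out: out}
	a.cond = sync.NewCond(&a.mu)
	return a
}

// commit advances commitIndex and wakes the apply loop.
func (a *applier) commit(idx int) {
	a.mu.Lock()
	if idx > a.commitIndex {
		a.commitIndex = idx
		a.cond.Broadcast()
	}
	a.mu.Unlock()
}

// run applies (lastApplied, commitIndex] in order, forever.
func (a *applier) run() {
	for {
		a.mu.Lock()
		for a.lastApplied >= a.commitIndex { // guard against missed or spurious wakeups
			a.cond.Wait()
		}
		start := a.lastApplied + 1
		batch := append([]string(nil), a.log[start:a.commitIndex+1]...)
		a.lastApplied = a.commitIndex
		a.mu.Unlock()

		for _, cmd := range batch { // send without holding the lock
			a.out <- cmd
		}
	}
}
```

A typical use starts `go a.run()` once and then calls `a.commit(n)` whenever a higher index becomes committed.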
// leader daemon: detect logs replicated on a majority and commit them
func (rf *Raft) leaderCommit() {
	for {
		if !rf.isLeader() {
			return
		}

		rf.mu.Lock()
		majority := len(rf.peers)/2 + 1
		n := len(rf.logs)
		for i := n - 1; i > rf.commitIndex; i-- {
			// look for the newest commit index, from tail to head:
			// only a log from the current term may be committed by counting replicas
			replicated := 0
			if rf.logs[i].Term == rf.curTerm {
				for server := range rf.peers {
					if rf.matchIndex[server] >= i {
						replicated += 1
					}
				}
			}

			if replicated >= majority {
				// all logs in (commitIndex, i] are now committed; wake the apply loop
				rf.commitIndex = i
				rf.applyCond.Broadcast()
				break
			}
		}
		rf.mu.Unlock()
		// hypothetical interval: without a pause this loop spins and keeps contending for rf.mu
		time.Sleep(10 * time.Millisecond)
	}
}
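Scanning from the tail counts replicas once per candidate index. An alternative technique is to sort a copy of `matchIndex` and read off the largest index that a majority has reached. Below is a minimal sketch of that variant. `majorityCommitIndex` is a hypothetical helper, and `logTerms[i]` is assumed to hold the term of log entry `i`. It also assumes `matchIndex` has one slot per server, including the leader's own last log index.

```go
package main

import "sort"

// majorityCommitIndex returns the leader's new commitIndex.
func majorityCommitIndex(matchIndex []int, logTerms []int, commitIndex, curTerm int) int {
	sorted := append([]int(nil), matchIndex...)
	sort.Ints(sorted)
	// sorted[k] is reached by len(sorted)-k servers; k = (n-1)/2 is the
	// largest position that is still backed by a majority.
	n := sorted[(len(sorted)-1)/2]
	// Only an entry from the current term may be committed by counting replicas.
	if n > commitIndex && logTerms[n] == curTerm {
		return n
	}
	return commitIndex
}
```

If the majority index belongs to an older term, nothing is committed. Terms never decrease along the log, so no lower index can be from the current term either.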
	if args.Term < rf.curTerm {
		return // candidate expired
	}
	if args.Term > rf.curTerm {
		rf.back2Follower(args.Term, VOTE_NIL)
	}
	// now the terms are the same
	// check up-to-date, from the paper:
	// if the logs have last entries with different terms, then the log with the later term is more up-to-date;
	// if the logs end with the same term, then whichever log is longer is more up-to-date.
	i := len(rf.logs) - 1
	lastTerm := rf.logs[i].Term
	if lastTerm > args.LastLogTerm {
		return
	}
	if lastTerm == args.LastLogTerm && i > args.LastLogIndex {
		return
	}
	// now the candidate's log is at least as up-to-date as ours
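The up-to-date rule quoted above is a pure comparison of two (last term, last index) pairs. Pulling it into its own function makes it easy to unit-test. Below is a minimal sketch; `candidateUpToDate` is a hypothetical name.

```go
package main

// candidateUpToDate reports whether the candidate's log is at least as
// up-to-date as the voter's: a later last term wins outright, and with equal
// last terms the longer (or equally long) log wins.
func candidateUpToDate(candLastIdx, candLastTerm, myLastIdx, myLastTerm int) bool {
	if candLastTerm != myLastTerm {
		return candLastTerm > myLastTerm
	}
	return candLastIdx >= myLastIdx
}
```

Note that a longer log does not win if its last term is older. Length is only a tie-breaker.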