type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int// this peer's index into peers[]
// persistent states curTerm int// latest term server has seen(initialized to 0 on first boot, increases monotonically) votedFor int// candidateId that received vote in current term(or null if none) logs []LogEntry // log entries; each entry contains command for state machine, and term when entry was received by leader(first index is 1)
// implementation state PeerState timer *RaftTimer syncConds []*sync.Cond // every Raft peer has a condition, use for trigger AppendEntries RPC }
每个节点在 Make 初始化时都会创建一个超时时长随机的 RaftTimer，之后启动一个新的 goroutine 监听 election timer 的超时：
gofunc() { for { select { case <-rf.timer.t.C: // election timeout rf.resetElectTimer() // this reset is necessary, reset it when timeout rf.vote() } } }()
// vote starts a new election from this peer.
//
// It logs the timeout via pr, increments curTerm, becomes a Candidate, votes
// for itself, and prepares a RequestVoteArgs for the new term. While looping
// over rf.peers it resets its own election timer when it reaches rf.me.
//
// Original author's note: a leader can also end up starting a vote repeatedly
// (for example when 2 nodes of a 3-node cluster have crashed), so the leader
// should reset its election timeout on every heartbeat to prevent this.
//
// NOTE(review): this snippet looks truncated/merged during extraction:
//   - the per-peer goroutine header implied by the trailing `}(i)` (presumably
//     `go func(server int) {` plus an inner loop) is missing, and so is the
//     code that sends RequestVote and counts the votes;
//   - the remaining per-peer body passes `args` (a RequestVoteArgs) to
//     sendAppendEntries with an AppendEntriesReply, which looks like it was
//     taken from the AppendEntries sync loop;
//   - replyCh and wg are declared but never used, which the Go compiler
//     rejects.
// Restore the missing lines from the real source before relying on this.
//
// NOTE(review): rf.mu is never acquired in this function even though it
// mutates curTerm, state and votedFor — confirm the caller holds the lock.
func(rf *Raft) vote() {
	pr("Vote|Timeout|%v", rf)
	rf.curTerm++
	rf.state = Candidate
	rf.votedFor = rf.me

	args := RequestVoteArgs{
		Term: rf.curTerm,
		CandidateID: rf.me,
	}
	replyCh := make(chan RequestVoteReply, len(rf.peers))
	var wg sync.WaitGroup
	for i := range rf.peers {
		if i == rf.me {
			// Other followers reset their timers when they receive a valid
			// RPC (and the leader does likewise), so this peer resets its own.
			rf.resetElectTimer()
			continue
		}
		// NOTE(review): lines appear to be missing here — presumably
		// `go func(server int) {` and an inner `for {`; the closing braces
		// below only balance with them.

				// Do not rely on labrpc to time the call out (its timeout may be
				// longer than the heartbeat interval); enforce RPC_CALL_TIMEOUT
				// manually instead.
				var reply AppendEntriesReply
				// NOTE(review): `chanstruct{}` and `gofunc()` are presumably
				// `chan struct{}` and `go func()` with a space lost in extraction.
				// respCh is unbuffered, so if the timeout case wins, the sender
				// goroutine blocks forever on its send and leaks; a buffer of 1
				// would let it exit.
				respCh := make(chanstruct{})
				gofunc() {
					rf.sendAppendEntries(server, &args, &reply)
					respCh <- struct{}{}
				}()
				select {
				case <-time.After(RPC_CALL_TIMEOUT): // a fresh time.After per call may be inefficient under concurrency
					continue
				case <-respCh: // response received
				}

				if reply.Term > rf.curTerm { // the peer reports a higher term: hand off to back2Follower
					rf.back2Follower(reply.Term, VOTE_NIL)
					return
				}
			}
		}(i)
	}
}
同时开启心跳 ticker，定期广播通知各 peer 执行 sync：
// send heartbeat func(rf *Raft) heartbeat() { ch := time.Tick(HEARTBEAT_INTERVAL) for { if !rf.isLeader() { return }
for i := range rf.peers { if i == rf.me { rf.resetElectTimer() // leader reset timer voluntary, so it won't elect again continue }
rf.syncConds[i].Broadcast() } <-ch } }
响应心跳
对于心跳请求，节点暂时只需对比任期号：若 term 未过期则调用成功。2B 部分将实现日志的一致性检查：