type Raft struct {
	// ...

	// snapshot
	lastIncludedIndex int // the snapshot replaces all entries up through and including this index
	lastIncludedTerm  int // term of lastIncludedIndex
}
func (kv *KVServer) waitAgree() {
	for {
		select {
		case <-kv.killCh:
			return
		case msg := <-kv.applyCh:
			op := msg.Command.(Op)
			kv.mu.Lock()
			// ...
			kv.checkSnapshot(msg.CommandIndex) // use the committed index as the snapshot's LastIncludedIndex
			kv.mu.Unlock()
		}
	}
}
func (kv *KVServer) checkSnapshot(appliedId int) {
	if kv.maxraftstate == -1 {
		return
	}

	// take a snapshot when the raft state size approaches the upper limit
	if kv.persister.RaftStateSize() < kv.maxraftstate*9/10 {
		return
	}

	rawSnapshot := kv.encodeSnapshot()
	go kv.rf.TakeSnapshot(appliedId, rawSnapshot) // don't hold the KVServer's lock for long
}
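For reference, kv.encodeSnapshot only needs to serialize the KVServer state that must survive a restart. Below is a minimal sketch, assuming that state consists of kv.db plus the per-client dedup table kv.cid2seq (the same fields restored later in waitAgree), and that the lab's labgob package and bytes.Buffer do the encoding:

func (kv *KVServer) encodeSnapshot() []byte {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(kv.db)      // key/value data
	e.Encode(kv.cid2seq) // last sequence number seen per client, for deduplication
	return w.Bytes()
}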
// taking a snapshot on the leader should be async like Start(): it must return quickly
func (rf *Raft) TakeSnapshot(appliedId int, rawSnapshot []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// lock contention may have delayed this call; without this check, slicing rf.logs below may go out of bounds
	if appliedId <= rf.lastIncludedIndex {
		return
	}

	// discard the entries before appliedId, but keep the entry at appliedId itself for the AppendEntries consistency check
	rf.logs = rf.logs[rf.subIdx(appliedId):]
	rf.lastIncludedIndex = appliedId
	rf.lastIncludedTerm = rf.logs[0].Term
	rf.persistStatesAndSnapshot(rawSnapshot)
}
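The two helpers used above are small. As a sketch: subIdx maps an absolute log index to an offset inside rf.logs (whose first entry always sits at lastIncludedIndex), and persistStatesAndSnapshot saves the Raft state together with the snapshot through the lab Persister's SaveStateAndSnapshot. The encodeState helper is an assumption carried over from the persistence lab, serializing the term, vote, logs and the two lastIncluded fields:

func (rf *Raft) subIdx(i int) int {
	return i - rf.lastIncludedIndex
}

func (rf *Raft) persistStatesAndSnapshot(rawSnapshot []byte) {
	// encodeState is assumed to exist: it serializes curTerm, voteFor, logs,
	// lastIncludedIndex and lastIncludedTerm with labgob
	rf.persister.SaveStateAndSnapshot(rf.encodeState(), rawSnapshot)
}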
// the leader syncs logs to followers
func (rf *Raft) sync() {
	for i := range rf.peers {
		if i == rf.me {
			continue
		}

		go func(server int) {
			for {
				if !rf.isRunningLeader() {
					return
				}

				rf.mu.Lock()
				rf.syncConds[server].Wait() // wait for a trigger

				// sync new or missing logs to this server
				next := rf.nextIndex[server]

				// if the follower is far behind the leader, just send it a snapshot to speed up its replay
				if next <= rf.lastIncludedIndex {
					rf.syncSnapshot(server) // InstallSnapshot RPC logic
					continue
				}

				// AppendEntries RPC logic
			}
		}(i)
	}
}
InstallSnapshot RPC
Referring to Figure 13 in the paper, the RPC arguments are as follows:
type InstallSnapshotArgs struct {
	Term              int    // leader's term
	LeaderId          int    // so follower can redirect clients
	LastIncludedIndex int    // the snapshot replaces all entries up through and including this index
	LastIncludedTerm  int    // term of lastIncludedIndex
	Data              []byte // raw bytes of the snapshot chunk, starting at offset
	// Offset int  // byte offset where chunk is positioned in the snapshot file
	// Done   bool // true if this is the last chunk
}
type InstallSnapshotReply struct {
	Term int // currentTerm, for leader to update itself
}
var reply InstallSnapshotReply
respCh := make(chan struct{}, 1) // buffered, so the sender never blocks if we time out first
go func() {
	if ok := rf.sendInstallSnapshot(server, &args, &reply); ok {
		respCh <- struct{}{}
	}
}()

select {
case <-time.After(RPC_CALL_TIMEOUT):
	return
case <-respCh:
	close(respCh)
}
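After the RPC returns, the leader still has to act on the reply. The continuation below is a sketch under the same assumptions as the snippet above (matchIndex is the usual Raft bookkeeping; the exact field names are an assumption): step down if the follower reports a newer term, otherwise advance nextIndex/matchIndex past the snapshot so the next round falls back to normal AppendEntries.

rf.mu.Lock()
defer rf.mu.Unlock()

if reply.Term > rf.curTerm {
	// a newer term means this node is no longer the leader
	rf.back2Follower(reply.Term, VOTE_NIL)
	return
}

// the follower now holds everything up to and including LastIncludedIndex
rf.matchIndex[server] = max(rf.matchIndex[server], args.LastIncludedIndex)
rf.nextIndex[server] = rf.matchIndex[server] + 1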
// the follower receives a snapshot from the leader and force-overwrites its local logs
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	reply.Term = rf.curTerm

	// 1. reply false if term < currentTerm
	if args.Term < rf.curTerm {
		return
	}
	if args.Term > rf.curTerm {
		reply.Term = args.Term
		rf.back2Follower(args.Term, VOTE_NIL)
	}
	rf.resetElectTimer()

	// the snapshot may already be stale because of lock contention; without this check the slicing of rf.logs below may go out of range
	if args.LastIncludedIndex <= rf.lastIncludedIndex {
		return
	}

	// 2. Create new snapshot file if first chunk (offset is 0)
	// 3. Write data into snapshot file at given offset
	// 4. Reply and wait for more data chunks if done is false
	// 5. Save snapshot file, discard any existing or partial snapshot with a smaller index

	// 6. If existing log entry has same index and term as snapshot's last included entry, retain log entries following it and reply
	if args.LastIncludedIndex < rf.lastIdx() {
		// the entry at args.LastIncludedIndex has already been agreed on; keep any entries after it
		rf.logs = rf.logs[args.LastIncludedIndex-rf.lastIncludedIndex:]
	} else {
		// 7. Discard the entire log
		// the dummy entry is kept for the AppendEntries consistency check
		rf.logs = []LogEntry{{Term: args.LastIncludedTerm, Command: nil}}
	}

	// update the snapshot state and persist it
	rf.lastIncludedIndex = args.LastIncludedIndex
	rf.lastIncludedTerm = args.LastIncludedTerm
	rf.persistStatesAndSnapshot(args.Data)

	// make the follower's commit/applied state catch up with the leader
	rf.commitIndex = max(rf.commitIndex, args.LastIncludedIndex)
	rf.lastApplied = max(rf.lastApplied, args.LastIncludedIndex)

	// 8. Reset state machine using snapshot contents (and load snapshot's cluster configuration)
	rf.applyCh <- ApplyMsg{
		CommandValid: false, // this is raw snapshot data, not a command
		CommandIndex: -1,
		Command:      args.Data, // used by the KVServer to restore kvDB
	}
}
Correspondingly, modify the KVServer's loop that listens on applyCh so it can pick the snapshot data sent by the leader out of the channel:
func (kv *KVServer) waitAgree() {
	for {
		select {
		case <-kv.killCh:
			return
		case msg := <-kv.applyCh:
			if !msg.CommandValid {
				// snapshot data
				buf := msg.Command.([]byte)
				kv.mu.Lock()
				kv.db, kv.cid2seq = kv.decodeSnapshot(buf) // restore kvDB and cid2seq
				kv.mu.Unlock()
				continue
			}

			// log agreement logic
		}
	}
}
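decodeSnapshot is simply the inverse of encodeSnapshot. A minimal sketch, again assuming labgob for decoding and assuming db is a map[string]string and cid2seq a map[int64]int64 (use whatever concrete types the KVServer actually declares):

func (kv *KVServer) decodeSnapshot(buf []byte) (map[string]string, map[int64]int64) {
	r := bytes.NewBuffer(buf)
	d := labgob.NewDecoder(r)

	var db map[string]string
	var cid2seq map[int64]int64
	if d.Decode(&db) != nil || d.Decode(&cid2seq) != nil {
		// an undecodable snapshot means the persisted state is corrupted
		panic("failed to decode snapshot")
	}
	return db, cid2seq
}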