0%

Lab2A. Raft 选主实现

总结 MIT6.824 Lab2A Raft 选主的实验笔记。

Lab2A

Raft 将一致性问题分解成三个子问题:Leader 选举、日志复制、安全性保证,在 Lab 的 2A, 2B 中实现,快照功能在 Lab3 实现,均可参考原论文图 2 中对 Raft 实现的简要总结。

实验目标

  • 实现 Leader 选举:选出单个 leader 并保持领导地位,直到自己 crash 或发生网络分区
  • 实现心跳通信:实现 leader 与其他节点的无日志 AppendEntries RPC 调用

测试用例

  • TestInitialElection2A:集群正常情况下保证单一 leader
  • TestReElection2A:处理好 leader 网络分区、多数节点失效造成 split vote 的情况。

测试均通过:

一些坑

RPC 调用超时

在分布式系统中,每次调用会有三种结果:成功、失败、超时。Lab 将 net rpc 库封装成 labrpc,通过隔离节点网络来模拟节点不可用。不可用节点的 RPC 调用超时会返回 false,但这里不能死等 labrpc 库不确定的超时时长(100ms,2s 均可能),应该在调用时使用 timer 有预期地控制超时(如稍大于心跳间隔:150ms)
调用超时后,不必像论文中描述的无限次重试,应简化处理,直接认为调用失败。

充分使用 sync 包来实现同步

AppendEntries RPC 用于心跳通信和日志同步,调用时机有两个:

  • 定时心跳:leader 需在后台定期向其他节点发送 heartbeat,保持领导地位。同时在心跳还充当着日志同步的作用,当某个节点日志一致性检查失败后,会将冲突信息返回,leader 需将本地日志同步到该节点。
  • 新日志同步:当客户端发来新命令时,leader 将日志 append 到本地后即响应(lab 与论文不同),随后立刻开始新日志的同步。
    要调用 AppendEntries 的地方很多,因而使用 sync.Cond 条件变量而非散落各处的 channel 来进行同步触发。

一些时机

  • 每个节点在收到有效 RPC 调用后要重置 Election Timer,即使 Leader 无需对自己进行 rpc 调用,但重置 Timer 也是必要的。
  • 当节点收到更高 term 的 RPC 调用或响应时,要立刻回退到 follower 并重置 Timer,由于不能确信对方身份就是 Leader,所以 voteFor 要重置为 nil

关于调试

改造 util.go 中 DPrintf() 来输出毫秒及调试信息,方便追溯系统的时序性等问题。比如我的调试日志:

image-20190509090156921

Leader 选举

Lab 限制 leader 每秒最多发送 10 次心跳请求,实现时取心跳间隔为 100ms。相应的,选举超时时间应比心跳大一个量级左右,我实现时取 400 + rand.Intn(4) * 100,即 400~800ms 内的随机值,尽可能避免选举 split vote 情况。

选举流程

参考上一篇文章:Leader 选举

image-20190423115736847

发起投票

定义 Raft 节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]

// persistent states
curTerm int // latest term server has seen(initialized to 0 on first boot, increases monotonically)
votedFor int // candidateId that received vote in current term(or null if none)
logs []LogEntry // log entries; each entry contains command for state machine, and term when entry was received by leader(first index is 1)

// implementation
state PeerState
timer *RaftTimer
syncConds []*sync.Cond // every Raft peer has a condition, use for trigger AppendEntries RPC
}

每个节点在 Make 初始化时都选择时长随机的 RaftTimer,之后启动新的 goroutine 监听 election timer 超时:

1
2
3
4
5
6
7
8
9
go func() {
for {
select {
case <-rf.timer.t.C: // election timeout
rf.resetElectTimer() // this reset is necessary, reset it when timeout
rf.vote()
}
}
}()

timer 超时后,发起投票:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// start vote
// leader can start vote repeatedly, such as 2 nodes are crashed in 3 nodes cluster
// leader should reset election timeout when heartbeat to prevent this
func (rf *Raft) vote() {
pr("Vote|Timeout|%v", rf)
rf.curTerm++
rf.state = Candidate
rf.votedFor = rf.me

args := RequestVoteArgs{
Term: rf.curTerm,
CandidateID: rf.me,
}
replyCh := make(chan RequestVoteReply, len(rf.peers))
var wg sync.WaitGroup
for i := range rf.peers {
if i == rf.me {
rf.resetElectTimer() // other followers will reset when receive valid RPC, leader same
continue
}

wg.Add(1)
go func(server int) {
defer wg.Done()
var reply RequestVoteReply
respCh := make(chan struct{})
go func() {
rf.sendRequestVote(server, &args, &reply)
respCh <- struct{}{}
}()
select {
case <-time.After(RPC_CALL_TIMEOUT): // 1s
return
case <-respCh:
replyCh <- reply
}
}(i)
}
go func() {
wg.Wait()
close(replyCh) // avoid goroutine leak
}()

votes := 1
majority := len(rf.peers)/2 + 1
for reply := range replyCh {
if reply.Term > rf.curTerm { // higher term leader
pr("Vote|Higher Term:%d|%v", reply.Term, rf)
rf.back2Follower(reply.Term, VOTE_NIL)
return
}
if reply.VoteGranted {
votes++
}

if votes >= majority { // if reach majority earlier, shouldn't wait crashed peer for timeout
rf.state = Leader
go rf.heartbeat()
go rf.sync()

pr("Vote|Win|%v", rf)
return
}
}

// split vote
pr("Vote|Split|%v", rf)
rf.back2Follower(rf.curTerm, VOTE_NIL)
}

响应投票

Raft 对投票节点提出了三点要求:

  • 每轮能投几张:一个任期内,一个节点只能投一张票
  • 是否要投:候选人的日志至少要和自己的一样新,才投票(Lab2B 实现日志的 up-to-date 比较)
  • 投给谁:first-come-first-served,投给第一个符合条件的候选人

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type RequestVoteArgs struct {
Term int
CandidateID int
}

type RequestVoteReply struct {
Term int
VoteGranted bool
}

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
reply.Term = rf.curTerm
reply.VoteGranted = false

if args.Term < rf.curTerm {
return // candidate expired
}
if args.Term > rf.curTerm {
rf.back2Follower(args.Term, VOTE_NIL)
}
// now the term are same

if rf.votedFor == VOTE_NIL || rf.votedFor == args.CandidateID {
reply.VoteGranted = true
rf.back2Follower(args.Term, args.CandidateID)
}
}

心跳通信

Raft 将客户端的命令封装为 log entry:

1
2
3
4
type LogEntry struct {
Term int
Command interface{}
}

心跳请求

当候选人成功竞选为 leader 后要 立刻 给集群中其他节点发送心跳,避免其他节点也超时发起新一轮选举。实现方案:获得多数票后,在后台为其他的所有 peer 启动同步日志的 goroutine,等待下一轮心跳 tick,这种广播方式最好使用 sync.Cond 条件变量来实现。

获得多数票后为所有节点准备 sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// leader sync logs to followers
func (rf *Raft) sync() {
for i := range rf.peers {
if i == rf.me {
rf.resetElectTimer()
continue
}

go func(server int) {
for {
rf.mu.Lock()
rf.syncConds[server].Wait() // wait for trigger

args := AppendEntriesArgs{
Term: rf.curTerm,
LeaderID: rf.me,
PrevLogIndex: 0,
PrevLogTerm: 0,
Entries: nil, // heartbeat entries are empty
}
rf.mu.Unlock()

// do not depend on labrpc to call timeout(it may more bigger than heartbeat), so should be check manually
var reply AppendEntriesReply
respCh := make(chan struct{})
go func() {
rf.sendAppendEntries(server, &args, &reply)
respCh <- struct{}{}
}()
select {
case <-time.After(RPC_CALL_TIMEOUT): // After() with currency may be inefficient
continue
case <-respCh: // response succ
}

if reply.Term > rf.curTerm { // higher term
rf.back2Follower(reply.Term, VOTE_NIL)
return
}
}
}(i)
}
}

同时开启心跳 tick,准备广播通知 sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// send heartbeat
func (rf *Raft) heartbeat() {
ch := time.Tick(HEARTBEAT_INTERVAL)
for {
if !rf.isLeader() {
return
}

for i := range rf.peers {
if i == rf.me {
rf.resetElectTimer() // leader reset timer voluntary, so it won't elect again
continue
}

rf.syncConds[i].Broadcast()
}
<-ch
}
}

响应心跳

对于心跳请求,节点暂时只需对比任期号,若 term 未过期则调用成功。2B 部分将实现日志的一致性检查:

1
2
3
4
5
6
7
8
9
10
11
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
reply.Term = rf.curTerm
reply.Succ = false

if rf.curTerm > args.Term {
return // leader expired
}

rf.back2Follower(args.Term, VOTE_NIL)
reply.Succ = true
}

总结

整个实验可从 test 着手,实验环境是 3 个节点组成的集群,实现时需在 leader crash 后及时选出下一任 leader,且处理好旧 leader re-join 等情况。
个人经验:对于分布式系统,调试时可在请求参数、响应结构中加入 debug 信息,用于追踪某次请求的处理过程和结果,梳理清楚了执行流程,再去针对性的解决问题。
为了让代码更清爽,我把 raft.go 的代码按功能拆分为了 2 部分:vote 处理投票请求,enry 处理心跳请求。之后的两个实验小节将对应修改这两个文件。