上一篇 Lab2C: 持久化 中介绍了Raft持久化的实现,本篇介绍Raft日志压缩原理和实现。实验二共包含四个子实验:
基本原理 Raft的日志会随着接收客户端的请求数而不断增长,对实际生产环境中的系统而言,内存是有限的,较大的日志,在服务崩溃重启时需要较长时间加载恢复,这会造成服务的可用性问题。日志压缩主要解决这些问题。
上图中,在快照上,日志长度为7。在index=5处生成了一个快照(snaphot),快照中除记录了状态机的状态:x <- 0, y <- 9
那快照中为什么需要保存last included index和last included term呢?
但有个问题是,Leader和Follower虽然在大多数情况下日志是同步的,但当新节点加入或节点日志滞后的情况下,如果我们对Leader做了快照,移除了快照部分的日志记录。如果快照部分的日志还没有同步给Follower,Follower怎么复制日志呢?这就是为什么需要保存快照最后一个日志记录的index和term的原因。有了这两个信息还需要配合一个InstallSnapshot RPC操作,才能让Follower也在相同的位置执行快照。论文中Figure 13介绍了InstallSnapshot RPC的实现细节,在此就不展示了。
需要注意的是,只能对已经提交的日志才能做快照 。快照除了涉及Raft本身提供的能力外,还涉及到状态机的概念,Raft中提供的对外API Install,入参中的snapshot参数就是由状态机生成的。可以结合测试用例一起看,测试用例是Raft的调用者,相当于Raft的用户,在实际系统中,它就是上层server。Lab2D引导中给了一个Raft和上层应用(比如K/V store service)的一个完整交互:
上层服务决定什么时候需要进行快照(通常是日志大小到达一定阈值)。Raft提供对外API Snapshot,Raft做快照时,会保存状态机的全部状态和快照中的最后一个日志记录的index和term,并进行持久化。对于快照后的Leader,在发送AppendEntries RPC之前,会先判断要同步给Follower的日志是否已经在快照中了,如果已经在快照中了,则发送InstallSnapshot RPC请求。
Follower在接收到InstallSnapshot RPC请求后,如果满足快照条件,会将快照应用到状态机,状态机应用快照成功后,会调用CondInstallSnapshot,此时Follower才真正开始在本地做快照。
实现 Lab2D的实现需要增加一些Raft状态,这些状态需要在InstallSnapshot RPC中发送给Follower。实现主要包括三个部分:
InstallSnapshot RPC
Raft状态 本实验中,我们需要新增以下Raft状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 func (r *Raft) persist () { state := r.dumpState() r.persister.SaveRaftState(state) } func (r *Raft) persistStateAndSnapshot () { state := r.dumpState() r.persister.SaveStateAndSnapshot(state, r.snapshot) } func (r *Raft) readPersist (data []byte ) { if data == nil || len (data) < 1 { return } var currentTerm int var voteFor int var lastIncludedIndex int var lastIncludedTerm int var log []*LogEntry buf := bytes.NewBuffer(data) d := labgob.NewDecoder(buf) if d.Decode(¤tTerm) != nil || d.Decode(&voteFor) != nil || d.Decode(&lastIncludedIndex) != nil || d.Decode(&lastIncludedTerm) != nil || d.Decode(&log) != nil { Warning("raft.readPersist: server[%v] at term[%v] read persist state failed" , r.me, r.currentTerm) } else { r.currentTerm = currentTerm r.votedFor = voteFor r.lastIncludedIndex = lastIncludedIndex r.lastIncludedTerm = lastIncludedTerm r.log = log r.commitIndex = r.lastIncludedIndex r.lastApplied = r.lastIncludedIndex r.snapshot = r.persister.ReadSnapshot() } } func (r *Raft) dumpState () []byte { w := new (bytes.Buffer) e := labgob.NewEncoder(w) e.Encode(r.CurrentTerm()) e.Encode(r.votedFor) e.Encode(r.lastIncludedIndex) e.Encode(r.lastIncludedTerm) e.Encode(r.log) return w.Bytes() }
Snapshot Leader和Follower无可以进行Snapshot操作,所以Leader循环和Follower循环进行对Snapshot事件进行处理。只能对已经提交的index进行Snapshot,如果满足快照条件,需要对日志进行压缩,然后更新Raft状态和状态机状态,并持久化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (r *Raft) processSnapshot (req *SnapshotArgs) *SnapshotReply { if req.Index <= r.lastIncludedIndex || req.Index > r.commitIndex { return newSnapshotReply(false ) } entry := r.getLogEntry(req.Index) if entry == nil { return newSnapshotReply(false ) } r.compactLog(req.Index) r.lastIncludedIndex = entry.Index r.lastIncludedTerm = entry.Term r.snapshot = req.Snapshot r.persistStateAndSnapshot() return newSnapshotReply(true ) } func (r *Raft) compactLog (index int ) { entry := r.getLogEntry(index) if entry == nil { r.log = []*LogEntry{} return } afterEntries := r.log[r.getInternalLogIndex(index)+1 :] newLog := make ([]*LogEntry, len (afterEntries)) copy (newLog, afterEntries) r.log = newLog }
CondInstallSnapshot CondInstallSnapshot的调用时机是Follower接收到InstallSnapshot RPC请求后,将快照应用到状态机时,由状态机调用的。需要注意的是,为了保证Follower快照时的原子性,也就是说提交快照时不能再提交新的请求。我们需要在CondInstallSnapshot中先校验是否在快照提交给状态机期间有新的请求被提交。实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (r *Raft) processCondInstallSnapshotRequest (req *CondInstallSnapshotArgs) *CondInstallSnapshotReply { if req.LastIncludedIndex <= r.commitIndex { return newCondInstallSnapshotReply(false ) } r.compactLog(req.LastIncludedIndex) r.lastIncludedIndex = req.LastIncludedIndex r.lastIncludedTerm = req.LastIncludedTerm r.commitIndex = req.LastIncludedIndex r.lastApplied = req.LastIncludedIndex r.snapshot = req.Snapshot r.persistStateAndSnapshot() return newCondInstallSnapshotReply(true ) }
InstallSnapshot RPC 发送端Leader在AppendEntries需要先判断待发送的日志是否已经做过快照,如果是,说明Follower中还没有这些日志记录,而这些日志记录已经做了快照,需要发送InstallSnapshot RPC。
处理InstallSnapshot RPC的返回跟AppendEntries RPC类似,但更简单,除了同步term之外,需要更新nextIndex和matchIndex。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 func (r *Raft) broadcastAppendEntries () { for peer := range r.peers { if peer == r.me { continue } if r.killed() { Debug("raft.broadcastAppendEntries: server[%v] state[%v] at term[%v] killed" , r.me, r.State(), r.CurrentTerm()) break } prevLogIndex := r.getPrevLogIndex(peer) if prevLogIndex >= r.lastIncludedIndex { entries, prevLogTerm := r.getLogEntriesAfter(prevLogIndex, MaxLogEntriesPerRequest) req := newAppendEntriesArgs(r.CurrentTerm(), r.leader, prevLogIndex, prevLogTerm, r.commitIndex, entries) r.asyncSendAppendEntries(r.me, peer, req) } else { req := newInstallSnapshotArgs(r.CurrentTerm(), r.leader, r.lastIncludedIndex, r.lastIncludedTerm, r.snapshot) r.asyncInstallSnapshot(r.me, peer, req) } } } func (r *Raft) asyncInstallSnapshot (server int , peer int , req *InstallSnapshotArgs) { go func (server int , peer int , req *InstallSnapshotArgs) { var reply InstallSnapshotReply if ok := r.sendInstallSnapshot(peer, req, &reply); !ok { return } target := &InstallSnapshotReplyEvent{ Peer: peer, Req: req, Reply: &reply, } r.sendAsync(target) }(r.me, peer, req) } func (r *Raft) processInstallSnapshotReply (peer int , req *InstallSnapshotArgs, reply *InstallSnapshotReply) error { if reply.Term < r.CurrentTerm() { return nil } if reply.Term > r.CurrentTerm() { r.updateCurrentTerm(reply.Term, LEADER_NONE) return nil } r.nextIndex[peer] = req.LastIncludedIndex + 1 r.matchIndex[peer] = req.LastIncludedIndex return nil }
接收端(Follower)实现除了同步term之外(通过规则),需要做一个幂等操作,即:如果接收到lastIncludedIndex已经是旧的快照,不需要再应用到状态机。需要注意的是,Follower在处理InstallSnapshot RPC时,只是将快照提交到状态机,并没有进行日志压缩,因为我们不能保证应该状态机一定成功,所以需要状态机应用成功后再次调用CondInstallSnapshot,然后Raft在进行实际的快照操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 func (r *Raft) processInstallSnapshotRequest (req *InstallSnapshotArgs) (*InstallSnapshotReply, bool ) { if req.Term > r.CurrentTerm() { r.updateCurrentTerm(req.Term, req.LeaderId) } reply := newInstallSnapshotReply(r.CurrentTerm()) if req.Term < r.CurrentTerm() || req.LastIncludedIndex <= r.lastIncludedIndex { return reply, true } msg := ApplyMsg{ SnapshotValid: true , Snapshot: req.Snapshot, SnapshotTerm: req.LastIncludedTerm, SnapshotIndex: req.LastIncludedIndex, } r.applyQueue <- msg return reply, true } func (r *Raft) startLoop () { ... r.wg.Add(1 ) go func () { defer r.wg.Done() r.applyLoop() }() } func (r *Raft) applyLoop () { for msg := range r.applyQueue { r.applyCh <- msg r.sendAsync(msg) } } func (r *Raft) processApplyMsg (applyMsg *ApplyMsg) { if !applyMsg.CommandValid { return } r.lastApplied = applyMsg.CommandIndex }
Q&A Follower接收到InstallSnapshot时为什么不进行日志压缩? 因为Raft和状态机是属于两个不同层次的服务,为了不让状态机应用操作阻塞事件循环,应用操作是一个异步操作,在一个单独的线程中执行。我们有两种实现方式:
日志压缩中如果index未找到,为什么要移除全部日志? 论文日志压缩有这样一句话:
Usually the snapshot will contain new information not already in the recipient’s log. In this case, the follower discards its entire log; it is all superseded by the snapshot and may possibly have uncommitted entries that conflict with the snapshot.
事件循环线程和应用线程同步如何同步? 事件循环线程在应用命令/快照时,是先将命令/快照消息发送到应用消息channel中,由应用线程消费该channel再应用到状态机。退出时,需要先让事件循环退出,再关闭应用线程。应用线程只是循环读取applyQueue中的消息,我们只需要在事件循环结束后,关闭applyQueue channel即可。如下:
1 2 3 4 5 6 7 8 9 10 func (r *Raft) startLoop () { ... go func () { defer r.wg.Done() r.loop() close (r.applyQueue) }() ...
测试结果 最后附上Lab2C的测试结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 $ go test -v -race --run 2D === RUN TestSnapshotBasic2D Test (2D): snapshots basic ... ... Passed -- 3.9 3 148 56846 251 --- PASS: TestSnapshotBasic2D (3.90s) === RUN TestSnapshotInstall2D Test (2D): install snapshots (disconnect) ... ... Passed -- 37.8 3 1640 479895 377 --- PASS: TestSnapshotInstall2D (37.84s) === RUN TestSnapshotInstallUnreliable2D Test (2D): install snapshots (disconnect+unreliable) ... ... Passed -- 46.8 3 2020 546698 345 --- PASS: TestSnapshotInstallUnreliable2D (46.77s) === RUN TestSnapshotInstallCrash2D Test (2D): install snapshots (crash) ... ... Passed -- 28.1 3 1014 308631 366 --- PASS: TestSnapshotInstallCrash2D (28.08s) === RUN TestSnapshotInstallUnCrash2D Test (2D): install snapshots (unreliable+crash) ... ... Passed -- 31.5 3 1128 325411 388 --- PASS: TestSnapshotInstallUnCrash2D (31.46s) PASS ok 6.824/raft 148.084s
参考资料 [1] In Search of an Understandable Consensus Algorithm [2] 6.824 Lab 2: Raft Part2D