type
Post
status
Published
date
Dec 1, 2022
slug
raft-implement
summary
学习 Raft 的最好方法就是实现一个,我们将以 MIT 6.824 课程的脚手架为基础,实现一个基本可用的 Raft 库
tags
分布式
共识算法
Raft
Go
category
技术分享
icon
password
Property
Aug 22, 2023 11:07 AM
最近在学习分布式系统的基础知识,然后就读了 Raft 的论文并翻译了一遍放到博客中,现在对整个算法的设计理念和实现过程都有了一个基础的认知,所以准备动手按照论文来实现一个基本可用的 Raft 库。
学习的过程中,也是把久仰大名的 MIT 分布式系统课程的视频大致看了一遍,不得不说课程的确让人受益匪浅,课程实验 Lab2 就是需要学生们实现一个 Raft 库来为后面课程打基础,那正好可以完成课程实验来巩固自己对 Raft 的掌握。
该文中的代码均为实验性质,不建议直接在生产中使用
准备工作
工欲善其事必先利其器,下面这些需要事先准备好(# 后面是我的选择)
- 一台心爱的电脑 # M1 Mac mini
- 安装 Go 语言,参考 这里 #
brew install go
- 安装 Git,参考 这里 #
brew install git
- 趁手的 IDE # Goland
接着克隆实验的基础代码库,这个代码仓库可以为你解决很多基础问题,比如 RPC库、测试用例等,甚至还有已经写好的实现框架,很大程度的降低了完成我们实验的难度。
代码克隆完成之后,我们需要关注的代码主要位于 raft 文件夹内
$ git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824 # 将代码克隆到 6.824 文件夹 $ cd 6.824 $ tree -L 2 . . ├── Makefile └── src ├── go.mod ├── go.sum ├── kvraft # Lab 3 ├── labgob # 封装了 gob 库以检查字段是不是以大写开头 ├── labrpc # 专门给 Lab 使用的简单的 RPC 库 ├── main # 可执行文件入口 ├── models # Lab 3 ├── mr # Lab 1 ├── mrapps # Lab 1 ├── porcupine ├── raft # Lab 2 - bingo ├── shardctrler # Lab 4 └── shardkv # Lab 4 12 directories, 3 files
跟论文所述的拆分思想一致,本实验也将整个任务拆分为四块来实现,分别是 A - leader election、B - log、C - persistence 和 D - log compaction。我们将按照此顺序依次完成,并通过
test_test.go
文件中的所有测试,而因为已经有了测试,那么实现的时候,个人感觉有点点 TDD 的意思,就是要用最快的方法让测试由红转绿,然后通过重构来消除代码的坏味道,当然并不是真正意义上的 TDD,毕竟这个测试粒度对于驱动开发来说过于粗糙。核心状态和规则
Raft 的实现主要是根据论文中的图 2 来的,即下面这张图,我们来根据这张图梳理一下
状态
需要在所有节点持久化的状态
currentTerm | 当前任期,初始化都是 0,单调递增 |
votedFor | 当前任期获得自己选票的候选者 ID,如果没有投票就没有值,在此实验中,没有就是 -1 |
log | 日志条目的数组,条目中包含接收此条目的任期和命令,条目的索引从 1 开始 |
所有节点非持久化状态
commitIndex | 目前所知最大的正式提交的条目索引,从0开始,单调递增 |
lastApplied | 目前所知最大的正式应用到状态机的条目索引,从0开始,单调递增 |
领导者节点非持久化状态 — 📢 每次赢得选举都要重新初始化
nextIndex | 存储了将会发送给每个节点的下一个日志条目索引,初始化状态为领导者最大日志条目索引+1 |
matchIndex | 存储了已经复制过到每个节点上的最大的日志条目索引,初始状态为0,单调递增 |
规则
所有节点都适用的规则
- 如果
commitIndex
>lastApplied
: 增加lastApplied
,将log[lastApplied]
应用到状态机
- 如果 RPC 请求或者响应包含
term
>currentTerm
:将currentTerm
设置为term
,并且将当前节点转为跟随者
跟随者适用规则
- 响应领导者和候选人的 RPC 请求
- 如果选举超时时间过了,期间没有收到领导者的
AppendEntries
RPC 也没有在当前任期投出自己的选票 : 当前节点转为候选者,发起新的选举
候选者适用规则
- 开始选举
- 增加
currentTerm
,即当前任期 + 1 - 给自己投一票
- 重置选举超时时间定时器
- 发送
RequestVote
给其他所有节点
- 如果收到大多数节点的选票,则转为领导者
- 如果收到领导者的
AppendEntries
RPC,则转为跟随者
- 如果选举超时时间过了,重新发起新的选举
领导者适用规则
- 在任期内
- 发送空的(entries 是空)
AppendEntries
RPCs 给所有节点 - 任期内每隔一段时间重复发送心跳包,时间间隔需要远小于选举超时时间间隔
- 收到来自客户端的请求
- 向自己节点的日志里面追加此日志条目
- 日志条目应用到状态机后,响应给客户端(实验中可以异步复制到其他节点,异步提交给状态机,尽快响应给客户端)
- 如果自己最后一个日志条目索引大于等于某个跟随者对应的
nextIndex
- 发送包含
nextIndex
和之后的日志条目的AppendEntries
给此跟随者 - 如果成功,更新此跟随者对应的
netIndex
和matchIndex
- 如果因为日志不一致失败,减小
nextIndex
并重试上面的流程
- 如果存在一个
N
,N > commitIndex
,大多数matchIndex[i]≥N
且log[N].term == currentTerm
,那么设置commitIndex = N
定义基本模型
定义日志条目模型
type Entry struct { Command interface{} //来自客户端的命令 Term int //接收到此命令的任期 }
定义节点模型
// A Go object implementing a single Raft peer. type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int // this peer's index into peers[] dead int32 // set by Kill() state State currentTerm int electionResetEvent time.Time electionTimeout time.Duration votedFor int votes int commitIndex int lastApplied int logs []LogEntry nextIndex []int matchIndex []int applyChan chan ApplyMsg } type LogEntry struct { Term int Command interface{} } type State int const ( Follower State = iota Leader Candidate )
Part A - 实现领导者选举
首先实现的是领导者选举流程,目的是通过测试
go test -run 2A
。我们根据论文梳理下领导者选举基本工作流程,用来指导我们的编程- 构建一个多节点的集群,所有节点初始状态均为
Follower
,并在随机的electionTimeout
超时之后开始变为Candidate
状态并开启新任期的选举流程
- 选举开始后,候选人通过向集群中的其他节点发送
RequestVote
RPC 消息来搜集选票,选票超过集群节点大多数则当选此任期领导者
- 当选之后,领导者需定时向跟随者发送心跳
AppendEntries
RPC 以保持权威,阻止他们发起新的选举,直到任期结束
以上流程的每一步都需要遵循一定的规则,以确保选举安全,即同一任期只能有一个领导者当选,此流程总涉及到两个 RPC 调用,所以我们先梳理一下这两个 RPC 的实现细节
RequestVote
RPC
此 RPC 由候选者发起,用来搜集选票,参数如下
term | 候选者当前任期 |
candidateId | 候选者 ID |
lastLogIndex | 候选者最后一个日志条目索引 |
lastLogTerm | 候选者最后一个日志条目的任期 |
响应结果属性如下
term | 当前节点的当前任期,当这个值大于候选者任期时,候选者需要更新自己的任期并转为跟随者 |
voteGranted | 如果是 true ,证明当前节点投了票,否则为不投票 |
接收到此请求的节点,按照下面规则实现具体的投票逻辑
- 如果
term < currentTerm
,拒绝投票,返回自己的任期和false
- 如果
votedFor
是空的(我们实现为值是 -1)或者跟candidateId
相等,并且候选者的日志至少是跟当前节点一样新,则投出选票,返回true
- Raft 是通过比较最后一个日志条目的索引和任期来判断两个节点谁的日志更新。如果两个节点的最后一个日志条目有不同的任期,任期越大则节点日志越新;如果两个节点最后的日志条目的任期相同,则索引越大,日志越新
AppendEntries
RPC
此 RPC 是领导者用来复制日志到其他节点使用的,也可以用来当做心跳来维持权威。它的参数如下
term | 领导者的任期 |
leaderId | 领导者 ID,因此跟随者可以转发请求到此 ID |
prevLogIndex | 紧跟着新条目的上一个条目的索引 |
prevLogTerm | 上一个条目的任期 |
entries | 需要存储的日志条目,如果是空的话,就是普通的心跳,里面可能有多个条目高效的传输 |
leaderCommit | 领导者的 commitIndex |
返回值如下
term | 接收者的当前任期,以便领导者更新自己的任期 |
success | 如果接收者包含了匹配 prevLogIndex 和 prevLogTerm 的条目,返回 true |
接受者的实现规则
- 如果
term < currentTerm
返回 false
- 如果在
prevLogIndex
处没有可以匹配prevLogTerm
的日志条目,返回 false
- 如果已经存在的日志条目跟新的条目冲突,即相同的 index,不同的任期,则删除已经存在的条目和所有它后面的条目
- 追加所有不存在的新条目
- 如果
leaderCommit > commitIndex
,将commitIndex
设置为min(leaderCommit, index of last new entry)
// Make // the service or tester wants to create a Raft server. the ports // of all the Raft servers (including this one) are in peers[]. this // server's port is peers[me]. all the servers' peers[] arrays // have the same order. persister is a place for this server to // save its persistent state, and also initially holds the most // recent saved state, if any. applyCh is a channel on which the // tester or service expects Raft to send ApplyMsg messages. // Make() must return quickly, so it should start goroutines // for any long-running work. func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft { rf := &Raft{} rf.peers = peers rf.persister = persister rf.me = me // Your initialization code here (2A, 2B, 2C). rf.state = Follower rf.votes = 0 rf.votedFor = -1 rf.currentTerm = 0 rf.electionTimeout = time.Duration(150+rand.Intn(300)) * time.Millisecond rf.electionResetEvent = time.Now() rf.commitIndex = 0 rf.lastApplied = 0 rf.logs = make([]LogEntry, 0) rf.applyChan = applyCh // initialize from state persisted before a crash rf.readPersist(persister.ReadRaftState()) // start ticker goroutine to start elections go rf.ticker() return rf }
⚠️ 遵守学习纪律,作业代码不再继续公开提供
附上所有测试通过的截图,留个纪念