raft.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. package raft
  2. import (
  3. "errors"
  4. "sort"
  5. )
  // none is the sentinel peer address meaning "nobody". reset stores it in
  // stateMachine.lead and stateMachine.vote, and canStep/step compare
  // stateMachine.lead against it.
  const none = -1
  // messageType identifies the kind of a Message; stateMachine.step dispatches
  // on it.
  type messageType int

  const (
  	// msgHup asks the local node to start an election.
  	msgHup messageType = iota
  	// msgProp proposes data to be appended to the log.
  	msgProp
  	// msgApp carries log entries to a peer.
  	msgApp
  	// msgAppResp answers a msgApp.
  	msgAppResp
  	// msgVote asks a peer for its vote.
  	msgVote
  	// msgVoteResp answers a msgVote.
  	msgVoteResp
  )

  // mtmap holds the printable name of each messageType, indexed by its value.
  var mtmap = [...]string{
  	msgHup:      "msgHup",
  	msgProp:     "msgProp",
  	msgApp:      "msgApp",
  	msgAppResp:  "msgAppResp",
  	msgVote:     "msgVote",
  	msgVoteResp: "msgVoteResp",
  }

  // String returns the name of mt. A value outside the declared constants
  // yields "unknown" instead of panicking on the table lookup, so formatting or
  // logging a malformed message can never crash the caller.
  func (mt messageType) String() string {
  	if mt < 0 || int(mt) >= len(mtmap) {
  		return "unknown"
  	}
  	return mtmap[mt]
  }
  // errNoLeader is a sentinel error carrying the message "no leader". Nothing
  // in the visible part of this file returns it.
  var errNoLeader = errors.New("no leader")
  // stateType is the role a stateMachine currently plays.
  type stateType int

  const (
  	// stateFollower is the zero value, so a fresh stateMachine is a follower.
  	stateFollower stateType = iota
  	// stateCandidate is entered by step on msgHup.
  	stateCandidate
  	// stateLeader is entered once a quorum of votes has been granted.
  	stateLeader
  )

  // stmap holds the printable name of each stateType, indexed by its value.
  var stmap = [...]string{
  	stateFollower:  "stateFollower",
  	stateCandidate: "stateCandidate",
  	stateLeader:    "stateLeader",
  }

  // String returns the name of st. A value outside the declared constants
  // yields "unknown" instead of panicking on the table lookup.
  func (st stateType) String() string {
  	if st < 0 || int(st) >= len(stmap) {
  		return "unknown"
  	}
  	return stmap[st]
  }
  // Entry is one record of a stateMachine's log.
  type Entry struct {
  	// Term is the term under which the entry was appended; step stamps new
  	// proposals with the leader's current term.
  	Term int
  	// Data is the opaque payload taken from the proposing Message.
  	Data []byte
  }
  46. type Message struct {
  47. Type messageType
  48. To int
  49. From int
  50. Term int
  51. LogTerm int
  52. Index int
  53. PrevTerm int
  54. Entries []Entry
  55. Commit int
  56. Data []byte
  57. }
  58. type stepper interface {
  59. step(m Message)
  60. }
  // index records the leader's view of one peer's log.
  type index struct {
  	// match is the last log position the peer has acknowledged.
  	match int
  	// next is the log position of the first entry to send to the peer.
  	next int
  }

  // update records that the peer has acknowledged position n, so the following
  // position is the next one to send.
  func (in *index) update(n int) {
  	in.match, in.next = n, n+1
  }

  // decr moves next back by one position, never below 1.
  func (in *index) decr() {
  	in.next--
  	if in.next >= 1 {
  		return
  	}
  	in.next = 1
  }
  73. type stateMachine struct {
  74. // k is the number of peers
  75. k int
  76. // addr is an integer representation of our address amoungst our peers. It is 0 <= addr < k.
  77. addr int
  78. // the term we are participating in at any time
  79. term int
  80. // who we voted for in term
  81. vote int
  82. // the log
  83. log []Entry
  84. ins []*index
  85. state stateType
  86. commit int
  87. votes map[int]bool
  88. next stepper
  89. // the leader addr
  90. lead int
  91. }
  92. func newStateMachine(k, addr int, next stepper) *stateMachine {
  93. log := make([]Entry, 1, 1024)
  94. sm := &stateMachine{k: k, addr: addr, next: next, log: log}
  95. sm.reset()
  96. return sm
  97. }
  98. func (sm *stateMachine) canStep(m Message) bool {
  99. if m.Type == msgProp {
  100. return sm.lead != none
  101. }
  102. return true
  103. }
  104. func (sm *stateMachine) poll(addr int, v bool) (granted int) {
  105. if _, ok := sm.votes[addr]; !ok {
  106. sm.votes[addr] = v
  107. }
  108. for _, vv := range sm.votes {
  109. if vv {
  110. granted++
  111. }
  112. }
  113. return granted
  114. }
  115. var empty = Entry{}
  116. func (sm *stateMachine) append(after int, ents ...Entry) int {
  117. sm.log = append(sm.log[:after+1], ents...)
  118. return len(sm.log) - 1
  119. }
  120. func (sm *stateMachine) isLogOk(i, term int) bool {
  121. if i > sm.li() {
  122. return false
  123. }
  124. return sm.log[i].Term == term
  125. }
  126. // send persists state to stable storage and then sends m over the network to m.To
  127. func (sm *stateMachine) send(m Message) {
  128. m.From = sm.addr
  129. m.Term = sm.term
  130. sm.next.step(m)
  131. }
  132. // sendAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
  133. func (sm *stateMachine) sendAppend() {
  134. for i := 0; i < sm.k; i++ {
  135. if i == sm.addr {
  136. continue
  137. }
  138. in := sm.ins[i]
  139. m := Message{}
  140. m.Type = msgApp
  141. m.To = i
  142. m.Index = in.next - 1
  143. m.LogTerm = sm.log[in.next-1].Term
  144. m.Entries = sm.log[in.next:]
  145. sm.send(m)
  146. }
  147. }
  148. func (sm *stateMachine) theN() int {
  149. // TODO(bmizerany): optimize.. Currently naive
  150. mis := make([]int, len(sm.ins))
  151. for i := range mis {
  152. mis[i] = sm.ins[i].match
  153. }
  154. sort.Ints(mis)
  155. for _, mi := range mis[sm.k/2+1:] {
  156. if sm.log[mi].Term == sm.term {
  157. return mi
  158. }
  159. }
  160. return -1
  161. }
  162. func (sm *stateMachine) maybeAdvanceCommit() int {
  163. ci := sm.theN()
  164. if ci > sm.commit {
  165. sm.commit = ci
  166. }
  167. return sm.commit
  168. }
  169. func (sm *stateMachine) reset() {
  170. sm.lead = none
  171. sm.vote = none
  172. sm.votes = make(map[int]bool)
  173. sm.ins = make([]*index, sm.k)
  174. for i := range sm.ins {
  175. sm.ins[i] = &index{next: len(sm.log)}
  176. }
  177. }
  178. func (sm *stateMachine) q() int {
  179. return sm.k/2 + 1
  180. }
  181. func (sm *stateMachine) voteWorthy(i, term int) bool {
  182. // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
  183. // \/ /\ m.mlastLogTerm = LastTerm(log[i])
  184. // /\ m.mlastLogIndex >= Len(log[i])
  185. e := sm.log[sm.li()]
  186. return term > e.Term || (term == e.Term && i >= sm.li())
  187. }
  188. func (sm *stateMachine) li() int {
  189. return len(sm.log) - 1
  190. }
  191. func (sm *stateMachine) becomeFollower(term, lead int) {
  192. sm.reset()
  193. sm.term = term
  194. sm.lead = lead
  195. sm.state = stateFollower
  196. }
  197. func (sm *stateMachine) step(m Message) {
  198. switch m.Type {
  199. case msgHup:
  200. sm.term++
  201. sm.reset()
  202. sm.state = stateCandidate
  203. sm.vote = sm.addr
  204. sm.poll(sm.addr, true)
  205. for i := 0; i < sm.k; i++ {
  206. if i == sm.addr {
  207. continue
  208. }
  209. lasti := sm.li()
  210. sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.log[lasti].Term})
  211. }
  212. return
  213. case msgProp:
  214. switch sm.lead {
  215. case sm.addr:
  216. sm.append(sm.li(), Entry{Term: sm.term, Data: m.Data})
  217. sm.sendAppend()
  218. case none:
  219. panic("msgProp given without leader")
  220. default:
  221. m.To = sm.lead
  222. sm.send(m)
  223. }
  224. return
  225. }
  226. switch {
  227. case m.Term > sm.term:
  228. sm.becomeFollower(m.Term, m.From)
  229. case m.Term < sm.term:
  230. // ignore
  231. return
  232. }
  233. handleAppendEntries := func() {
  234. if sm.isLogOk(m.Index, m.LogTerm) {
  235. sm.append(m.Index, m.Entries...)
  236. sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.li()})
  237. } else {
  238. sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
  239. }
  240. }
  241. switch sm.state {
  242. case stateLeader:
  243. switch m.Type {
  244. case msgAppResp:
  245. in := sm.ins[m.From]
  246. if m.Index < 0 {
  247. in.decr()
  248. sm.sendAppend()
  249. } else {
  250. in.update(m.Index)
  251. }
  252. }
  253. case stateCandidate:
  254. switch m.Type {
  255. case msgApp:
  256. sm.becomeFollower(sm.term, m.From)
  257. handleAppendEntries()
  258. case msgVoteResp:
  259. gr := sm.poll(m.From, m.Index >= 0)
  260. switch sm.q() {
  261. case gr:
  262. sm.state = stateLeader
  263. sm.lead = sm.addr
  264. sm.sendAppend()
  265. case len(sm.votes) - gr:
  266. sm.state = stateFollower
  267. }
  268. }
  269. case stateFollower:
  270. switch m.Type {
  271. case msgApp:
  272. handleAppendEntries()
  273. case msgVote:
  274. if sm.voteWorthy(m.Index, m.LogTerm) {
  275. sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.li()})
  276. } else {
  277. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  278. }
  279. }
  280. }
  281. }