raft.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package raft
import (
	"errors"
	"sort"
	"strconv"
)
  6. const none = -1
  7. type messageType int
  8. const (
  9. msgHup messageType = iota
  10. msgProp
  11. msgApp
  12. msgAppResp
  13. msgVote
  14. msgVoteResp
  15. )
  16. var mtmap = [...]string{
  17. msgHup: "msgHup",
  18. msgProp: "msgProp",
  19. msgApp: "msgApp",
  20. msgAppResp: "msgAppResp",
  21. msgVote: "msgVote",
  22. msgVoteResp: "msgVoteResp",
  23. }
  24. func (mt messageType) String() string {
  25. return mtmap[int(mt)]
  26. }
  27. var errNoLeader = errors.New("no leader")
  28. const (
  29. stateFollower stateType = iota
  30. stateCandidate
  31. stateLeader
  32. )
  33. type stateType int
  34. var stmap = [...]string{
  35. stateFollower: "stateFollower",
  36. stateCandidate: "stateCandidate",
  37. stateLeader: "stateLeader",
  38. }
  39. func (st stateType) String() string {
  40. return stmap[int(st)]
  41. }
  42. type Message struct {
  43. Type messageType
  44. To int
  45. From int
  46. Term int
  47. LogTerm int
  48. Index int
  49. PrevTerm int
  50. Entries []Entry
  51. Commit int
  52. Data []byte
  53. }
  54. type index struct {
  55. match, next int
  56. }
  57. func (in *index) update(n int) {
  58. in.match = n
  59. in.next = n + 1
  60. }
  61. func (in *index) decr() {
  62. if in.next--; in.next < 1 {
  63. in.next = 1
  64. }
  65. }
  66. type stateMachine struct {
  67. // k is the number of peers
  68. k int
  69. // addr is an integer representation of our address amoungst our peers. It is 0 <= addr < k.
  70. addr int
  71. // the term we are participating in at any time
  72. term int
  73. // who we voted for in term
  74. vote int
  75. // the log
  76. log *log
  77. ins []*index
  78. state stateType
  79. votes map[int]bool
  80. msgs []Message
  81. // the leader addr
  82. lead int
  83. }
  84. func newStateMachine(k, addr int) *stateMachine {
  85. sm := &stateMachine{k: k, addr: addr, log: newLog()}
  86. sm.reset()
  87. return sm
  88. }
  89. func (sm *stateMachine) canStep(m Message) bool {
  90. if m.Type == msgProp {
  91. return sm.lead != none
  92. }
  93. return true
  94. }
  95. func (sm *stateMachine) poll(addr int, v bool) (granted int) {
  96. if _, ok := sm.votes[addr]; !ok {
  97. sm.votes[addr] = v
  98. }
  99. for _, vv := range sm.votes {
  100. if vv {
  101. granted++
  102. }
  103. }
  104. return granted
  105. }
  106. // send persists state to stable storage and then sends to its mailbox
  107. func (sm *stateMachine) send(m Message) {
  108. m.From = sm.addr
  109. m.Term = sm.term
  110. sm.msgs = append(sm.msgs, m)
  111. }
  112. // sendAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
  113. func (sm *stateMachine) sendAppend() {
  114. for i := 0; i < sm.k; i++ {
  115. if i == sm.addr {
  116. continue
  117. }
  118. in := sm.ins[i]
  119. m := Message{}
  120. m.Type = msgApp
  121. m.To = i
  122. m.Index = in.next - 1
  123. m.LogTerm = sm.log.term(in.next - 1)
  124. m.Entries = sm.log.entries(in.next)
  125. m.Commit = sm.log.commit
  126. sm.send(m)
  127. }
  128. }
  129. func (sm *stateMachine) maybeCommit() bool {
  130. // TODO(bmizerany): optimize.. Currently naive
  131. mis := make([]int, len(sm.ins))
  132. for i := range mis {
  133. mis[i] = sm.ins[i].match
  134. }
  135. sort.Sort(sort.Reverse(sort.IntSlice(mis)))
  136. mci := mis[sm.q()-1]
  137. return sm.log.maybeCommit(mci, sm.term)
  138. }
  139. // nextEnts returns the appliable entries and updates the applied index
  140. func (sm *stateMachine) nextEnts() (ents []Entry) {
  141. return sm.log.nextEnts()
  142. }
  143. func (sm *stateMachine) reset() {
  144. sm.lead = none
  145. sm.vote = none
  146. sm.votes = make(map[int]bool)
  147. sm.ins = make([]*index, sm.k)
  148. for i := range sm.ins {
  149. sm.ins[i] = &index{next: sm.log.lastIndex() + 1}
  150. }
  151. }
  152. func (sm *stateMachine) q() int {
  153. return sm.k/2 + 1
  154. }
  155. func (sm *stateMachine) becomeFollower(term, lead int) {
  156. sm.reset()
  157. sm.term = term
  158. sm.lead = lead
  159. sm.state = stateFollower
  160. }
  161. func (sm *stateMachine) becomeCandidate() {
  162. // TODO(xiangli) remove the panic when the raft implementation is stable
  163. if sm.state == stateLeader {
  164. panic("invalid transition [leader -> candidate]")
  165. }
  166. sm.reset()
  167. sm.term++
  168. sm.vote = sm.addr
  169. sm.state = stateCandidate
  170. sm.poll(sm.addr, true)
  171. }
  172. func (sm *stateMachine) becomeLeader() {
  173. // TODO(xiangli) remove the panic when the raft implementation is stable
  174. if sm.state == stateFollower {
  175. panic("invalid transition [follower -> leader]")
  176. }
  177. sm.reset()
  178. sm.lead = sm.addr
  179. sm.state = stateLeader
  180. }
  181. func (sm *stateMachine) Msgs() []Message {
  182. msgs := sm.msgs
  183. sm.msgs = make([]Message, 0)
  184. return msgs
  185. }
  186. func (sm *stateMachine) Step(m Message) {
  187. switch m.Type {
  188. case msgHup:
  189. sm.becomeCandidate()
  190. for i := 0; i < sm.k; i++ {
  191. if i == sm.addr {
  192. continue
  193. }
  194. lasti := sm.log.lastIndex()
  195. sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.log.term(lasti)})
  196. }
  197. return
  198. case msgProp:
  199. switch sm.lead {
  200. case sm.addr:
  201. sm.log.append(sm.log.lastIndex(), Entry{Term: sm.term, Data: m.Data})
  202. sm.sendAppend()
  203. case none:
  204. panic("msgProp given without leader")
  205. default:
  206. m.To = sm.lead
  207. sm.send(m)
  208. }
  209. return
  210. }
  211. switch {
  212. case m.Term > sm.term:
  213. sm.becomeFollower(m.Term, m.From)
  214. case m.Term < sm.term:
  215. // ignore
  216. return
  217. }
  218. handleAppendEntries := func() {
  219. if sm.log.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
  220. sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()})
  221. } else {
  222. sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
  223. }
  224. }
  225. switch sm.state {
  226. case stateLeader:
  227. switch m.Type {
  228. case msgAppResp:
  229. in := sm.ins[m.From]
  230. if m.Index < 0 {
  231. in.decr()
  232. sm.sendAppend()
  233. } else {
  234. in.update(m.Index)
  235. if sm.maybeCommit() {
  236. sm.sendAppend()
  237. }
  238. }
  239. }
  240. case stateCandidate:
  241. switch m.Type {
  242. case msgApp:
  243. sm.becomeFollower(sm.term, m.From)
  244. handleAppendEntries()
  245. case msgVoteResp:
  246. gr := sm.poll(m.From, m.Index >= 0)
  247. switch sm.q() {
  248. case gr:
  249. sm.becomeLeader()
  250. sm.sendAppend()
  251. case len(sm.votes) - gr:
  252. sm.becomeFollower(sm.term, none)
  253. }
  254. }
  255. case stateFollower:
  256. switch m.Type {
  257. case msgApp:
  258. handleAppendEntries()
  259. case msgVote:
  260. if sm.log.isUpToDate(m.Index, m.LogTerm) {
  261. sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.log.lastIndex()})
  262. } else {
  263. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  264. }
  265. }
  266. }
  267. }