raft.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. package raft
import (
	"errors"
	"sort"
	"strconv"
)
  6. const none = -1
  7. type messageType int
  8. const (
  9. msgHup messageType = iota
  10. msgProp
  11. msgApp
  12. msgAppResp
  13. msgVote
  14. msgVoteResp
  15. )
  16. var mtmap = [...]string{
  17. msgHup: "msgHup",
  18. msgProp: "msgProp",
  19. msgApp: "msgApp",
  20. msgAppResp: "msgAppResp",
  21. msgVote: "msgVote",
  22. msgVoteResp: "msgVoteResp",
  23. }
  24. func (mt messageType) String() string {
  25. return mtmap[int(mt)]
  26. }
  27. var errNoLeader = errors.New("no leader")
  28. const (
  29. stateFollower stateType = iota
  30. stateCandidate
  31. stateLeader
  32. )
  33. type stateType int
  34. var stmap = [...]string{
  35. stateFollower: "stateFollower",
  36. stateCandidate: "stateCandidate",
  37. stateLeader: "stateLeader",
  38. }
  39. func (st stateType) String() string {
  40. return stmap[int(st)]
  41. }
  42. type Message struct {
  43. Type messageType
  44. To int
  45. From int
  46. Term int
  47. LogTerm int
  48. Index int
  49. PrevTerm int
  50. Entries []Entry
  51. Commit int
  52. Data []byte
  53. }
  54. type index struct {
  55. match, next int
  56. }
  57. func (in *index) update(n int) {
  58. in.match = n
  59. in.next = n + 1
  60. }
  61. func (in *index) decr() {
  62. if in.next--; in.next < 1 {
  63. in.next = 1
  64. }
  65. }
  66. type stateMachine struct {
  67. // k is the number of peers
  68. k int
  69. // addr is an integer representation of our address amoungst our peers. It is 0 <= addr < k.
  70. addr int
  71. // the term we are participating in at any time
  72. term int
  73. // who we voted for in term
  74. vote int
  75. // the log
  76. log *log
  77. ins []index
  78. state stateType
  79. votes map[int]bool
  80. msgs []Message
  81. // the leader addr
  82. lead int
  83. }
  84. func newStateMachine(k, addr int) *stateMachine {
  85. sm := &stateMachine{k: k, addr: addr, log: newLog()}
  86. sm.reset()
  87. return sm
  88. }
  89. func (sm *stateMachine) canStep(m Message) bool {
  90. if m.Type == msgProp {
  91. return sm.lead != none
  92. }
  93. return true
  94. }
  95. func (sm *stateMachine) poll(addr int, v bool) (granted int) {
  96. if _, ok := sm.votes[addr]; !ok {
  97. sm.votes[addr] = v
  98. }
  99. for _, vv := range sm.votes {
  100. if vv {
  101. granted++
  102. }
  103. }
  104. return granted
  105. }
  106. // send persists state to stable storage and then sends to its mailbox
  107. func (sm *stateMachine) send(m Message) {
  108. m.From = sm.addr
  109. m.Term = sm.term
  110. sm.msgs = append(sm.msgs, m)
  111. }
  112. // sendAppend sends RRPC, with entries to the given peer
  113. func (sm *stateMachine) sendAppend(to int) {
  114. in := sm.ins[to]
  115. m := Message{}
  116. m.Type = msgApp
  117. m.To = to
  118. m.Index = in.next - 1
  119. m.LogTerm = sm.log.term(in.next - 1)
  120. m.Entries = sm.log.entries(in.next)
  121. m.Commit = sm.log.committed
  122. sm.send(m)
  123. }
  124. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
  125. func (sm *stateMachine) bcastAppend() {
  126. for i := 0; i < sm.k; i++ {
  127. if i == sm.addr {
  128. continue
  129. }
  130. sm.sendAppend(i)
  131. }
  132. }
  133. func (sm *stateMachine) maybeCommit() bool {
  134. // TODO(bmizerany): optimize.. Currently naive
  135. mis := make([]int, len(sm.ins))
  136. for i := range mis {
  137. mis[i] = sm.ins[i].match
  138. }
  139. sort.Sort(sort.Reverse(sort.IntSlice(mis)))
  140. mci := mis[sm.q()-1]
  141. return sm.log.maybeCommit(mci, sm.term)
  142. }
  143. // nextEnts returns the appliable entries and updates the applied index
  144. func (sm *stateMachine) nextEnts() (ents []Entry) {
  145. return sm.log.nextEnts()
  146. }
  147. func (sm *stateMachine) reset() {
  148. sm.lead = none
  149. sm.vote = none
  150. sm.votes = make(map[int]bool)
  151. sm.ins = make([]index, sm.k)
  152. for i := range sm.ins {
  153. sm.ins[i] = index{next: sm.log.lastIndex() + 1}
  154. }
  155. }
  156. func (sm *stateMachine) q() int {
  157. return sm.k/2 + 1
  158. }
  159. func (sm *stateMachine) becomeFollower(term, lead int) {
  160. sm.reset()
  161. sm.term = term
  162. sm.lead = lead
  163. sm.state = stateFollower
  164. }
  165. func (sm *stateMachine) becomeCandidate() {
  166. // TODO(xiangli) remove the panic when the raft implementation is stable
  167. if sm.state == stateLeader {
  168. panic("invalid transition [leader -> candidate]")
  169. }
  170. sm.reset()
  171. sm.term++
  172. sm.vote = sm.addr
  173. sm.state = stateCandidate
  174. }
  175. func (sm *stateMachine) becomeLeader() {
  176. // TODO(xiangli) remove the panic when the raft implementation is stable
  177. if sm.state == stateFollower {
  178. panic("invalid transition [follower -> leader]")
  179. }
  180. sm.reset()
  181. sm.lead = sm.addr
  182. sm.state = stateLeader
  183. }
  184. func (sm *stateMachine) Msgs() []Message {
  185. msgs := sm.msgs
  186. sm.msgs = make([]Message, 0)
  187. return msgs
  188. }
  189. func (sm *stateMachine) Step(m Message) {
  190. switch m.Type {
  191. case msgHup:
  192. sm.becomeCandidate()
  193. if sm.q() == sm.poll(sm.addr, true) {
  194. sm.becomeLeader()
  195. return
  196. }
  197. for i := 0; i < sm.k; i++ {
  198. if i == sm.addr {
  199. continue
  200. }
  201. lasti := sm.log.lastIndex()
  202. sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.log.term(lasti)})
  203. }
  204. return
  205. case msgProp:
  206. switch sm.lead {
  207. case sm.addr:
  208. sm.log.append(sm.log.lastIndex(), Entry{Term: sm.term, Data: m.Data})
  209. sm.bcastAppend()
  210. case none:
  211. panic("msgProp given without leader")
  212. default:
  213. m.To = sm.lead
  214. sm.send(m)
  215. }
  216. return
  217. }
  218. switch {
  219. case m.Term > sm.term:
  220. sm.becomeFollower(m.Term, m.From)
  221. case m.Term < sm.term:
  222. // ignore
  223. return
  224. }
  225. handleAppendEntries := func() {
  226. if sm.log.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
  227. sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()})
  228. } else {
  229. sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
  230. }
  231. }
  232. switch sm.state {
  233. case stateLeader:
  234. switch m.Type {
  235. case msgAppResp:
  236. if m.Index < 0 {
  237. sm.ins[m.From].decr()
  238. sm.sendAppend(m.From)
  239. } else {
  240. sm.ins[m.From].update(m.Index)
  241. if sm.maybeCommit() {
  242. sm.bcastAppend()
  243. }
  244. }
  245. case msgVote:
  246. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  247. }
  248. case stateCandidate:
  249. switch m.Type {
  250. case msgApp:
  251. sm.becomeFollower(sm.term, m.From)
  252. handleAppendEntries()
  253. case msgVote:
  254. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  255. case msgVoteResp:
  256. gr := sm.poll(m.From, m.Index >= 0)
  257. switch sm.q() {
  258. case gr:
  259. sm.becomeLeader()
  260. sm.bcastAppend()
  261. case len(sm.votes) - gr:
  262. sm.becomeFollower(sm.term, none)
  263. }
  264. }
  265. case stateFollower:
  266. switch m.Type {
  267. case msgApp:
  268. handleAppendEntries()
  269. case msgVote:
  270. if (sm.vote == none || sm.vote == m.From) && sm.log.isUpToDate(m.Index, m.LogTerm) {
  271. sm.vote = m.From
  272. sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.log.lastIndex()})
  273. } else {
  274. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  275. }
  276. }
  277. }
  278. }