raft.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package raft
  2. import (
  3. "errors"
  4. "sort"
  5. )
  6. const none = -1
  7. type messageType int
  8. const (
  9. msgHup messageType = iota
  10. msgProp
  11. msgApp
  12. msgAppResp
  13. msgVote
  14. msgVoteResp
  15. )
  16. var mtmap = [...]string{
  17. msgHup: "msgHup",
  18. msgProp: "msgProp",
  19. msgApp: "msgApp",
  20. msgAppResp: "msgAppResp",
  21. msgVote: "msgVote",
  22. msgVoteResp: "msgVoteResp",
  23. }
  24. func (mt messageType) String() string {
  25. return mtmap[int(mt)]
  26. }
  27. var errNoLeader = errors.New("no leader")
  28. const (
  29. stateFollower stateType = iota
  30. stateCandidate
  31. stateLeader
  32. )
  33. type stateType int
  34. var stmap = [...]string{
  35. stateFollower: "stateFollower",
  36. stateCandidate: "stateCandidate",
  37. stateLeader: "stateLeader",
  38. }
  39. func (st stateType) String() string {
  40. return stmap[int(st)]
  41. }
  42. type Message struct {
  43. Type messageType
  44. To int
  45. From int
  46. Term int
  47. LogTerm int
  48. Index int
  49. PrevTerm int
  50. Entries []Entry
  51. Commit int
  52. Data []byte
  53. }
  54. type index struct {
  55. match, next int
  56. }
  57. func (in *index) update(n int) {
  58. in.match = n
  59. in.next = n + 1
  60. }
  61. func (in *index) decr() {
  62. if in.next--; in.next < 1 {
  63. in.next = 1
  64. }
  65. }
  66. type stateMachine struct {
  67. // k is the number of peers
  68. k int
  69. // addr is an integer representation of our address amoungst our peers. It is 0 <= addr < k.
  70. addr int
  71. // the term we are participating in at any time
  72. term int
  73. // who we voted for in term
  74. vote int
  75. // the log
  76. log *log
  77. ins []index
  78. state stateType
  79. votes map[int]bool
  80. msgs []Message
  81. // the leader addr
  82. lead int
  83. }
  84. func newStateMachine(k, addr int) *stateMachine {
  85. sm := &stateMachine{k: k, addr: addr, log: newLog()}
  86. sm.reset()
  87. return sm
  88. }
  89. func (sm *stateMachine) canStep(m Message) bool {
  90. if m.Type == msgProp {
  91. return sm.lead != none
  92. }
  93. return true
  94. }
  95. func (sm *stateMachine) poll(addr int, v bool) (granted int) {
  96. if _, ok := sm.votes[addr]; !ok {
  97. sm.votes[addr] = v
  98. }
  99. for _, vv := range sm.votes {
  100. if vv {
  101. granted++
  102. }
  103. }
  104. return granted
  105. }
  106. // send persists state to stable storage and then sends to its mailbox
  107. func (sm *stateMachine) send(m Message) {
  108. m.From = sm.addr
  109. m.Term = sm.term
  110. sm.msgs = append(sm.msgs, m)
  111. }
  112. // sendAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
  113. func (sm *stateMachine) sendAppend() {
  114. for i := 0; i < sm.k; i++ {
  115. if i == sm.addr {
  116. continue
  117. }
  118. in := sm.ins[i]
  119. m := Message{}
  120. m.Type = msgApp
  121. m.To = i
  122. m.Index = in.next - 1
  123. m.LogTerm = sm.log.term(in.next - 1)
  124. m.Entries = sm.log.entries(in.next)
  125. m.Commit = sm.log.committed
  126. sm.send(m)
  127. }
  128. }
  129. func (sm *stateMachine) maybeCommit() bool {
  130. // TODO(bmizerany): optimize.. Currently naive
  131. mis := make([]int, len(sm.ins))
  132. for i := range mis {
  133. mis[i] = sm.ins[i].match
  134. }
  135. sort.Sort(sort.Reverse(sort.IntSlice(mis)))
  136. mci := mis[sm.q()-1]
  137. return sm.log.maybeCommit(mci, sm.term)
  138. }
  139. // nextEnts returns the appliable entries and updates the applied index
  140. func (sm *stateMachine) nextEnts() (ents []Entry) {
  141. return sm.log.nextEnts()
  142. }
  143. func (sm *stateMachine) reset() {
  144. sm.lead = none
  145. sm.vote = none
  146. sm.votes = make(map[int]bool)
  147. sm.ins = make([]index, sm.k)
  148. for i := range sm.ins {
  149. sm.ins[i] = index{next: sm.log.lastIndex() + 1}
  150. }
  151. }
  152. func (sm *stateMachine) q() int {
  153. return sm.k/2 + 1
  154. }
  155. func (sm *stateMachine) becomeFollower(term, lead int) {
  156. sm.reset()
  157. sm.term = term
  158. sm.lead = lead
  159. sm.state = stateFollower
  160. }
  161. func (sm *stateMachine) becomeCandidate() {
  162. // TODO(xiangli) remove the panic when the raft implementation is stable
  163. if sm.state == stateLeader {
  164. panic("invalid transition [leader -> candidate]")
  165. }
  166. sm.reset()
  167. sm.term++
  168. sm.vote = sm.addr
  169. sm.state = stateCandidate
  170. }
  171. func (sm *stateMachine) becomeLeader() {
  172. // TODO(xiangli) remove the panic when the raft implementation is stable
  173. if sm.state == stateFollower {
  174. panic("invalid transition [follower -> leader]")
  175. }
  176. sm.reset()
  177. sm.lead = sm.addr
  178. sm.state = stateLeader
  179. }
  180. func (sm *stateMachine) Msgs() []Message {
  181. msgs := sm.msgs
  182. sm.msgs = make([]Message, 0)
  183. return msgs
  184. }
  185. func (sm *stateMachine) Step(m Message) {
  186. switch m.Type {
  187. case msgHup:
  188. sm.becomeCandidate()
  189. if sm.q() == sm.poll(sm.addr, true) {
  190. sm.becomeLeader()
  191. return
  192. }
  193. for i := 0; i < sm.k; i++ {
  194. if i == sm.addr {
  195. continue
  196. }
  197. lasti := sm.log.lastIndex()
  198. sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.log.term(lasti)})
  199. }
  200. return
  201. case msgProp:
  202. switch sm.lead {
  203. case sm.addr:
  204. sm.log.append(sm.log.lastIndex(), Entry{Term: sm.term, Data: m.Data})
  205. sm.sendAppend()
  206. case none:
  207. panic("msgProp given without leader")
  208. default:
  209. m.To = sm.lead
  210. sm.send(m)
  211. }
  212. return
  213. }
  214. switch {
  215. case m.Term > sm.term:
  216. sm.becomeFollower(m.Term, m.From)
  217. case m.Term < sm.term:
  218. // ignore
  219. return
  220. }
  221. handleAppendEntries := func() {
  222. if sm.log.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
  223. sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()})
  224. } else {
  225. sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
  226. }
  227. }
  228. switch sm.state {
  229. case stateLeader:
  230. switch m.Type {
  231. case msgAppResp:
  232. if m.Index < 0 {
  233. sm.ins[m.From].decr()
  234. sm.sendAppend()
  235. } else {
  236. sm.ins[m.From].update(m.Index)
  237. if sm.maybeCommit() {
  238. sm.sendAppend()
  239. }
  240. }
  241. case msgVote:
  242. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  243. }
  244. case stateCandidate:
  245. switch m.Type {
  246. case msgApp:
  247. sm.becomeFollower(sm.term, m.From)
  248. handleAppendEntries()
  249. case msgVote:
  250. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  251. case msgVoteResp:
  252. gr := sm.poll(m.From, m.Index >= 0)
  253. switch sm.q() {
  254. case gr:
  255. sm.becomeLeader()
  256. sm.sendAppend()
  257. case len(sm.votes) - gr:
  258. sm.becomeFollower(sm.term, none)
  259. }
  260. }
  261. case stateFollower:
  262. switch m.Type {
  263. case msgApp:
  264. handleAppendEntries()
  265. case msgVote:
  266. if (sm.vote == none || sm.vote == m.From) && sm.log.isUpToDate(m.Index, m.LogTerm) {
  267. sm.vote = m.From
  268. sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.log.lastIndex()})
  269. } else {
  270. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  271. }
  272. }
  273. }
  274. }