raft.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package raft
  2. import (
  3. "errors"
  4. "sort"
  5. )
  6. const none = -1
  7. type messageType int
  8. const (
  9. msgHup messageType = iota
  10. msgProp
  11. msgApp
  12. msgAppResp
  13. msgVote
  14. msgVoteResp
  15. )
  16. var mtmap = [...]string{
  17. msgHup: "msgHup",
  18. msgProp: "msgProp",
  19. msgApp: "msgApp",
  20. msgAppResp: "msgAppResp",
  21. msgVote: "msgVote",
  22. msgVoteResp: "msgVoteResp",
  23. }
  24. func (mt messageType) String() string {
  25. return mtmap[int(mt)]
  26. }
  27. var errNoLeader = errors.New("no leader")
  28. const (
  29. stateFollower stateType = iota
  30. stateCandidate
  31. stateLeader
  32. )
  33. type stateType int
  34. var stmap = [...]string{
  35. stateFollower: "stateFollower",
  36. stateCandidate: "stateCandidate",
  37. stateLeader: "stateLeader",
  38. }
  39. func (st stateType) String() string {
  40. return stmap[int(st)]
  41. }
  42. type Message struct {
  43. Type messageType
  44. To int
  45. From int
  46. Term int
  47. LogTerm int
  48. Index int
  49. PrevTerm int
  50. Entries []Entry
  51. Commit int
  52. Data []byte
  53. }
  54. type index struct {
  55. match, next int
  56. }
  57. func (in *index) update(n int) {
  58. in.match = n
  59. in.next = n + 1
  60. }
  61. func (in *index) decr() {
  62. if in.next--; in.next < 1 {
  63. in.next = 1
  64. }
  65. }
  66. type stateMachine struct {
  67. // k is the number of peers
  68. k int
  69. // addr is an integer representation of our address amoungst our peers. It is 0 <= addr < k.
  70. addr int
  71. // the term we are participating in at any time
  72. term int
  73. // who we voted for in term
  74. vote int
  75. // the log
  76. log *log
  77. ins []*index
  78. state stateType
  79. votes map[int]bool
  80. msgs []Message
  81. // the leader addr
  82. lead int
  83. }
  84. func newStateMachine(k, addr int) *stateMachine {
  85. sm := &stateMachine{k: k, addr: addr, log: newLog()}
  86. sm.reset()
  87. return sm
  88. }
  89. func (sm *stateMachine) canStep(m Message) bool {
  90. if m.Type == msgProp {
  91. return sm.lead != none
  92. }
  93. return true
  94. }
  95. func (sm *stateMachine) poll(addr int, v bool) (granted int) {
  96. if _, ok := sm.votes[addr]; !ok {
  97. sm.votes[addr] = v
  98. }
  99. for _, vv := range sm.votes {
  100. if vv {
  101. granted++
  102. }
  103. }
  104. return granted
  105. }
  106. // send persists state to stable storage and then sends to its mailbox
  107. func (sm *stateMachine) send(m Message) {
  108. m.From = sm.addr
  109. m.Term = sm.term
  110. sm.msgs = append(sm.msgs, m)
  111. }
  112. // sendAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
  113. func (sm *stateMachine) sendAppend() {
  114. for i := 0; i < sm.k; i++ {
  115. if i == sm.addr {
  116. continue
  117. }
  118. in := sm.ins[i]
  119. m := Message{}
  120. m.Type = msgApp
  121. m.To = i
  122. m.Index = in.next - 1
  123. m.LogTerm = sm.log.term(in.next - 1)
  124. m.Entries = sm.log.entries(in.next)
  125. m.Commit = sm.log.commit
  126. sm.send(m)
  127. }
  128. }
  129. func (sm *stateMachine) maybeCommit() bool {
  130. // TODO(bmizerany): optimize.. Currently naive
  131. mis := make([]int, len(sm.ins))
  132. for i := range mis {
  133. mis[i] = sm.ins[i].match
  134. }
  135. sort.Sort(sort.Reverse(sort.IntSlice(mis)))
  136. mci := mis[sm.q()-1]
  137. if mci > sm.log.commit && sm.log.term(mci) == sm.term {
  138. sm.log.commit = mci
  139. return true
  140. }
  141. return false
  142. }
  143. // nextEnts returns the appliable entries and updates the applied index
  144. func (sm *stateMachine) nextEnts() (ents []Entry) {
  145. return sm.log.nextEnts()
  146. }
  147. func (sm *stateMachine) reset() {
  148. sm.lead = none
  149. sm.vote = none
  150. sm.votes = make(map[int]bool)
  151. sm.ins = make([]*index, sm.k)
  152. for i := range sm.ins {
  153. sm.ins[i] = &index{next: sm.log.lastIndex() + 1}
  154. }
  155. }
  156. func (sm *stateMachine) q() int {
  157. return sm.k/2 + 1
  158. }
  159. func (sm *stateMachine) voteWorthy(i, term int) bool {
  160. return sm.log.isUpToDate(i, term)
  161. }
  162. func (sm *stateMachine) becomeFollower(term, lead int) {
  163. sm.reset()
  164. sm.term = term
  165. sm.lead = lead
  166. sm.state = stateFollower
  167. }
  168. func (sm *stateMachine) becomeCandidate() {
  169. // TODO(xiangli) remove the panic when the raft implementation is stable
  170. if sm.state == stateLeader {
  171. panic("invalid transition [leader -> candidate]")
  172. }
  173. sm.reset()
  174. sm.term++
  175. sm.vote = sm.addr
  176. sm.state = stateCandidate
  177. sm.poll(sm.addr, true)
  178. }
  179. func (sm *stateMachine) becomeLeader() {
  180. // TODO(xiangli) remove the panic when the raft implementation is stable
  181. if sm.state == stateFollower {
  182. panic("invalid transition [follower -> leader]")
  183. }
  184. sm.reset()
  185. sm.lead = sm.addr
  186. sm.state = stateLeader
  187. }
  188. func (sm *stateMachine) Msgs() []Message {
  189. msgs := sm.msgs
  190. sm.msgs = make([]Message, 0)
  191. return msgs
  192. }
  193. func (sm *stateMachine) Step(m Message) {
  194. switch m.Type {
  195. case msgHup:
  196. sm.becomeCandidate()
  197. for i := 0; i < sm.k; i++ {
  198. if i == sm.addr {
  199. continue
  200. }
  201. lasti := sm.log.lastIndex()
  202. sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.log.term(lasti)})
  203. }
  204. return
  205. case msgProp:
  206. switch sm.lead {
  207. case sm.addr:
  208. sm.log.append(sm.log.lastIndex(), Entry{Term: sm.term, Data: m.Data})
  209. sm.sendAppend()
  210. case none:
  211. panic("msgProp given without leader")
  212. default:
  213. m.To = sm.lead
  214. sm.send(m)
  215. }
  216. return
  217. }
  218. switch {
  219. case m.Term > sm.term:
  220. sm.becomeFollower(m.Term, m.From)
  221. case m.Term < sm.term:
  222. // ignore
  223. return
  224. }
  225. handleAppendEntries := func() {
  226. if sm.log.matchTerm(m.Index, m.LogTerm) {
  227. sm.log.commit = m.Commit
  228. sm.log.append(m.Index, m.Entries...)
  229. sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()})
  230. } else {
  231. sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
  232. }
  233. }
  234. switch sm.state {
  235. case stateLeader:
  236. switch m.Type {
  237. case msgAppResp:
  238. in := sm.ins[m.From]
  239. if m.Index < 0 {
  240. in.decr()
  241. sm.sendAppend()
  242. } else {
  243. in.update(m.Index)
  244. if sm.maybeCommit() {
  245. sm.sendAppend()
  246. }
  247. }
  248. }
  249. case stateCandidate:
  250. switch m.Type {
  251. case msgApp:
  252. sm.becomeFollower(sm.term, m.From)
  253. handleAppendEntries()
  254. case msgVoteResp:
  255. gr := sm.poll(m.From, m.Index >= 0)
  256. switch sm.q() {
  257. case gr:
  258. sm.becomeLeader()
  259. sm.sendAppend()
  260. case len(sm.votes) - gr:
  261. sm.becomeFollower(sm.term, none)
  262. }
  263. }
  264. case stateFollower:
  265. switch m.Type {
  266. case msgApp:
  267. handleAppendEntries()
  268. case msgVote:
  269. if sm.voteWorthy(m.Index, m.LogTerm) {
  270. sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.log.lastIndex()})
  271. } else {
  272. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  273. }
  274. }
  275. }
  276. }