raft.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package raft
  import (
  	"errors"
  	"sort"
  	"strconv"
  )
  // none is the placeholder id stored in stateMachine.lead and
  // stateMachine.vote when no leader is known or no vote has been cast.
  const none = -1

  // messageType identifies the kind of a Message handled by the state machine.
  type messageType int

  const (
  	// msgHup asks the local node to start an election.
  	msgHup messageType = iota
  	// msgBeat asks a leader to broadcast append messages to its peers.
  	msgBeat
  	// msgProp proposes a single entry to be appended to the log.
  	msgProp
  	// msgApp carries log entries from a leader to a peer.
  	msgApp
  	// msgAppResp answers a msgApp; a negative Index signals rejection.
  	msgAppResp
  	// msgVote requests a vote for the sender.
  	msgVote
  	// msgVoteResp answers a msgVote; a negative Index signals rejection.
  	msgVoteResp
  )

  // mtmap holds the printable name of every messageType, indexed by value.
  var mtmap = [...]string{
  	msgHup:      "msgHup",
  	msgBeat:     "msgBeat",
  	msgProp:     "msgProp",
  	msgApp:      "msgApp",
  	msgAppResp:  "msgAppResp",
  	msgVote:     "msgVote",
  	msgVoteResp: "msgVoteResp",
  }

  // String returns the symbolic name of mt. Values outside the declared range
  // are rendered as "messageType(N)" instead of panicking, so that logging a
  // corrupt or unknown message can never crash the node.
  func (mt messageType) String() string {
  	if mt < 0 || int(mt) >= len(mtmap) {
  		return "messageType(" + strconv.Itoa(int(mt)) + ")"
  	}
  	return mtmap[int(mt)]
  }
  // errNoLeader is the sentinel error carrying the text "no leader".
  var errNoLeader = errors.New("no leader")

  // stateType is the role a stateMachine currently plays.
  type stateType int

  // The roles a stateMachine can be in. The zero value is stateFollower.
  const (
  	stateFollower stateType = iota
  	stateCandidate
  	stateLeader
  )

  // stmap holds the printable name of every stateType, indexed by value.
  var stmap = [...]string{
  	stateFollower:  "stateFollower",
  	stateCandidate: "stateCandidate",
  	stateLeader:    "stateLeader",
  }
  41. var stepmap = [...]stepFunc{
  42. stateFollower: stepFollower,
  43. stateCandidate: stepCandidate,
  44. stateLeader: stepLeader,
  45. }
  46. func (st stateType) String() string {
  47. return stmap[int(st)]
  48. }
  49. type Message struct {
  50. Type messageType
  51. To int
  52. From int
  53. Term int
  54. LogTerm int
  55. Index int
  56. PrevTerm int
  57. Entries []Entry
  58. Commit int
  59. }
  // index tracks the replication progress of one peer.
  type index struct {
  	// match is the log index last acknowledged by the peer (see update).
  	match int
  	// next is the log index of the first entry to send to the peer.
  	next int
  }

  // update records that the peer has acknowledged index n: match becomes n
  // and next becomes n+1.
  func (in *index) update(n int) {
  	in.match, in.next = n, n+1
  }

  // decr moves next back by one entry, never letting it drop below 1.
  func (in *index) decr() {
  	in.next--
  	if in.next < 1 {
  		in.next = 1
  	}
  }
  72. type stateMachine struct {
  73. id int
  74. // the term we are participating in at any time
  75. term int
  76. // who we voted for in term
  77. vote int
  78. // the log
  79. log *log
  80. ins map[int]*index
  81. state stateType
  82. votes map[int]bool
  83. msgs []Message
  84. // the leader id
  85. lead int
  86. // pending reconfiguration
  87. pendingConf bool
  88. }
  89. func newStateMachine(id int, peers []int) *stateMachine {
  90. sm := &stateMachine{id: id, log: newLog(), ins: make(map[int]*index)}
  91. for _, p := range peers {
  92. sm.ins[p] = &index{}
  93. }
  94. sm.reset(0)
  95. return sm
  96. }
  97. func (sm *stateMachine) poll(id int, v bool) (granted int) {
  98. if _, ok := sm.votes[id]; !ok {
  99. sm.votes[id] = v
  100. }
  101. for _, vv := range sm.votes {
  102. if vv {
  103. granted++
  104. }
  105. }
  106. return granted
  107. }
  108. // send persists state to stable storage and then sends to its mailbox.
  109. func (sm *stateMachine) send(m Message) {
  110. m.From = sm.id
  111. m.Term = sm.term
  112. sm.msgs = append(sm.msgs, m)
  113. }
  114. // sendAppend sends RRPC, with entries to the given peer.
  115. func (sm *stateMachine) sendAppend(to int) {
  116. in := sm.ins[to]
  117. m := Message{}
  118. m.Type = msgApp
  119. m.To = to
  120. m.Index = in.next - 1
  121. m.LogTerm = sm.log.term(in.next - 1)
  122. m.Entries = sm.log.entries(in.next)
  123. m.Commit = sm.log.committed
  124. sm.send(m)
  125. }
  126. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
  127. func (sm *stateMachine) bcastAppend() {
  128. for i := range sm.ins {
  129. if i == sm.id {
  130. continue
  131. }
  132. sm.sendAppend(i)
  133. }
  134. }
  135. func (sm *stateMachine) maybeCommit() bool {
  136. // TODO(bmizerany): optimize.. Currently naive
  137. mis := make([]int, 0, len(sm.ins))
  138. for i := range sm.ins {
  139. mis = append(mis, sm.ins[i].match)
  140. }
  141. sort.Sort(sort.Reverse(sort.IntSlice(mis)))
  142. mci := mis[sm.q()-1]
  143. return sm.log.maybeCommit(mci, sm.term)
  144. }
  145. // nextEnts returns the appliable entries and updates the applied index
  146. func (sm *stateMachine) nextEnts() (ents []Entry) {
  147. return sm.log.nextEnts()
  148. }
  149. func (sm *stateMachine) reset(term int) {
  150. sm.term = term
  151. sm.lead = none
  152. sm.vote = none
  153. sm.votes = make(map[int]bool)
  154. for i := range sm.ins {
  155. sm.ins[i] = &index{next: sm.log.lastIndex() + 1}
  156. if i == sm.id {
  157. sm.ins[i].match = sm.log.lastIndex()
  158. }
  159. }
  160. }
  161. func (sm *stateMachine) q() int {
  162. return len(sm.ins)/2 + 1
  163. }
  164. func (sm *stateMachine) becomeFollower(term, lead int) {
  165. sm.reset(term)
  166. sm.lead = lead
  167. sm.state = stateFollower
  168. sm.pendingConf = false
  169. }
  170. func (sm *stateMachine) becomeCandidate() {
  171. // TODO(xiangli) remove the panic when the raft implementation is stable
  172. if sm.state == stateLeader {
  173. panic("invalid transition [leader -> candidate]")
  174. }
  175. sm.reset(sm.term + 1)
  176. sm.vote = sm.id
  177. sm.state = stateCandidate
  178. }
  179. func (sm *stateMachine) becomeLeader() {
  180. // TODO(xiangli) remove the panic when the raft implementation is stable
  181. if sm.state == stateFollower {
  182. panic("invalid transition [follower -> leader]")
  183. }
  184. sm.reset(sm.term)
  185. sm.lead = sm.id
  186. sm.state = stateLeader
  187. for _, e := range sm.log.ents[sm.log.committed:] {
  188. if e.isConfig() {
  189. sm.pendingConf = true
  190. }
  191. }
  192. }
  193. func (sm *stateMachine) Msgs() []Message {
  194. msgs := sm.msgs
  195. sm.msgs = make([]Message, 0)
  196. return msgs
  197. }
  198. func (sm *stateMachine) Step(m Message) (ok bool) {
  199. if m.Type == msgHup {
  200. sm.becomeCandidate()
  201. if sm.q() == sm.poll(sm.id, true) {
  202. sm.becomeLeader()
  203. return true
  204. }
  205. for i := range sm.ins {
  206. if i == sm.id {
  207. continue
  208. }
  209. lasti := sm.log.lastIndex()
  210. sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.log.term(lasti)})
  211. }
  212. return true
  213. }
  214. switch {
  215. case m.Term == 0:
  216. // local message
  217. case m.Term > sm.term:
  218. sm.becomeFollower(m.Term, m.From)
  219. case m.Term < sm.term:
  220. // ignore
  221. return true
  222. }
  223. return stepmap[sm.state](sm, m)
  224. }
  225. func (sm *stateMachine) handleAppendEntries(m Message) {
  226. if sm.log.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
  227. sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.log.lastIndex()})
  228. } else {
  229. sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
  230. }
  231. }
  232. func (sm *stateMachine) addNode(id int) {
  233. sm.ins[id] = &index{next: sm.log.lastIndex() + 1}
  234. sm.pendingConf = false
  235. }
  236. func (sm *stateMachine) removeNode(id int) {
  237. delete(sm.ins, id)
  238. sm.pendingConf = false
  239. }
  240. type stepFunc func(sm *stateMachine, m Message) bool
  241. func stepLeader(sm *stateMachine, m Message) bool {
  242. switch m.Type {
  243. case msgBeat:
  244. sm.bcastAppend()
  245. case msgProp:
  246. if len(m.Entries) != 1 {
  247. panic("unexpected length(entries) of a msgProp")
  248. }
  249. e := m.Entries[0]
  250. if e.isConfig() {
  251. if sm.pendingConf {
  252. return false
  253. }
  254. sm.pendingConf = true
  255. }
  256. e.Term = sm.term
  257. sm.log.append(sm.log.lastIndex(), e)
  258. sm.ins[sm.id].update(sm.log.lastIndex())
  259. sm.maybeCommit()
  260. sm.bcastAppend()
  261. case msgAppResp:
  262. if m.Index < 0 {
  263. sm.ins[m.From].decr()
  264. sm.sendAppend(m.From)
  265. } else {
  266. sm.ins[m.From].update(m.Index)
  267. if sm.maybeCommit() {
  268. sm.bcastAppend()
  269. }
  270. }
  271. case msgVote:
  272. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  273. }
  274. return true
  275. }
  276. func stepCandidate(sm *stateMachine, m Message) bool {
  277. switch m.Type {
  278. case msgProp:
  279. return false
  280. case msgApp:
  281. sm.becomeFollower(sm.term, m.From)
  282. sm.handleAppendEntries(m)
  283. case msgVote:
  284. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  285. case msgVoteResp:
  286. gr := sm.poll(m.From, m.Index >= 0)
  287. switch sm.q() {
  288. case gr:
  289. sm.becomeLeader()
  290. sm.bcastAppend()
  291. case len(sm.votes) - gr:
  292. sm.becomeFollower(sm.term, none)
  293. }
  294. }
  295. return true
  296. }
  297. func stepFollower(sm *stateMachine, m Message) bool {
  298. switch m.Type {
  299. case msgProp:
  300. if sm.lead == none {
  301. return false
  302. }
  303. m.To = sm.lead
  304. sm.send(m)
  305. case msgApp:
  306. sm.handleAppendEntries(m)
  307. case msgVote:
  308. if (sm.vote == none || sm.vote == m.From) && sm.log.isUpToDate(m.Index, m.LogTerm) {
  309. sm.vote = m.From
  310. sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.log.lastIndex()})
  311. } else {
  312. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  313. }
  314. }
  315. return true
  316. }