raft.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. package raft
  2. import (
  3. "errors"
  4. "fmt"
  5. "log"
  6. "sort"
  7. "sync/atomic"
  8. )
  // none is the sentinel node id meaning "nobody": it is used for an unknown
  // leader, an unset vote and an unset cluster id.
  const none = -1

  // messageType identifies the kind of a raft Message.
  type messageType int64

  const (
  	// msgHup asks the local node to start an election.
  	msgHup messageType = iota
  	// msgBeat asks a leader to broadcast heartbeats.
  	msgBeat
  	// msgProp proposes a single entry to be appended to the log.
  	msgProp
  	// msgApp carries log entries (or none, for a heartbeat) to a peer.
  	msgApp
  	// msgAppResp answers msgApp/msgSnap; a negative Index means rejection.
  	msgAppResp
  	// msgVote requests a vote.
  	msgVote
  	// msgVoteResp answers msgVote; a negative Index means the vote was denied.
  	msgVoteResp
  	// msgSnap carries a snapshot to a peer.
  	msgSnap
  	msgDenied
  )

  // mtmap holds the printable name of each messageType, indexed by its value.
  var mtmap = [...]string{
  	msgHup:      "msgHup",
  	msgBeat:     "msgBeat",
  	msgProp:     "msgProp",
  	msgApp:      "msgApp",
  	msgAppResp:  "msgAppResp",
  	msgVote:     "msgVote",
  	msgVoteResp: "msgVoteResp",
  	msgSnap:     "msgSnap",
  	msgDenied:   "msgDenied",
  }

  // String returns the symbolic name of mt.
  //
  // Values that have no entry in mtmap (negative or past the last constant,
  // e.g. a type decoded from a newer peer) are rendered as "messageType(N)"
  // rather than panicking with an index-out-of-range error.
  func (mt messageType) String() string {
  	if mt < 0 || int64(mt) >= int64(len(mtmap)) {
  		return fmt.Sprintf("messageType(%d)", int64(mt))
  	}
  	return mtmap[mt]
  }
  // errNoLeader is the sentinel error reporting that no leader is known.
  var errNoLeader = errors.New("no leader")

  // stateType is the role a stateMachine currently plays in the cluster.
  type stateType int64

  // The three roles, in the order used to index stmap and stepmap.
  const (
  	stateFollower stateType = iota
  	stateCandidate
  	stateLeader
  )

  // stmap holds the printable name of each stateType, indexed by its value.
  var stmap = [...]string{
  	stateLeader:    "stateLeader",
  	stateCandidate: "stateCandidate",
  	stateFollower:  "stateFollower",
  }
  48. var stepmap = [...]stepFunc{
  49. stateFollower: stepFollower,
  50. stateCandidate: stepCandidate,
  51. stateLeader: stepLeader,
  52. }
  53. func (st stateType) String() string {
  54. return stmap[int64(st)]
  55. }
  56. type Message struct {
  57. Type messageType
  58. ClusterId int64
  59. To int64
  60. From int64
  61. Term int64
  62. LogTerm int64
  63. Index int64
  64. PrevTerm int64
  65. Entries []Entry
  66. Commit int64
  67. Snapshot Snapshot
  68. }
  69. func (m Message) String() string {
  70. return fmt.Sprintf("type=%v from=%x to=%x term=%d logTerm=%d i=%d ci=%d len(ents)=%d",
  71. m.Type, m.From, m.To, m.Term, m.LogTerm, m.Index, m.Commit, len(m.Entries))
  72. }
  // index tracks replication progress for a single node: match is the log
  // index acknowledged by that node and next is the index of the first entry
  // that will be sent to it.
  type index struct {
  	match int64
  	next  int64
  }

  // update records that the node has acknowledged index n; the next entry to
  // send is then n+1.
  func (ix *index) update(n int64) {
  	ix.match, ix.next = n, n+1
  }

  // decr steps next back by one, never letting it drop below 1.
  func (ix *index) decr() {
  	ix.next--
  	if ix.next >= 1 {
  		return
  	}
  	ix.next = 1
  }

  // String renders the progress as "n=<next> m=<match>".
  func (ix *index) String() string {
  	next, match := ix.next, ix.match
  	return fmt.Sprintf("n=%d m=%d", next, match)
  }
  // atomicInt is an int64 that is read and written through sync/atomic.
  type atomicInt int64

  // Set atomically stores n.
  func (a *atomicInt) Set(n int64) {
  	addr := (*int64)(a)
  	atomic.StoreInt64(addr, n)
  }

  // Get atomically loads and returns the current value.
  func (a *atomicInt) Get() int64 {
  	addr := (*int64)(a)
  	return atomic.LoadInt64(addr)
  }
  // int64Slice adapts []int64 to sort.Interface (ascending order).
  type int64Slice []int64

  // Len returns the number of elements.
  func (s int64Slice) Len() int {
  	return len(s)
  }

  // Less reports whether element a sorts before element b.
  func (s int64Slice) Less(a, b int) bool {
  	return s[a] < s[b]
  }

  // Swap exchanges elements a and b in place.
  func (s int64Slice) Swap(a, b int) {
  	s[a], s[b] = s[b], s[a]
  }
  101. type stateMachine struct {
  102. clusterId int64
  103. id int64
  104. // the term we are participating in at any time
  105. term atomicInt
  106. index atomicInt
  107. // who we voted for in term
  108. vote int64
  109. // the log
  110. raftLog *raftLog
  111. ins map[int64]*index
  112. state stateType
  113. votes map[int64]bool
  114. msgs []Message
  115. // the leader id
  116. lead atomicInt
  117. // pending reconfiguration
  118. pendingConf bool
  119. snapshoter Snapshoter
  120. }
  121. func newStateMachine(id int64, peers []int64) *stateMachine {
  122. if id == none {
  123. panic("cannot use none id")
  124. }
  125. sm := &stateMachine{id: id, clusterId: none, lead: none, raftLog: newLog(), ins: make(map[int64]*index)}
  126. for _, p := range peers {
  127. sm.ins[p] = &index{}
  128. }
  129. sm.reset(0)
  130. return sm
  131. }
  132. func (sm *stateMachine) String() string {
  133. s := fmt.Sprintf(`state=%v term=%d`, sm.state, sm.term)
  134. switch sm.state {
  135. case stateFollower:
  136. s += fmt.Sprintf(" vote=%v lead=%v", sm.vote, sm.lead)
  137. case stateCandidate:
  138. s += fmt.Sprintf(` votes="%v"`, sm.votes)
  139. case stateLeader:
  140. s += fmt.Sprintf(` ins="%v"`, sm.ins)
  141. }
  142. return s
  143. }
  144. func (sm *stateMachine) setSnapshoter(snapshoter Snapshoter) {
  145. sm.snapshoter = snapshoter
  146. }
  147. func (sm *stateMachine) poll(id int64, v bool) (granted int) {
  148. if _, ok := sm.votes[id]; !ok {
  149. sm.votes[id] = v
  150. }
  151. for _, vv := range sm.votes {
  152. if vv {
  153. granted++
  154. }
  155. }
  156. return granted
  157. }
  158. // send persists state to stable storage and then sends to its mailbox.
  159. func (sm *stateMachine) send(m Message) {
  160. m.ClusterId = sm.clusterId
  161. m.From = sm.id
  162. m.Term = sm.term.Get()
  163. log.Printf("raft.send msg %v\n", m)
  164. sm.msgs = append(sm.msgs, m)
  165. }
  166. // sendAppend sends RRPC, with entries to the given peer.
  167. func (sm *stateMachine) sendAppend(to int64) {
  168. in := sm.ins[to]
  169. m := Message{}
  170. m.To = to
  171. m.Index = in.next - 1
  172. if sm.needSnapshot(m.Index) {
  173. m.Type = msgSnap
  174. m.Snapshot = sm.snapshoter.GetSnap()
  175. } else {
  176. m.Type = msgApp
  177. m.LogTerm = sm.raftLog.term(in.next - 1)
  178. m.Entries = sm.raftLog.entries(in.next)
  179. m.Commit = sm.raftLog.committed
  180. }
  181. sm.send(m)
  182. }
  183. // sendHeartbeat sends RRPC, without entries to the given peer.
  184. func (sm *stateMachine) sendHeartbeat(to int64) {
  185. in := sm.ins[to]
  186. index := max(in.next-1, sm.raftLog.lastIndex())
  187. m := Message{
  188. To: to,
  189. Type: msgApp,
  190. Index: index,
  191. LogTerm: sm.raftLog.term(index),
  192. Commit: sm.raftLog.committed,
  193. }
  194. sm.send(m)
  195. }
  196. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to sm.mis.
  197. func (sm *stateMachine) bcastAppend() {
  198. for i := range sm.ins {
  199. if i == sm.id {
  200. continue
  201. }
  202. sm.sendAppend(i)
  203. }
  204. }
  205. // bcastHeartbeat sends RRPC, without entries to all the peers.
  206. func (sm *stateMachine) bcastHeartbeat() {
  207. for i := range sm.ins {
  208. if i == sm.id {
  209. continue
  210. }
  211. sm.sendHeartbeat(i)
  212. }
  213. }
  214. func (sm *stateMachine) maybeCommit() bool {
  215. // TODO(bmizerany): optimize.. Currently naive
  216. mis := make(int64Slice, 0, len(sm.ins))
  217. for i := range sm.ins {
  218. mis = append(mis, sm.ins[i].match)
  219. }
  220. sort.Sort(sort.Reverse(mis))
  221. mci := mis[sm.q()-1]
  222. return sm.raftLog.maybeCommit(mci, sm.term.Get())
  223. }
  224. // nextEnts returns the appliable entries and updates the applied index
  225. func (sm *stateMachine) nextEnts() (ents []Entry) {
  226. return sm.raftLog.nextEnts()
  227. }
  228. func (sm *stateMachine) reset(term int64) {
  229. sm.term.Set(term)
  230. sm.lead.Set(none)
  231. sm.vote = none
  232. sm.votes = make(map[int64]bool)
  233. for i := range sm.ins {
  234. sm.ins[i] = &index{next: sm.raftLog.lastIndex() + 1}
  235. if i == sm.id {
  236. sm.ins[i].match = sm.raftLog.lastIndex()
  237. }
  238. }
  239. }
  240. func (sm *stateMachine) q() int {
  241. return len(sm.ins)/2 + 1
  242. }
  243. func (sm *stateMachine) appendEntry(e Entry) {
  244. e.Term = sm.term.Get()
  245. sm.index.Set(sm.raftLog.append(sm.raftLog.lastIndex(), e))
  246. sm.ins[sm.id].update(sm.raftLog.lastIndex())
  247. sm.maybeCommit()
  248. }
  249. // promotable indicates whether state machine could be promoted.
  250. // New machine has to wait for the first log entry to be committed, or it will
  251. // always start as a one-node cluster.
  252. func (sm *stateMachine) promotable() bool {
  253. return sm.raftLog.committed != 0
  254. }
  255. func (sm *stateMachine) becomeFollower(term int64, lead int64) {
  256. sm.reset(term)
  257. sm.lead.Set(lead)
  258. sm.state = stateFollower
  259. sm.pendingConf = false
  260. }
  261. func (sm *stateMachine) becomeCandidate() {
  262. // TODO(xiangli) remove the panic when the raft implementation is stable
  263. if sm.state == stateLeader {
  264. panic("invalid transition [leader -> candidate]")
  265. }
  266. sm.reset(sm.term.Get() + 1)
  267. sm.vote = sm.id
  268. sm.state = stateCandidate
  269. }
  270. func (sm *stateMachine) becomeLeader() {
  271. // TODO(xiangli) remove the panic when the raft implementation is stable
  272. if sm.state == stateFollower {
  273. panic("invalid transition [follower -> leader]")
  274. }
  275. sm.reset(sm.term.Get())
  276. sm.lead.Set(sm.id)
  277. sm.state = stateLeader
  278. for _, e := range sm.raftLog.entries(sm.raftLog.committed + 1) {
  279. if e.isConfig() {
  280. sm.pendingConf = true
  281. }
  282. }
  283. sm.appendEntry(Entry{Type: Normal, Data: nil})
  284. }
  285. func (sm *stateMachine) Msgs() []Message {
  286. msgs := sm.msgs
  287. sm.msgs = make([]Message, 0)
  288. return msgs
  289. }
  290. func (sm *stateMachine) Step(m Message) (ok bool) {
  291. log.Printf("raft.step beforeState %v\n", sm)
  292. log.Printf("raft.step beforeLog %v\n", sm.raftLog)
  293. defer log.Printf("raft.step afterLog %v\n", sm.raftLog)
  294. defer log.Printf("raft.step afterState %v\n", sm)
  295. log.Printf("raft.step msg %v\n", m)
  296. if m.Type == msgHup {
  297. sm.becomeCandidate()
  298. if sm.q() == sm.poll(sm.id, true) {
  299. sm.becomeLeader()
  300. return true
  301. }
  302. for i := range sm.ins {
  303. if i == sm.id {
  304. continue
  305. }
  306. lasti := sm.raftLog.lastIndex()
  307. sm.send(Message{To: i, Type: msgVote, Index: lasti, LogTerm: sm.raftLog.term(lasti)})
  308. }
  309. return true
  310. }
  311. switch {
  312. case m.Term == 0:
  313. // local message
  314. case m.Term > sm.term.Get():
  315. lead := m.From
  316. if m.Type == msgVote {
  317. lead = none
  318. }
  319. sm.becomeFollower(m.Term, lead)
  320. case m.Term < sm.term.Get():
  321. // ignore
  322. return true
  323. }
  324. return stepmap[sm.state](sm, m)
  325. }
  326. func (sm *stateMachine) handleAppendEntries(m Message) {
  327. if sm.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
  328. sm.index.Set(sm.raftLog.lastIndex())
  329. sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()})
  330. } else {
  331. sm.send(Message{To: m.From, Type: msgAppResp, Index: -1})
  332. }
  333. }
  334. func (sm *stateMachine) handleSnapshot(m Message) {
  335. sm.restore(m.Snapshot)
  336. sm.send(Message{To: m.From, Type: msgAppResp, Index: sm.raftLog.lastIndex()})
  337. }
  338. func (sm *stateMachine) addNode(id int64) {
  339. sm.ins[id] = &index{next: sm.raftLog.lastIndex() + 1}
  340. sm.pendingConf = false
  341. }
  342. func (sm *stateMachine) removeNode(id int64) {
  343. delete(sm.ins, id)
  344. sm.pendingConf = false
  345. }
  346. type stepFunc func(sm *stateMachine, m Message) bool
  347. func stepLeader(sm *stateMachine, m Message) bool {
  348. switch m.Type {
  349. case msgBeat:
  350. sm.bcastHeartbeat()
  351. case msgProp:
  352. if len(m.Entries) != 1 {
  353. panic("unexpected length(entries) of a msgProp")
  354. }
  355. e := m.Entries[0]
  356. if e.isConfig() {
  357. if sm.pendingConf {
  358. return false
  359. }
  360. sm.pendingConf = true
  361. }
  362. sm.appendEntry(e)
  363. sm.bcastAppend()
  364. case msgAppResp:
  365. if m.Index < 0 {
  366. sm.ins[m.From].decr()
  367. sm.sendAppend(m.From)
  368. } else {
  369. sm.ins[m.From].update(m.Index)
  370. if sm.maybeCommit() {
  371. sm.bcastAppend()
  372. }
  373. }
  374. case msgVote:
  375. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  376. }
  377. return true
  378. }
  379. func stepCandidate(sm *stateMachine, m Message) bool {
  380. switch m.Type {
  381. case msgProp:
  382. return false
  383. case msgApp:
  384. sm.becomeFollower(sm.term.Get(), m.From)
  385. sm.handleAppendEntries(m)
  386. case msgSnap:
  387. sm.becomeFollower(m.Term, m.From)
  388. sm.handleSnapshot(m)
  389. case msgVote:
  390. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  391. case msgVoteResp:
  392. gr := sm.poll(m.From, m.Index >= 0)
  393. switch sm.q() {
  394. case gr:
  395. sm.becomeLeader()
  396. sm.bcastAppend()
  397. case len(sm.votes) - gr:
  398. sm.becomeFollower(sm.term.Get(), none)
  399. }
  400. }
  401. return true
  402. }
  403. func stepFollower(sm *stateMachine, m Message) bool {
  404. switch m.Type {
  405. case msgProp:
  406. if sm.lead.Get() == none {
  407. return false
  408. }
  409. m.To = sm.lead.Get()
  410. sm.send(m)
  411. case msgApp:
  412. sm.lead.Set(m.From)
  413. sm.handleAppendEntries(m)
  414. case msgSnap:
  415. sm.handleSnapshot(m)
  416. case msgVote:
  417. if (sm.vote == none || sm.vote == m.From) && sm.raftLog.isUpToDate(m.Index, m.LogTerm) {
  418. sm.vote = m.From
  419. sm.send(Message{To: m.From, Type: msgVoteResp, Index: sm.raftLog.lastIndex()})
  420. } else {
  421. sm.send(Message{To: m.From, Type: msgVoteResp, Index: -1})
  422. }
  423. }
  424. return true
  425. }
  426. // maybeCompact tries to compact the log. It calls the snapshoter to take a snapshot and
  427. // then compact the log up-to the index at which the snapshot was taken.
  428. func (sm *stateMachine) maybeCompact() bool {
  429. if sm.snapshoter == nil || !sm.raftLog.shouldCompact() {
  430. return false
  431. }
  432. sm.snapshoter.Snap(sm.raftLog.applied, sm.raftLog.term(sm.raftLog.applied), sm.nodes())
  433. sm.raftLog.compact(sm.raftLog.applied)
  434. return true
  435. }
  436. // restore recovers the statemachine from a snapshot. It restores the log and the
  437. // configuration of statemachine. It calls the snapshoter to restore from the given
  438. // snapshot.
  439. func (sm *stateMachine) restore(s Snapshot) {
  440. if sm.snapshoter == nil {
  441. panic("try to restore from snapshot, but snapshoter is nil")
  442. }
  443. sm.raftLog.restore(s.Index, s.Term)
  444. sm.index.Set(sm.raftLog.lastIndex())
  445. sm.ins = make(map[int64]*index)
  446. for _, n := range s.Nodes {
  447. sm.ins[n] = &index{next: sm.raftLog.lastIndex() + 1}
  448. if n == sm.id {
  449. sm.ins[n].match = sm.raftLog.lastIndex()
  450. }
  451. }
  452. sm.pendingConf = false
  453. sm.snapshoter.Restore(s)
  454. }
  455. func (sm *stateMachine) needSnapshot(i int64) bool {
  456. if i < sm.raftLog.offset {
  457. if sm.snapshoter == nil {
  458. panic("need snapshot but snapshoter is nil")
  459. }
  460. return true
  461. }
  462. return false
  463. }
  464. func (sm *stateMachine) nodes() []int64 {
  465. nodes := make([]int64, 0, len(sm.ins))
  466. for k := range sm.ins {
  467. nodes = append(nodes, k)
  468. }
  469. return nodes
  470. }