raft.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "sort"
  19. pb "github.com/coreos/etcd/raft/raftpb"
  20. )
  21. // None is a placeholder node ID used when there is no leader.
  22. const None uint64 = 0
  23. var errNoLeader = errors.New("no leader")
  24. // Possible values for StateType.
  25. const (
  26. StateFollower StateType = iota
  27. StateCandidate
  28. StateLeader
  29. )
  30. // StateType represents the role of a node in a cluster.
  31. type StateType uint64
  32. var stmap = [...]string{
  33. "StateFollower",
  34. "StateCandidate",
  35. "StateLeader",
  36. }
  37. func (st StateType) String() string {
  38. return stmap[uint64(st)]
  39. }
  40. func (st StateType) MarshalJSON() ([]byte, error) {
  41. return []byte(fmt.Sprintf("%q", st.String())), nil
  42. }
  43. type progress struct {
  44. match, next uint64
  45. }
  46. func (pr *progress) update(n uint64) {
  47. pr.match = n
  48. pr.next = n + 1
  49. }
  50. // maybeDecrTo returns false if the given to index comes from an out of order message.
  51. // Otherwise it decreases the progress next index and returns true.
  52. func (pr *progress) maybeDecrTo(to uint64) bool {
  53. // the rejection must be stale if the
  54. // progress has matched with follower
  55. // or "to" does not match next - 1
  56. if pr.match != 0 || pr.next-1 != to {
  57. return false
  58. }
  59. if pr.next--; pr.next < 1 {
  60. pr.next = 1
  61. }
  62. return true
  63. }
  64. func (pr *progress) String() string {
  65. return fmt.Sprintf("n=%d m=%d", pr.next, pr.match)
  66. }
  67. // uint64Slice implements sort interface
  68. type uint64Slice []uint64
  69. func (p uint64Slice) Len() int { return len(p) }
  70. func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
  71. func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  72. type raft struct {
  73. pb.HardState
  74. id uint64
  75. // the log
  76. raftLog *raftLog
  77. prs map[uint64]*progress
  78. state StateType
  79. votes map[uint64]bool
  80. msgs []pb.Message
  81. // the leader id
  82. lead uint64
  83. // New configuration is ignored if there exists unapplied configuration.
  84. pendingConf bool
  85. elapsed int // number of ticks since the last msg
  86. heartbeatTimeout int
  87. electionTimeout int
  88. rand *rand.Rand
  89. tick func()
  90. step stepFunc
  91. }
  92. func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
  93. if id == None {
  94. panic("cannot use none id")
  95. }
  96. r := &raft{
  97. id: id,
  98. lead: None,
  99. raftLog: newLog(),
  100. prs: make(map[uint64]*progress),
  101. electionTimeout: election,
  102. heartbeatTimeout: heartbeat,
  103. }
  104. r.rand = rand.New(rand.NewSource(int64(id)))
  105. for _, p := range peers {
  106. r.prs[p] = &progress{}
  107. }
  108. r.becomeFollower(0, None)
  109. return r
  110. }
  111. func (r *raft) hasLeader() bool { return r.lead != None }
  112. func (r *raft) softState() *SoftState {
  113. return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
  114. }
  115. func (r *raft) String() string {
  116. s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)
  117. switch r.state {
  118. case StateFollower:
  119. s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
  120. case StateCandidate:
  121. s += fmt.Sprintf(` votes="%v"`, r.votes)
  122. case StateLeader:
  123. s += fmt.Sprintf(` prs="%v"`, r.prs)
  124. }
  125. return s
  126. }
  127. func (r *raft) poll(id uint64, v bool) (granted int) {
  128. if _, ok := r.votes[id]; !ok {
  129. r.votes[id] = v
  130. }
  131. for _, vv := range r.votes {
  132. if vv {
  133. granted++
  134. }
  135. }
  136. return granted
  137. }
  138. // send persists state to stable storage and then sends to its mailbox.
  139. func (r *raft) send(m pb.Message) {
  140. m.From = r.id
  141. // do not attach term to msgProp
  142. // proposals are a way to forward to the leader and
  143. // should be treated as local message.
  144. if m.Type != pb.MsgProp {
  145. m.Term = r.Term
  146. }
  147. r.msgs = append(r.msgs, m)
  148. }
  149. // sendAppend sends RRPC, with entries to the given peer.
  150. func (r *raft) sendAppend(to uint64) {
  151. pr := r.prs[to]
  152. m := pb.Message{}
  153. m.To = to
  154. m.Index = pr.next - 1
  155. if r.needSnapshot(m.Index) {
  156. m.Type = pb.MsgSnap
  157. m.Snapshot = r.raftLog.snapshot
  158. } else {
  159. m.Type = pb.MsgApp
  160. m.LogTerm = r.raftLog.term(pr.next - 1)
  161. m.Entries = r.raftLog.entries(pr.next)
  162. m.Commit = r.raftLog.committed
  163. }
  164. r.send(m)
  165. }
  166. // sendHeartbeat sends an empty msgApp
  167. func (r *raft) sendHeartbeat(to uint64) {
  168. m := pb.Message{
  169. To: to,
  170. Type: pb.MsgApp,
  171. }
  172. r.send(m)
  173. }
  174. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to r.mis.
  175. func (r *raft) bcastAppend() {
  176. for i := range r.prs {
  177. if i == r.id {
  178. continue
  179. }
  180. r.sendAppend(i)
  181. }
  182. }
  183. // bcastHeartbeat sends RRPC, without entries to all the peers.
  184. func (r *raft) bcastHeartbeat() {
  185. for i := range r.prs {
  186. if i == r.id {
  187. continue
  188. }
  189. r.sendHeartbeat(i)
  190. }
  191. }
  192. func (r *raft) maybeCommit() bool {
  193. // TODO(bmizerany): optimize.. Currently naive
  194. mis := make(uint64Slice, 0, len(r.prs))
  195. for i := range r.prs {
  196. mis = append(mis, r.prs[i].match)
  197. }
  198. sort.Sort(sort.Reverse(mis))
  199. mci := mis[r.q()-1]
  200. return r.raftLog.maybeCommit(mci, r.Term)
  201. }
  202. func (r *raft) reset(term uint64) {
  203. r.Term = term
  204. r.lead = None
  205. r.Vote = None
  206. r.elapsed = 0
  207. r.votes = make(map[uint64]bool)
  208. for i := range r.prs {
  209. r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
  210. if i == r.id {
  211. r.prs[i].match = r.raftLog.lastIndex()
  212. }
  213. }
  214. r.pendingConf = false
  215. }
  216. func (r *raft) q() int {
  217. return len(r.prs)/2 + 1
  218. }
  219. func (r *raft) appendEntry(e pb.Entry) {
  220. e.Term = r.Term
  221. e.Index = r.raftLog.lastIndex() + 1
  222. r.raftLog.append(r.raftLog.lastIndex(), e)
  223. r.prs[r.id].update(r.raftLog.lastIndex())
  224. r.maybeCommit()
  225. }
  226. // tickElection is ran by followers and candidates after r.electionTimeout.
  227. func (r *raft) tickElection() {
  228. if !r.promotable() {
  229. r.elapsed = 0
  230. return
  231. }
  232. r.elapsed++
  233. if r.isElectionTimeout() {
  234. r.elapsed = 0
  235. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  236. }
  237. }
  238. // tickHeartbeat is ran by leaders to send a msgBeat after r.heartbeatTimeout.
  239. func (r *raft) tickHeartbeat() {
  240. r.elapsed++
  241. if r.elapsed > r.heartbeatTimeout {
  242. r.elapsed = 0
  243. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  244. }
  245. }
  246. func (r *raft) becomeFollower(term uint64, lead uint64) {
  247. r.step = stepFollower
  248. r.reset(term)
  249. r.tick = r.tickElection
  250. r.lead = lead
  251. r.state = StateFollower
  252. }
  253. func (r *raft) becomeCandidate() {
  254. // TODO(xiangli) remove the panic when the raft implementation is stable
  255. if r.state == StateLeader {
  256. panic("invalid transition [leader -> candidate]")
  257. }
  258. r.step = stepCandidate
  259. r.reset(r.Term + 1)
  260. r.tick = r.tickElection
  261. r.Vote = r.id
  262. r.state = StateCandidate
  263. }
  264. func (r *raft) becomeLeader() {
  265. // TODO(xiangli) remove the panic when the raft implementation is stable
  266. if r.state == StateFollower {
  267. panic("invalid transition [follower -> leader]")
  268. }
  269. r.step = stepLeader
  270. r.reset(r.Term)
  271. r.tick = r.tickHeartbeat
  272. r.lead = r.id
  273. r.state = StateLeader
  274. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  275. if e.Type != pb.EntryConfChange {
  276. continue
  277. }
  278. if r.pendingConf {
  279. panic("unexpected double uncommitted config entry")
  280. }
  281. r.pendingConf = true
  282. }
  283. r.appendEntry(pb.Entry{Data: nil})
  284. }
  285. func (r *raft) ReadMessages() []pb.Message {
  286. msgs := r.msgs
  287. r.msgs = make([]pb.Message, 0)
  288. return msgs
  289. }
  290. func (r *raft) campaign() {
  291. r.becomeCandidate()
  292. if r.q() == r.poll(r.id, true) {
  293. r.becomeLeader()
  294. }
  295. for i := range r.prs {
  296. if i == r.id {
  297. continue
  298. }
  299. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
  300. }
  301. }
  302. func (r *raft) Step(m pb.Message) error {
  303. // TODO(bmizerany): this likely allocs - prevent that.
  304. defer func() { r.Commit = r.raftLog.committed }()
  305. if m.Type == pb.MsgHup {
  306. r.campaign()
  307. }
  308. switch {
  309. case m.Term == 0:
  310. // local message
  311. case m.Term > r.Term:
  312. lead := m.From
  313. if m.Type == pb.MsgVote {
  314. lead = None
  315. }
  316. r.becomeFollower(m.Term, lead)
  317. case m.Term < r.Term:
  318. // ignore
  319. return nil
  320. }
  321. r.step(r, m)
  322. return nil
  323. }
  324. func (r *raft) handleAppendEntries(m pb.Message) {
  325. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  326. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  327. } else {
  328. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
  329. }
  330. }
  331. func (r *raft) handleSnapshot(m pb.Message) {
  332. if r.restore(m.Snapshot) {
  333. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  334. } else {
  335. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  336. }
  337. }
  338. func (r *raft) resetPendingConf() {
  339. r.pendingConf = false
  340. }
  341. func (r *raft) addNode(id uint64) {
  342. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  343. r.pendingConf = false
  344. }
  345. func (r *raft) removeNode(id uint64) {
  346. r.delProgress(id)
  347. r.pendingConf = false
  348. }
  349. type stepFunc func(r *raft, m pb.Message)
  350. func stepLeader(r *raft, m pb.Message) {
  351. switch m.Type {
  352. case pb.MsgBeat:
  353. r.bcastHeartbeat()
  354. case pb.MsgProp:
  355. if len(m.Entries) != 1 {
  356. panic("unexpected length(entries) of a msgProp")
  357. }
  358. e := m.Entries[0]
  359. if e.Type == pb.EntryConfChange {
  360. if r.pendingConf {
  361. return
  362. }
  363. r.pendingConf = true
  364. }
  365. r.appendEntry(e)
  366. r.bcastAppend()
  367. case pb.MsgAppResp:
  368. if m.Index == 0 {
  369. return
  370. }
  371. if m.Reject {
  372. if r.prs[m.From].maybeDecrTo(m.Index) {
  373. r.sendAppend(m.From)
  374. }
  375. } else {
  376. r.prs[m.From].update(m.Index)
  377. if r.maybeCommit() {
  378. r.bcastAppend()
  379. }
  380. }
  381. case pb.MsgVote:
  382. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  383. }
  384. }
  385. func stepCandidate(r *raft, m pb.Message) {
  386. switch m.Type {
  387. case pb.MsgProp:
  388. panic("no leader")
  389. case pb.MsgApp:
  390. r.becomeFollower(r.Term, m.From)
  391. r.handleAppendEntries(m)
  392. case pb.MsgSnap:
  393. r.becomeFollower(m.Term, m.From)
  394. r.handleSnapshot(m)
  395. case pb.MsgVote:
  396. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  397. case pb.MsgVoteResp:
  398. gr := r.poll(m.From, !m.Reject)
  399. switch r.q() {
  400. case gr:
  401. r.becomeLeader()
  402. r.bcastAppend()
  403. case len(r.votes) - gr:
  404. r.becomeFollower(r.Term, None)
  405. }
  406. }
  407. }
  408. func stepFollower(r *raft, m pb.Message) {
  409. switch m.Type {
  410. case pb.MsgProp:
  411. if r.lead == None {
  412. panic("no leader")
  413. }
  414. m.To = r.lead
  415. r.send(m)
  416. case pb.MsgApp:
  417. r.elapsed = 0
  418. r.lead = m.From
  419. r.handleAppendEntries(m)
  420. case pb.MsgSnap:
  421. r.elapsed = 0
  422. r.handleSnapshot(m)
  423. case pb.MsgVote:
  424. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  425. r.elapsed = 0
  426. r.Vote = m.From
  427. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  428. } else {
  429. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  430. }
  431. }
  432. }
  433. func (r *raft) compact(index uint64, nodes []uint64, d []byte) {
  434. if index > r.raftLog.applied {
  435. panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
  436. }
  437. r.raftLog.snap(d, index, r.raftLog.term(index), nodes)
  438. r.raftLog.compact(index)
  439. }
  440. // restore recovers the statemachine from a snapshot. It restores the log and the
  441. // configuration of statemachine.
  442. func (r *raft) restore(s pb.Snapshot) bool {
  443. if s.Index <= r.raftLog.committed {
  444. return false
  445. }
  446. r.raftLog.restore(s)
  447. r.prs = make(map[uint64]*progress)
  448. for _, n := range s.Nodes {
  449. if n == r.id {
  450. r.setProgress(n, r.raftLog.lastIndex(), r.raftLog.lastIndex()+1)
  451. } else {
  452. r.setProgress(n, 0, r.raftLog.lastIndex()+1)
  453. }
  454. }
  455. return true
  456. }
  457. func (r *raft) needSnapshot(i uint64) bool {
  458. if i < r.raftLog.offset {
  459. if r.raftLog.snapshot.Term == 0 {
  460. panic("need non-empty snapshot")
  461. }
  462. return true
  463. }
  464. return false
  465. }
  466. func (r *raft) nodes() []uint64 {
  467. nodes := make([]uint64, 0, len(r.prs))
  468. for k := range r.prs {
  469. nodes = append(nodes, k)
  470. }
  471. return nodes
  472. }
  473. func (r *raft) setProgress(id, match, next uint64) {
  474. r.prs[id] = &progress{next: next, match: match}
  475. }
  476. func (r *raft) delProgress(id uint64) {
  477. delete(r.prs, id)
  478. }
  479. // promotable indicates whether state machine can be promoted to leader,
  480. // which is true when its own id is in progress list.
  481. func (r *raft) promotable() bool {
  482. _, ok := r.prs[r.id]
  483. return ok
  484. }
  485. func (r *raft) loadEnts(ents []pb.Entry) {
  486. r.raftLog.load(ents)
  487. }
  488. func (r *raft) loadState(state pb.HardState) {
  489. r.raftLog.committed = state.Commit
  490. r.Term = state.Term
  491. r.Vote = state.Vote
  492. r.Commit = state.Commit
  493. }
  494. // isElectionTimeout returns true if r.elapsed is greater than the
  495. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  496. // Otherwise, it returns false.
  497. func (r *raft) isElectionTimeout() bool {
  498. d := r.elapsed - r.electionTimeout
  499. if d < 0 {
  500. return false
  501. }
  502. return d > r.rand.Int()%r.electionTimeout
  503. }