raft.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "sort"
  19. pb "github.com/coreos/etcd/raft/raftpb"
  20. )
  21. // None is a placeholder node ID used when there is no leader.
  22. const None uint64 = 0
  23. var errNoLeader = errors.New("no leader")
  24. // Possible values for StateType.
  25. const (
  26. StateFollower StateType = iota
  27. StateCandidate
  28. StateLeader
  29. )
  30. // StateType represents the role of a node in a cluster.
  31. type StateType uint64
  32. var stmap = [...]string{
  33. "StateFollower",
  34. "StateCandidate",
  35. "StateLeader",
  36. }
  37. func (st StateType) String() string {
  38. return stmap[uint64(st)]
  39. }
  40. func (st StateType) MarshalJSON() ([]byte, error) {
  41. return []byte(fmt.Sprintf("%q", st.String())), nil
  42. }
  43. type progress struct {
  44. match, next uint64
  45. }
  46. func (pr *progress) update(n uint64) {
  47. pr.match = n
  48. pr.next = n + 1
  49. }
  50. // maybeDecrTo returns false if the given to index comes from an out of order message.
  51. // Otherwise it decreases the progress next index and returns true.
  52. func (pr *progress) maybeDecrTo(to uint64) bool {
  53. // the rejection must be stale if the progress has matched with
  54. // follower or "to" does not match next - 1
  55. if pr.match != 0 || pr.next-1 != to {
  56. return false
  57. }
  58. if pr.next--; pr.next < 1 {
  59. pr.next = 1
  60. }
  61. return true
  62. }
  63. func (pr *progress) String() string {
  64. return fmt.Sprintf("n=%d m=%d", pr.next, pr.match)
  65. }
  66. // uint64Slice implements sort interface
  67. type uint64Slice []uint64
  68. func (p uint64Slice) Len() int { return len(p) }
  69. func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
  70. func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  71. type raft struct {
  72. pb.HardState
  73. id uint64
  74. // the log
  75. raftLog *raftLog
  76. prs map[uint64]*progress
  77. state StateType
  78. votes map[uint64]bool
  79. msgs []pb.Message
  80. // the leader id
  81. lead uint64
  82. // New configuration is ignored if there exists unapplied configuration.
  83. pendingConf bool
  84. elapsed int // number of ticks since the last msg
  85. heartbeatTimeout int
  86. electionTimeout int
  87. rand *rand.Rand
  88. tick func()
  89. step stepFunc
  90. }
  91. func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
  92. if id == None {
  93. panic("cannot use none id")
  94. }
  95. r := &raft{
  96. id: id,
  97. lead: None,
  98. raftLog: newLog(),
  99. prs: make(map[uint64]*progress),
  100. electionTimeout: election,
  101. heartbeatTimeout: heartbeat,
  102. }
  103. r.rand = rand.New(rand.NewSource(int64(id)))
  104. for _, p := range peers {
  105. r.prs[p] = &progress{}
  106. }
  107. r.becomeFollower(0, None)
  108. return r
  109. }
  110. func (r *raft) hasLeader() bool { return r.lead != None }
  111. func (r *raft) softState() *SoftState {
  112. return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
  113. }
  114. func (r *raft) String() string {
  115. s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)
  116. switch r.state {
  117. case StateFollower:
  118. s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
  119. case StateCandidate:
  120. s += fmt.Sprintf(` votes="%v"`, r.votes)
  121. case StateLeader:
  122. s += fmt.Sprintf(` prs="%v"`, r.prs)
  123. }
  124. return s
  125. }
  126. func (r *raft) poll(id uint64, v bool) (granted int) {
  127. if _, ok := r.votes[id]; !ok {
  128. r.votes[id] = v
  129. }
  130. for _, vv := range r.votes {
  131. if vv {
  132. granted++
  133. }
  134. }
  135. return granted
  136. }
  137. // send persists state to stable storage and then sends to its mailbox.
  138. func (r *raft) send(m pb.Message) {
  139. m.From = r.id
  140. // do not attach term to MsgProp
  141. // proposals are a way to forward to the leader and
  142. // should be treated as local message.
  143. if m.Type != pb.MsgProp {
  144. m.Term = r.Term
  145. }
  146. r.msgs = append(r.msgs, m)
  147. }
  148. // sendAppend sends RRPC, with entries to the given peer.
  149. func (r *raft) sendAppend(to uint64) {
  150. pr := r.prs[to]
  151. m := pb.Message{}
  152. m.To = to
  153. m.Index = pr.next - 1
  154. if r.needSnapshot(m.Index) {
  155. m.Type = pb.MsgSnap
  156. m.Snapshot = r.raftLog.snapshot
  157. } else {
  158. m.Type = pb.MsgApp
  159. m.LogTerm = r.raftLog.term(pr.next - 1)
  160. m.Entries = r.raftLog.entries(pr.next)
  161. m.Commit = r.raftLog.committed
  162. }
  163. r.send(m)
  164. }
  165. // sendHeartbeat sends an empty MsgApp
  166. func (r *raft) sendHeartbeat(to uint64) {
  167. // Attach the commit as min(to.matched, r.committed).
  168. // When the leader sends out heartbeat message,
  169. // the receiver(follower) might not be matched with the leader
  170. // or it might not have all the committed entries.
  171. // The leader MUST NOT forward the follower's commit to
  172. // an unmatched index.
  173. commit := min(r.prs[to].match, r.raftLog.committed)
  174. m := pb.Message{
  175. To: to,
  176. Type: pb.MsgApp,
  177. Commit: commit,
  178. }
  179. r.send(m)
  180. }
  181. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date
  182. // according to the progress recorded in r.prs.
  183. func (r *raft) bcastAppend() {
  184. for i := range r.prs {
  185. if i == r.id {
  186. continue
  187. }
  188. r.sendAppend(i)
  189. }
  190. }
  191. // bcastHeartbeat sends RRPC, without entries to all the peers.
  192. func (r *raft) bcastHeartbeat() {
  193. for i := range r.prs {
  194. if i == r.id {
  195. continue
  196. }
  197. r.sendHeartbeat(i)
  198. }
  199. }
  200. func (r *raft) maybeCommit() bool {
  201. // TODO(bmizerany): optimize.. Currently naive
  202. mis := make(uint64Slice, 0, len(r.prs))
  203. for i := range r.prs {
  204. mis = append(mis, r.prs[i].match)
  205. }
  206. sort.Sort(sort.Reverse(mis))
  207. mci := mis[r.q()-1]
  208. return r.raftLog.maybeCommit(mci, r.Term)
  209. }
  210. func (r *raft) reset(term uint64) {
  211. r.Term = term
  212. r.lead = None
  213. r.Vote = None
  214. r.elapsed = 0
  215. r.votes = make(map[uint64]bool)
  216. for i := range r.prs {
  217. r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
  218. if i == r.id {
  219. r.prs[i].match = r.raftLog.lastIndex()
  220. }
  221. }
  222. r.pendingConf = false
  223. }
  224. func (r *raft) q() int {
  225. return len(r.prs)/2 + 1
  226. }
  227. func (r *raft) appendEntry(e pb.Entry) {
  228. e.Term = r.Term
  229. e.Index = r.raftLog.lastIndex() + 1
  230. r.raftLog.append(r.raftLog.lastIndex(), e)
  231. r.prs[r.id].update(r.raftLog.lastIndex())
  232. r.maybeCommit()
  233. }
  234. // tickElection is run by followers and candidates after r.electionTimeout.
  235. func (r *raft) tickElection() {
  236. if !r.promotable() {
  237. r.elapsed = 0
  238. return
  239. }
  240. r.elapsed++
  241. if r.isElectionTimeout() {
  242. r.elapsed = 0
  243. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  244. }
  245. }
  246. // tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
  247. func (r *raft) tickHeartbeat() {
  248. r.elapsed++
  249. if r.elapsed > r.heartbeatTimeout {
  250. r.elapsed = 0
  251. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  252. }
  253. }
  254. func (r *raft) becomeFollower(term uint64, lead uint64) {
  255. r.step = stepFollower
  256. r.reset(term)
  257. r.tick = r.tickElection
  258. r.lead = lead
  259. r.state = StateFollower
  260. }
  261. func (r *raft) becomeCandidate() {
  262. // TODO(xiangli) remove the panic when the raft implementation is stable
  263. if r.state == StateLeader {
  264. panic("invalid transition [leader -> candidate]")
  265. }
  266. r.step = stepCandidate
  267. r.reset(r.Term + 1)
  268. r.tick = r.tickElection
  269. r.Vote = r.id
  270. r.state = StateCandidate
  271. }
  272. func (r *raft) becomeLeader() {
  273. // TODO(xiangli) remove the panic when the raft implementation is stable
  274. if r.state == StateFollower {
  275. panic("invalid transition [follower -> leader]")
  276. }
  277. r.step = stepLeader
  278. r.reset(r.Term)
  279. r.tick = r.tickHeartbeat
  280. r.lead = r.id
  281. r.state = StateLeader
  282. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  283. if e.Type != pb.EntryConfChange {
  284. continue
  285. }
  286. if r.pendingConf {
  287. panic("unexpected double uncommitted config entry")
  288. }
  289. r.pendingConf = true
  290. }
  291. r.appendEntry(pb.Entry{Data: nil})
  292. }
  293. func (r *raft) campaign() {
  294. r.becomeCandidate()
  295. if r.q() == r.poll(r.id, true) {
  296. r.becomeLeader()
  297. }
  298. for i := range r.prs {
  299. if i == r.id {
  300. continue
  301. }
  302. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
  303. }
  304. }
  305. func (r *raft) Step(m pb.Message) error {
  306. // TODO(bmizerany): this likely allocs - prevent that.
  307. defer func() { r.Commit = r.raftLog.committed }()
  308. if m.Type == pb.MsgHup {
  309. r.campaign()
  310. }
  311. switch {
  312. case m.Term == 0:
  313. // local message
  314. case m.Term > r.Term:
  315. lead := m.From
  316. if m.Type == pb.MsgVote {
  317. lead = None
  318. }
  319. r.becomeFollower(m.Term, lead)
  320. case m.Term < r.Term:
  321. // ignore
  322. return nil
  323. }
  324. r.step(r, m)
  325. return nil
  326. }
  327. func (r *raft) handleAppendEntries(m pb.Message) {
  328. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  329. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  330. } else {
  331. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
  332. }
  333. }
  334. func (r *raft) handleSnapshot(m pb.Message) {
  335. if r.restore(m.Snapshot) {
  336. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  337. } else {
  338. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  339. }
  340. }
  341. func (r *raft) resetPendingConf() {
  342. r.pendingConf = false
  343. }
  344. func (r *raft) addNode(id uint64) {
  345. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  346. r.pendingConf = false
  347. }
  348. func (r *raft) removeNode(id uint64) {
  349. r.delProgress(id)
  350. r.pendingConf = false
  351. }
  352. type stepFunc func(r *raft, m pb.Message)
  353. func stepLeader(r *raft, m pb.Message) {
  354. switch m.Type {
  355. case pb.MsgBeat:
  356. r.bcastHeartbeat()
  357. case pb.MsgProp:
  358. if len(m.Entries) != 1 {
  359. panic("unexpected length(entries) of a MsgProp")
  360. }
  361. e := m.Entries[0]
  362. if e.Type == pb.EntryConfChange {
  363. if r.pendingConf {
  364. return
  365. }
  366. r.pendingConf = true
  367. }
  368. r.appendEntry(e)
  369. r.bcastAppend()
  370. case pb.MsgAppResp:
  371. if m.Index == 0 {
  372. return
  373. }
  374. if m.Reject {
  375. if r.prs[m.From].maybeDecrTo(m.Index) {
  376. r.sendAppend(m.From)
  377. }
  378. } else {
  379. r.prs[m.From].update(m.Index)
  380. if r.maybeCommit() {
  381. r.bcastAppend()
  382. }
  383. }
  384. case pb.MsgVote:
  385. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  386. }
  387. }
  388. func stepCandidate(r *raft, m pb.Message) {
  389. switch m.Type {
  390. case pb.MsgProp:
  391. panic("no leader")
  392. case pb.MsgApp:
  393. r.becomeFollower(r.Term, m.From)
  394. r.handleAppendEntries(m)
  395. case pb.MsgSnap:
  396. r.becomeFollower(m.Term, m.From)
  397. r.handleSnapshot(m)
  398. case pb.MsgVote:
  399. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  400. case pb.MsgVoteResp:
  401. gr := r.poll(m.From, !m.Reject)
  402. switch r.q() {
  403. case gr:
  404. r.becomeLeader()
  405. r.bcastAppend()
  406. case len(r.votes) - gr:
  407. r.becomeFollower(r.Term, None)
  408. }
  409. }
  410. }
  411. func stepFollower(r *raft, m pb.Message) {
  412. switch m.Type {
  413. case pb.MsgProp:
  414. if r.lead == None {
  415. panic("no leader")
  416. }
  417. m.To = r.lead
  418. r.send(m)
  419. case pb.MsgApp:
  420. r.elapsed = 0
  421. r.lead = m.From
  422. r.handleAppendEntries(m)
  423. case pb.MsgSnap:
  424. r.elapsed = 0
  425. r.handleSnapshot(m)
  426. case pb.MsgVote:
  427. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  428. r.elapsed = 0
  429. r.Vote = m.From
  430. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  431. } else {
  432. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  433. }
  434. }
  435. }
  436. func (r *raft) compact(index uint64, nodes []uint64, d []byte) {
  437. if index > r.raftLog.applied {
  438. panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
  439. }
  440. r.raftLog.snap(d, index, r.raftLog.term(index), nodes)
  441. r.raftLog.compact(index)
  442. }
  443. // restore recovers the statemachine from a snapshot. It restores the log and the
  444. // configuration of statemachine.
  445. func (r *raft) restore(s pb.Snapshot) bool {
  446. if s.Index <= r.raftLog.committed {
  447. return false
  448. }
  449. r.raftLog.restore(s)
  450. r.prs = make(map[uint64]*progress)
  451. for _, n := range s.Nodes {
  452. if n == r.id {
  453. r.setProgress(n, r.raftLog.lastIndex(), r.raftLog.lastIndex()+1)
  454. } else {
  455. r.setProgress(n, 0, r.raftLog.lastIndex()+1)
  456. }
  457. }
  458. return true
  459. }
  460. func (r *raft) needSnapshot(i uint64) bool {
  461. if i < r.raftLog.offset {
  462. if r.raftLog.snapshot.Term == 0 {
  463. panic("need non-empty snapshot")
  464. }
  465. return true
  466. }
  467. return false
  468. }
  469. func (r *raft) nodes() []uint64 {
  470. nodes := make([]uint64, 0, len(r.prs))
  471. for k := range r.prs {
  472. nodes = append(nodes, k)
  473. }
  474. sort.Sort(uint64Slice(nodes))
  475. return nodes
  476. }
  477. func (r *raft) setProgress(id, match, next uint64) {
  478. r.prs[id] = &progress{next: next, match: match}
  479. }
  480. func (r *raft) delProgress(id uint64) {
  481. delete(r.prs, id)
  482. }
  483. // promotable indicates whether state machine can be promoted to leader,
  484. // which is true when its own id is in progress list.
  485. func (r *raft) promotable() bool {
  486. _, ok := r.prs[r.id]
  487. return ok
  488. }
  489. func (r *raft) loadEnts(ents []pb.Entry) {
  490. r.raftLog.load(ents)
  491. }
  492. func (r *raft) loadState(state pb.HardState) {
  493. r.raftLog.committed = state.Commit
  494. r.Term = state.Term
  495. r.Vote = state.Vote
  496. r.Commit = state.Commit
  497. }
  498. // isElectionTimeout returns true if r.elapsed is greater than the
  499. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  500. // Otherwise, it returns false.
  501. func (r *raft) isElectionTimeout() bool {
  502. d := r.elapsed - r.electionTimeout
  503. if d < 0 {
  504. return false
  505. }
  506. return d > r.rand.Int()%r.electionTimeout
  507. }