raft.go 17 KB


  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "log"
  18. "math/rand"
  19. "sort"
  20. "strings"
  21. pb "github.com/coreos/etcd/raft/raftpb"
  22. )
  // None is a placeholder node ID used when there is no leader.
  const None = uint64(0)

  // errNoLeader is an error value carrying the message "no leader". It is not
  // referenced in the visible part of this file.
  var errNoLeader = errors.New("no leader")
  // StateType represents the role of a node in a cluster.
  type StateType uint64

  // Possible values for StateType.
  const (
  	StateFollower StateType = iota
  	StateCandidate
  	StateLeader
  )

  // stmap holds the printable name of every defined StateType, indexed by the
  // numeric value of the state.
  var stmap = [...]string{
  	"StateFollower",
  	"StateCandidate",
  	"StateLeader",
  }

  // String returns the name of the state. A value outside the defined range is
  // rendered as "StateType(<n>)" instead of panicking with an out-of-range
  // index, so logging or marshalling a corrupt state cannot crash the caller.
  func (st StateType) String() string {
  	if i := uint64(st); i < uint64(len(stmap)) {
  		return stmap[i]
  	}
  	return fmt.Sprintf("StateType(%d)", uint64(st))
  }

  // MarshalJSON encodes the state as a quoted string holding the result of
  // String. The returned error is always nil.
  func (st StateType) MarshalJSON() ([]byte, error) {
  	return []byte(fmt.Sprintf("%q", st.String())), nil
  }
  // progress records, for a single peer, the highest log index known to match
  // (match) and the index of the next entry to send (next).
  type progress struct {
  	match uint64
  	next  uint64
  }

  // update raises match to n and next to n+1. Neither field is ever lowered.
  func (pr *progress) update(n uint64) {
  	if n > pr.match {
  		pr.match = n
  	}
  	if succ := n + 1; succ > pr.next {
  		pr.next = succ
  	}
  }

  // optimisticUpdate sets next to n+1 unconditionally.
  func (pr *progress) optimisticUpdate(n uint64) {
  	pr.next = n + 1
  }

  // maybeDecrTo returns false if the given to index comes from an out of order
  // message. Otherwise it decreases the progress next index and returns true.
  func (pr *progress) maybeDecrTo(to uint64) bool {
  	if pr.match == 0 {
  		// Nothing has matched yet: only a rejection of the entry just before
  		// next is current; anything else is stale.
  		if to != pr.next-1 {
  			return false
  		}
  		pr.next--
  		if pr.next < 1 {
  			pr.next = 1
  		}
  		return true
  	}

  	// Once the progress has matched, a rejection at or below match is stale.
  	if to <= pr.match {
  		return false
  	}
  	// Fall back directly to the entry following the last known match.
  	pr.next = pr.match + 1
  	return true
  }

  // String renders both indexes for log output.
  func (pr *progress) String() string {
  	return fmt.Sprintf("next = %d, match = %d", pr.next, pr.match)
  }
  78. type raft struct {
  79. pb.HardState
  80. id uint64
  81. // the log
  82. raftLog *raftLog
  83. prs map[uint64]*progress
  84. state StateType
  85. votes map[uint64]bool
  86. msgs []pb.Message
  87. // the leader id
  88. lead uint64
  89. // New configuration is ignored if there exists unapplied configuration.
  90. pendingConf bool
  91. elapsed int // number of ticks since the last msg
  92. heartbeatTimeout int
  93. electionTimeout int
  94. rand *rand.Rand
  95. tick func()
  96. step stepFunc
  97. }
  98. func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
  99. if id == None {
  100. panic("cannot use none id")
  101. }
  102. raftlog := newLog(storage)
  103. hs, cs, err := storage.InitialState()
  104. if err != nil {
  105. panic(err) // TODO(bdarnell)
  106. }
  107. if len(cs.Nodes) > 0 {
  108. if len(peers) > 0 {
  109. // TODO(bdarnell): the peers argument is always nil except in
  110. // tests; the argument should be removed and these tests should be
  111. // updated to specify their nodes through a snapshot.
  112. panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
  113. }
  114. peers = cs.Nodes
  115. }
  116. r := &raft{
  117. id: id,
  118. lead: None,
  119. raftLog: raftlog,
  120. prs: make(map[uint64]*progress),
  121. electionTimeout: election,
  122. heartbeatTimeout: heartbeat,
  123. }
  124. r.rand = rand.New(rand.NewSource(int64(id)))
  125. for _, p := range peers {
  126. r.prs[p] = &progress{next: 1}
  127. }
  128. if !isHardStateEqual(hs, emptyState) {
  129. r.loadState(hs)
  130. }
  131. r.becomeFollower(r.Term, None)
  132. nodesStrs := make([]string, 0)
  133. for _, n := range r.nodes() {
  134. nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
  135. }
  136. log.Printf("raft: newRaft %x [peers: [%s], term: %d, commit: %d, lastindex: %d, lastterm: %d]",
  137. r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm())
  138. return r
  139. }
  140. func (r *raft) hasLeader() bool { return r.lead != None }
  141. func (r *raft) leader() uint64 { return r.lead }
  142. func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
  143. func (r *raft) q() int { return len(r.prs)/2 + 1 }
  144. func (r *raft) nodes() []uint64 {
  145. nodes := make([]uint64, 0, len(r.prs))
  146. for k := range r.prs {
  147. nodes = append(nodes, k)
  148. }
  149. sort.Sort(uint64Slice(nodes))
  150. return nodes
  151. }
  152. // send persists state to stable storage and then sends to its mailbox.
  153. func (r *raft) send(m pb.Message) {
  154. m.From = r.id
  155. // do not attach term to MsgProp
  156. // proposals are a way to forward to the leader and
  157. // should be treated as local message.
  158. if m.Type != pb.MsgProp {
  159. m.Term = r.Term
  160. }
  161. r.msgs = append(r.msgs, m)
  162. }
  163. // sendAppend sends RRPC, with entries to the given peer.
  164. func (r *raft) sendAppend(to uint64) {
  165. pr := r.prs[to]
  166. m := pb.Message{}
  167. m.To = to
  168. if r.needSnapshot(pr.next) {
  169. m.Type = pb.MsgSnap
  170. snapshot, err := r.raftLog.snapshot()
  171. if err != nil {
  172. panic(err) // TODO(bdarnell)
  173. }
  174. if IsEmptySnap(snapshot) {
  175. panic("need non-empty snapshot")
  176. }
  177. m.Snapshot = snapshot
  178. sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
  179. log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
  180. r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
  181. } else {
  182. m.Type = pb.MsgApp
  183. m.Index = pr.next - 1
  184. m.LogTerm = r.raftLog.term(pr.next - 1)
  185. m.Entries = r.raftLog.entries(pr.next)
  186. m.Commit = r.raftLog.committed
  187. // optimistically increase the next if the follower
  188. // has been matched.
  189. if n := len(m.Entries); pr.match != 0 && n != 0 {
  190. pr.optimisticUpdate(m.Entries[n-1].Index)
  191. }
  192. }
  193. r.send(m)
  194. }
  195. // sendHeartbeat sends an empty MsgApp
  196. func (r *raft) sendHeartbeat(to uint64) {
  197. // Attach the commit as min(to.matched, r.committed).
  198. // When the leader sends out heartbeat message,
  199. // the receiver(follower) might not be matched with the leader
  200. // or it might not have all the committed entries.
  201. // The leader MUST NOT forward the follower's commit to
  202. // an unmatched index.
  203. commit := min(r.prs[to].match, r.raftLog.committed)
  204. m := pb.Message{
  205. To: to,
  206. Type: pb.MsgHeartbeat,
  207. Commit: commit,
  208. }
  209. r.send(m)
  210. }
  211. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date
  212. // according to the progress recorded in r.prs.
  213. func (r *raft) bcastAppend() {
  214. for i := range r.prs {
  215. if i == r.id {
  216. continue
  217. }
  218. r.sendAppend(i)
  219. }
  220. }
  221. // bcastHeartbeat sends RRPC, without entries to all the peers.
  222. func (r *raft) bcastHeartbeat() {
  223. for i := range r.prs {
  224. if i == r.id {
  225. continue
  226. }
  227. r.sendHeartbeat(i)
  228. }
  229. }
  230. func (r *raft) maybeCommit() bool {
  231. // TODO(bmizerany): optimize.. Currently naive
  232. mis := make(uint64Slice, 0, len(r.prs))
  233. for i := range r.prs {
  234. mis = append(mis, r.prs[i].match)
  235. }
  236. sort.Sort(sort.Reverse(mis))
  237. mci := mis[r.q()-1]
  238. return r.raftLog.maybeCommit(mci, r.Term)
  239. }
  240. func (r *raft) reset(term uint64) {
  241. r.Term = term
  242. r.lead = None
  243. r.Vote = None
  244. r.elapsed = 0
  245. r.votes = make(map[uint64]bool)
  246. for i := range r.prs {
  247. r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
  248. if i == r.id {
  249. r.prs[i].match = r.raftLog.lastIndex()
  250. }
  251. }
  252. r.pendingConf = false
  253. }
  254. func (r *raft) appendEntry(e pb.Entry) {
  255. e.Term = r.Term
  256. e.Index = r.raftLog.lastIndex() + 1
  257. r.raftLog.append(e)
  258. r.prs[r.id].update(r.raftLog.lastIndex())
  259. r.maybeCommit()
  260. }
  261. // tickElection is run by followers and candidates after r.electionTimeout.
  262. func (r *raft) tickElection() {
  263. if !r.promotable() {
  264. r.elapsed = 0
  265. return
  266. }
  267. r.elapsed++
  268. if r.isElectionTimeout() {
  269. r.elapsed = 0
  270. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  271. }
  272. }
  273. // tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
  274. func (r *raft) tickHeartbeat() {
  275. r.elapsed++
  276. if r.elapsed > r.heartbeatTimeout {
  277. r.elapsed = 0
  278. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  279. }
  280. }
  281. func (r *raft) becomeFollower(term uint64, lead uint64) {
  282. r.step = stepFollower
  283. r.reset(term)
  284. r.tick = r.tickElection
  285. r.lead = lead
  286. r.state = StateFollower
  287. log.Printf("raft: %x became follower at term %d", r.id, r.Term)
  288. }
  289. func (r *raft) becomeCandidate() {
  290. // TODO(xiangli) remove the panic when the raft implementation is stable
  291. if r.state == StateLeader {
  292. panic("invalid transition [leader -> candidate]")
  293. }
  294. r.step = stepCandidate
  295. r.reset(r.Term + 1)
  296. r.tick = r.tickElection
  297. r.Vote = r.id
  298. r.state = StateCandidate
  299. log.Printf("raft: %x became candidate at term %d", r.id, r.Term)
  300. }
  301. func (r *raft) becomeLeader() {
  302. // TODO(xiangli) remove the panic when the raft implementation is stable
  303. if r.state == StateFollower {
  304. panic("invalid transition [follower -> leader]")
  305. }
  306. r.step = stepLeader
  307. r.reset(r.Term)
  308. r.tick = r.tickHeartbeat
  309. r.lead = r.id
  310. r.state = StateLeader
  311. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  312. if e.Type != pb.EntryConfChange {
  313. continue
  314. }
  315. if r.pendingConf {
  316. panic("unexpected double uncommitted config entry")
  317. }
  318. r.pendingConf = true
  319. }
  320. r.appendEntry(pb.Entry{Data: nil})
  321. log.Printf("raft: %x became leader at term %d", r.id, r.Term)
  322. }
  323. func (r *raft) campaign() {
  324. r.becomeCandidate()
  325. if r.q() == r.poll(r.id, true) {
  326. r.becomeLeader()
  327. return
  328. }
  329. for i := range r.prs {
  330. if i == r.id {
  331. continue
  332. }
  333. log.Printf("raft: %x [logterm: %d, index: %d] sent vote request to %x at term %d",
  334. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
  335. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
  336. }
  337. }
  338. func (r *raft) poll(id uint64, v bool) (granted int) {
  339. if v {
  340. log.Printf("raft: %x received vote from %x at term %d", r.id, id, r.Term)
  341. } else {
  342. log.Printf("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term)
  343. }
  344. if _, ok := r.votes[id]; !ok {
  345. r.votes[id] = v
  346. }
  347. for _, vv := range r.votes {
  348. if vv {
  349. granted++
  350. }
  351. }
  352. return granted
  353. }
  354. func (r *raft) Step(m pb.Message) error {
  355. // TODO(bmizerany): this likely allocs - prevent that.
  356. defer func() { r.Commit = r.raftLog.committed }()
  357. if m.Type == pb.MsgHup {
  358. log.Printf("raft: %x is starting a new election at term %d", r.id, r.Term)
  359. r.campaign()
  360. return nil
  361. }
  362. switch {
  363. case m.Term == 0:
  364. // local message
  365. case m.Term > r.Term:
  366. lead := m.From
  367. if m.Type == pb.MsgVote {
  368. lead = None
  369. }
  370. log.Printf("raft: %x [term: %d] received a %s message with higher term from %x [term: %d]",
  371. r.id, r.Term, m.Type, m.From, m.Term)
  372. r.becomeFollower(m.Term, lead)
  373. case m.Term < r.Term:
  374. // ignore
  375. log.Printf("raft: %x [term: %d] ignored a %s message with lower term from %x [term: %d]",
  376. r.id, r.Term, m.Type, m.From, m.Term)
  377. return nil
  378. }
  379. r.step(r, m)
  380. return nil
  381. }
  382. type stepFunc func(r *raft, m pb.Message)
  383. func stepLeader(r *raft, m pb.Message) {
  384. switch m.Type {
  385. case pb.MsgBeat:
  386. r.bcastHeartbeat()
  387. case pb.MsgProp:
  388. if len(m.Entries) != 1 {
  389. panic("unexpected length(entries) of a MsgProp")
  390. }
  391. e := m.Entries[0]
  392. if e.Type == pb.EntryConfChange {
  393. if r.pendingConf {
  394. return
  395. }
  396. r.pendingConf = true
  397. }
  398. r.appendEntry(e)
  399. r.bcastAppend()
  400. case pb.MsgAppResp:
  401. if m.Reject {
  402. log.Printf("raft: %x received msgApp rejection from %x for index %d",
  403. r.id, m.From, m.Index)
  404. if r.prs[m.From].maybeDecrTo(m.Index) {
  405. r.sendAppend(m.From)
  406. }
  407. } else {
  408. r.prs[m.From].update(m.Index)
  409. if r.maybeCommit() {
  410. r.bcastAppend()
  411. }
  412. }
  413. case pb.MsgVote:
  414. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
  415. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  416. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  417. }
  418. }
  419. func stepCandidate(r *raft, m pb.Message) {
  420. switch m.Type {
  421. case pb.MsgProp:
  422. panic("no leader")
  423. case pb.MsgApp:
  424. r.becomeFollower(r.Term, m.From)
  425. r.handleAppendEntries(m)
  426. case pb.MsgHeartbeat:
  427. r.becomeFollower(r.Term, m.From)
  428. r.handleHeartbeat(m)
  429. case pb.MsgSnap:
  430. r.becomeFollower(m.Term, m.From)
  431. r.handleSnapshot(m)
  432. case pb.MsgVote:
  433. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
  434. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  435. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  436. case pb.MsgVoteResp:
  437. gr := r.poll(m.From, !m.Reject)
  438. log.Printf("raft: %x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
  439. switch r.q() {
  440. case gr:
  441. r.becomeLeader()
  442. r.bcastAppend()
  443. case len(r.votes) - gr:
  444. r.becomeFollower(r.Term, None)
  445. }
  446. }
  447. }
  448. func stepFollower(r *raft, m pb.Message) {
  449. switch m.Type {
  450. case pb.MsgProp:
  451. if r.lead == None {
  452. panic("no leader")
  453. }
  454. m.To = r.lead
  455. r.send(m)
  456. case pb.MsgApp:
  457. r.elapsed = 0
  458. r.lead = m.From
  459. r.handleAppendEntries(m)
  460. case pb.MsgHeartbeat:
  461. r.elapsed = 0
  462. r.lead = m.From
  463. r.handleHeartbeat(m)
  464. case pb.MsgSnap:
  465. r.elapsed = 0
  466. r.handleSnapshot(m)
  467. case pb.MsgVote:
  468. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  469. r.elapsed = 0
  470. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %x",
  471. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  472. r.Vote = m.From
  473. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  474. } else {
  475. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
  476. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  477. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  478. }
  479. }
  480. }
  481. func (r *raft) handleAppendEntries(m pb.Message) {
  482. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  483. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  484. } else {
  485. log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
  486. r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
  487. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
  488. }
  489. }
  490. func (r *raft) handleHeartbeat(m pb.Message) {
  491. r.raftLog.commitTo(m.Commit)
  492. }
  493. func (r *raft) handleSnapshot(m pb.Message) {
  494. sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
  495. if r.restore(m.Snapshot) {
  496. log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
  497. r.id, r.Commit, sindex, sterm)
  498. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  499. } else {
  500. log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
  501. r.id, r.Commit, sindex, sterm)
  502. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  503. }
  504. }
  505. // restore recovers the statemachine from a snapshot. It restores the log and the
  506. // configuration of statemachine.
  507. func (r *raft) restore(s pb.Snapshot) bool {
  508. if s.Metadata.Index <= r.raftLog.committed {
  509. return false
  510. }
  511. if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
  512. log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
  513. r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  514. r.raftLog.commitTo(s.Metadata.Index)
  515. return false
  516. }
  517. log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
  518. r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  519. r.raftLog.restore(s)
  520. r.prs = make(map[uint64]*progress)
  521. for _, n := range s.Metadata.ConfState.Nodes {
  522. match, next := uint64(0), uint64(r.raftLog.lastIndex())+1
  523. if n == r.id {
  524. match = next - 1
  525. } else {
  526. match = 0
  527. }
  528. r.setProgress(n, match, next)
  529. log.Printf("raft: %x restored progress of %x [%s]", r.id, n, r.prs[n])
  530. }
  531. return true
  532. }
  533. func (r *raft) needSnapshot(i uint64) bool {
  534. return i < r.raftLog.firstIndex()
  535. }
  536. // promotable indicates whether state machine can be promoted to leader,
  537. // which is true when its own id is in progress list.
  538. func (r *raft) promotable() bool {
  539. _, ok := r.prs[r.id]
  540. return ok
  541. }
  542. func (r *raft) addNode(id uint64) {
  543. if _, ok := r.prs[id]; ok {
  544. // Ignore any redundant addNode calls (which can happen because the
  545. // initial bootstrapping entries are applied twice).
  546. return
  547. }
  548. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  549. r.pendingConf = false
  550. }
  551. func (r *raft) removeNode(id uint64) {
  552. r.delProgress(id)
  553. r.pendingConf = false
  554. }
  555. func (r *raft) resetPendingConf() { r.pendingConf = false }
  556. func (r *raft) setProgress(id, match, next uint64) {
  557. r.prs[id] = &progress{next: next, match: match}
  558. }
  559. func (r *raft) delProgress(id uint64) {
  560. delete(r.prs, id)
  561. }
  562. func (r *raft) loadState(state pb.HardState) {
  563. if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
  564. log.Panicf("raft: %x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
  565. }
  566. r.raftLog.committed = state.Commit
  567. r.Term = state.Term
  568. r.Vote = state.Vote
  569. r.Commit = state.Commit
  570. }
  571. // isElectionTimeout returns true if r.elapsed is greater than the
  572. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  573. // Otherwise, it returns false.
  574. func (r *raft) isElectionTimeout() bool {
  575. d := r.elapsed - r.electionTimeout
  576. if d < 0 {
  577. return false
  578. }
  579. return d > r.rand.Int()%r.electionTimeout
  580. }