raft.go 17 KB


  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "log"
  18. "math/rand"
  19. "sort"
  20. "strings"
  21. pb "github.com/coreos/etcd/raft/raftpb"
  22. )
  // None is the placeholder node ID meaning "no node", e.g. when no leader is
// known.
const None = uint64(0)

// errNoLeader is the error value for a missing leader. It is not referenced
// in this file; presumably it is used elsewhere in the package.
var errNoLeader error = errors.New("no leader")
  // Possible values for StateType.
const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
)

// StateType represents the role of a node in a cluster.
type StateType uint64

// stmap holds the printable name of each StateType, indexed by its value.
var stmap = [...]string{
	"StateFollower",
	"StateCandidate",
	"StateLeader",
}

// String returns the name of st, e.g. "StateLeader".
//
// Values outside the known range are rendered as "StateType(N)" rather than
// panicking with an index-out-of-range error.
func (st StateType) String() string {
	if uint64(st) < uint64(len(stmap)) {
		return stmap[uint64(st)]
	}
	// Convert to uint64 so %d formats the number rather than going through
	// any method set on StateType.
	return fmt.Sprintf("StateType(%d)", uint64(st))
}

// MarshalJSON encodes st as a quoted JSON string holding its name.
func (st StateType) MarshalJSON() ([]byte, error) {
	return []byte(fmt.Sprintf("%q", st.String())), nil
}
  // progress is the replication bookkeeping kept for one member: match is the
// highest log index known to be shared with it, and next is the index of the
// next entry to send to it.
type progress struct{ match, next uint64 }

// update records that the member's log matches up to index n. Neither field
// ever moves backwards: match is raised to n and next to n+1 when smaller.
func (pr *progress) update(n uint64) {
	if n > pr.match {
		pr.match = n
	}
	if following := n + 1; following > pr.next {
		pr.next = following
	}
}

// optimisticUpdate sets next just past n without waiting for an
// acknowledgement; match is left untouched.
func (pr *progress) optimisticUpdate(n uint64) {
	pr.next = n + 1
}

// maybeDecrTo handles a rejection for index to. It returns false when the
// rejection is stale (out of order); otherwise it lowers next and returns true.
func (pr *progress) maybeDecrTo(to uint64) bool {
	switch {
	case pr.match != 0 && to <= pr.match:
		// Already matched at or beyond "to": the rejection is stale.
		return false
	case pr.match != 0:
		// Matched member: resume right after the known match.
		pr.next = pr.match + 1
		return true
	case to != pr.next-1:
		// Unmatched member: only a rejection of next-1 is current.
		return false
	}
	pr.next--
	if pr.next < 1 {
		pr.next = 1
	}
	return true
}

// String formats the progress for log output.
func (pr *progress) String() string {
	return fmt.Sprintf("next = %d, match = %d",
		pr.next, pr.match)
}
  78. type raft struct {
  79. pb.HardState
  80. id uint64
  81. // the log
  82. raftLog *raftLog
  83. prs map[uint64]*progress
  84. state StateType
  85. votes map[uint64]bool
  86. msgs []pb.Message
  87. // the leader id
  88. lead uint64
  89. // New configuration is ignored if there exists unapplied configuration.
  90. pendingConf bool
  91. elapsed int // number of ticks since the last msg
  92. heartbeatTimeout int
  93. electionTimeout int
  94. rand *rand.Rand
  95. tick func()
  96. step stepFunc
  97. }
  98. func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
  99. if id == None {
  100. panic("cannot use none id")
  101. }
  102. raftlog := newLog(storage)
  103. hs, cs, err := storage.InitialState()
  104. if err != nil {
  105. panic(err) // TODO(bdarnell)
  106. }
  107. if len(cs.Nodes) > 0 {
  108. if len(peers) > 0 {
  109. // TODO(bdarnell): the peers argument is always nil except in
  110. // tests; the argument should be removed and these tests should be
  111. // updated to specify their nodes through a snapshot.
  112. panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
  113. }
  114. peers = cs.Nodes
  115. }
  116. r := &raft{
  117. id: id,
  118. lead: None,
  119. raftLog: raftlog,
  120. prs: make(map[uint64]*progress),
  121. electionTimeout: election,
  122. heartbeatTimeout: heartbeat,
  123. }
  124. r.rand = rand.New(rand.NewSource(int64(id)))
  125. for _, p := range peers {
  126. r.prs[p] = &progress{next: 1}
  127. }
  128. if !isHardStateEqual(hs, emptyState) {
  129. r.loadState(hs)
  130. }
  131. r.becomeFollower(r.Term, None)
  132. nodesStrs := make([]string, 0)
  133. for _, n := range r.nodes() {
  134. nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
  135. }
  136. log.Printf("raft: newRaft %x [peers: [%s], term: %d, commit: %d, lastindex: %d, lastterm: %d]",
  137. r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm())
  138. return r
  139. }
  140. func (r *raft) hasLeader() bool { return r.lead != None }
  141. func (r *raft) leader() uint64 { return r.lead }
  142. func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
  143. func (r *raft) q() int { return len(r.prs)/2 + 1 }
  144. func (r *raft) nodes() []uint64 {
  145. nodes := make([]uint64, 0, len(r.prs))
  146. for k := range r.prs {
  147. nodes = append(nodes, k)
  148. }
  149. sort.Sort(uint64Slice(nodes))
  150. return nodes
  151. }
  152. // send persists state to stable storage and then sends to its mailbox.
  153. func (r *raft) send(m pb.Message) {
  154. m.From = r.id
  155. // do not attach term to MsgProp
  156. // proposals are a way to forward to the leader and
  157. // should be treated as local message.
  158. if m.Type != pb.MsgProp {
  159. m.Term = r.Term
  160. }
  161. r.msgs = append(r.msgs, m)
  162. }
  163. // sendAppend sends RRPC, with entries to the given peer.
  164. func (r *raft) sendAppend(to uint64) {
  165. pr := r.prs[to]
  166. m := pb.Message{}
  167. m.To = to
  168. if r.needSnapshot(pr.next) {
  169. m.Type = pb.MsgSnap
  170. snapshot, err := r.raftLog.snapshot()
  171. if err != nil {
  172. panic(err) // TODO(bdarnell)
  173. }
  174. if IsEmptySnap(snapshot) {
  175. panic("need non-empty snapshot")
  176. }
  177. m.Snapshot = snapshot
  178. sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
  179. log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
  180. r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
  181. } else {
  182. m.Type = pb.MsgApp
  183. m.Index = pr.next - 1
  184. m.LogTerm = r.raftLog.term(pr.next - 1)
  185. m.Entries = r.raftLog.entries(pr.next)
  186. m.Commit = r.raftLog.committed
  187. // optimistically increase the next if the follower
  188. // has been matched.
  189. if n := len(m.Entries); pr.match != 0 && n != 0 {
  190. pr.optimisticUpdate(m.Entries[n-1].Index)
  191. }
  192. }
  193. r.send(m)
  194. }
  195. // sendHeartbeat sends an empty MsgApp
  196. func (r *raft) sendHeartbeat(to uint64) {
  197. // Attach the commit as min(to.matched, r.committed).
  198. // When the leader sends out heartbeat message,
  199. // the receiver(follower) might not be matched with the leader
  200. // or it might not have all the committed entries.
  201. // The leader MUST NOT forward the follower's commit to
  202. // an unmatched index.
  203. commit := min(r.prs[to].match, r.raftLog.committed)
  204. m := pb.Message{
  205. To: to,
  206. Type: pb.MsgHeartbeat,
  207. Commit: commit,
  208. }
  209. r.send(m)
  210. }
  211. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date
  212. // according to the progress recorded in r.prs.
  213. func (r *raft) bcastAppend() {
  214. for i := range r.prs {
  215. if i == r.id {
  216. continue
  217. }
  218. r.sendAppend(i)
  219. }
  220. }
  221. // bcastHeartbeat sends RRPC, without entries to all the peers.
  222. func (r *raft) bcastHeartbeat() {
  223. for i := range r.prs {
  224. if i == r.id {
  225. continue
  226. }
  227. r.sendHeartbeat(i)
  228. }
  229. }
  230. func (r *raft) maybeCommit() bool {
  231. // TODO(bmizerany): optimize.. Currently naive
  232. mis := make(uint64Slice, 0, len(r.prs))
  233. for i := range r.prs {
  234. mis = append(mis, r.prs[i].match)
  235. }
  236. sort.Sort(sort.Reverse(mis))
  237. mci := mis[r.q()-1]
  238. return r.raftLog.maybeCommit(mci, r.Term)
  239. }
  240. func (r *raft) reset(term uint64) {
  241. r.Term = term
  242. r.lead = None
  243. r.Vote = None
  244. r.elapsed = 0
  245. r.votes = make(map[uint64]bool)
  246. for i := range r.prs {
  247. r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
  248. if i == r.id {
  249. r.prs[i].match = r.raftLog.lastIndex()
  250. }
  251. }
  252. r.pendingConf = false
  253. }
  254. func (r *raft) appendEntry(es ...pb.Entry) {
  255. li := r.raftLog.lastIndex()
  256. for i := range es {
  257. es[i].Term = r.Term
  258. es[i].Index = li + 1 + uint64(i)
  259. }
  260. r.raftLog.append(es...)
  261. r.prs[r.id].update(r.raftLog.lastIndex())
  262. r.maybeCommit()
  263. }
  264. // tickElection is run by followers and candidates after r.electionTimeout.
  265. func (r *raft) tickElection() {
  266. if !r.promotable() {
  267. r.elapsed = 0
  268. return
  269. }
  270. r.elapsed++
  271. if r.isElectionTimeout() {
  272. r.elapsed = 0
  273. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  274. }
  275. }
  276. // tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
  277. func (r *raft) tickHeartbeat() {
  278. r.elapsed++
  279. if r.elapsed > r.heartbeatTimeout {
  280. r.elapsed = 0
  281. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  282. }
  283. }
  284. func (r *raft) becomeFollower(term uint64, lead uint64) {
  285. r.step = stepFollower
  286. r.reset(term)
  287. r.tick = r.tickElection
  288. r.lead = lead
  289. r.state = StateFollower
  290. log.Printf("raft: %x became follower at term %d", r.id, r.Term)
  291. }
  292. func (r *raft) becomeCandidate() {
  293. // TODO(xiangli) remove the panic when the raft implementation is stable
  294. if r.state == StateLeader {
  295. panic("invalid transition [leader -> candidate]")
  296. }
  297. r.step = stepCandidate
  298. r.reset(r.Term + 1)
  299. r.tick = r.tickElection
  300. r.Vote = r.id
  301. r.state = StateCandidate
  302. log.Printf("raft: %x became candidate at term %d", r.id, r.Term)
  303. }
  304. func (r *raft) becomeLeader() {
  305. // TODO(xiangli) remove the panic when the raft implementation is stable
  306. if r.state == StateFollower {
  307. panic("invalid transition [follower -> leader]")
  308. }
  309. r.step = stepLeader
  310. r.reset(r.Term)
  311. r.tick = r.tickHeartbeat
  312. r.lead = r.id
  313. r.state = StateLeader
  314. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  315. if e.Type != pb.EntryConfChange {
  316. continue
  317. }
  318. if r.pendingConf {
  319. panic("unexpected double uncommitted config entry")
  320. }
  321. r.pendingConf = true
  322. }
  323. r.appendEntry(pb.Entry{Data: nil})
  324. log.Printf("raft: %x became leader at term %d", r.id, r.Term)
  325. }
  326. func (r *raft) campaign() {
  327. r.becomeCandidate()
  328. if r.q() == r.poll(r.id, true) {
  329. r.becomeLeader()
  330. return
  331. }
  332. for i := range r.prs {
  333. if i == r.id {
  334. continue
  335. }
  336. log.Printf("raft: %x [logterm: %d, index: %d] sent vote request to %x at term %d",
  337. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
  338. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
  339. }
  340. }
  341. func (r *raft) poll(id uint64, v bool) (granted int) {
  342. if v {
  343. log.Printf("raft: %x received vote from %x at term %d", r.id, id, r.Term)
  344. } else {
  345. log.Printf("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term)
  346. }
  347. if _, ok := r.votes[id]; !ok {
  348. r.votes[id] = v
  349. }
  350. for _, vv := range r.votes {
  351. if vv {
  352. granted++
  353. }
  354. }
  355. return granted
  356. }
  357. func (r *raft) Step(m pb.Message) error {
  358. if m.Type == pb.MsgHup {
  359. log.Printf("raft: %x is starting a new election at term %d", r.id, r.Term)
  360. r.campaign()
  361. r.Commit = r.raftLog.committed
  362. return nil
  363. }
  364. switch {
  365. case m.Term == 0:
  366. // local message
  367. case m.Term > r.Term:
  368. lead := m.From
  369. if m.Type == pb.MsgVote {
  370. lead = None
  371. }
  372. log.Printf("raft: %x [term: %d] received a %s message with higher term from %x [term: %d]",
  373. r.id, r.Term, m.Type, m.From, m.Term)
  374. r.becomeFollower(m.Term, lead)
  375. case m.Term < r.Term:
  376. // ignore
  377. log.Printf("raft: %x [term: %d] ignored a %s message with lower term from %x [term: %d]",
  378. r.id, r.Term, m.Type, m.From, m.Term)
  379. return nil
  380. }
  381. r.step(r, m)
  382. r.Commit = r.raftLog.committed
  383. return nil
  384. }
  385. type stepFunc func(r *raft, m pb.Message)
  386. func stepLeader(r *raft, m pb.Message) {
  387. switch m.Type {
  388. case pb.MsgBeat:
  389. r.bcastHeartbeat()
  390. case pb.MsgProp:
  391. if len(m.Entries) == 0 {
  392. log.Panicf("raft: %x stepped empty MsgProp", r.id)
  393. }
  394. for i, e := range m.Entries {
  395. if e.Type == pb.EntryConfChange {
  396. if r.pendingConf {
  397. m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
  398. }
  399. r.pendingConf = true
  400. }
  401. }
  402. r.appendEntry(m.Entries...)
  403. r.bcastAppend()
  404. case pb.MsgAppResp:
  405. if m.Reject {
  406. log.Printf("raft: %x received msgApp rejection from %x for index %d",
  407. r.id, m.From, m.Index)
  408. if r.prs[m.From].maybeDecrTo(m.Index) {
  409. r.sendAppend(m.From)
  410. }
  411. } else {
  412. r.prs[m.From].update(m.Index)
  413. if r.maybeCommit() {
  414. r.bcastAppend()
  415. }
  416. }
  417. case pb.MsgVote:
  418. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
  419. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  420. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  421. }
  422. }
  423. func stepCandidate(r *raft, m pb.Message) {
  424. switch m.Type {
  425. case pb.MsgProp:
  426. panic("no leader")
  427. case pb.MsgApp:
  428. r.becomeFollower(r.Term, m.From)
  429. r.handleAppendEntries(m)
  430. case pb.MsgHeartbeat:
  431. r.becomeFollower(r.Term, m.From)
  432. r.handleHeartbeat(m)
  433. case pb.MsgSnap:
  434. r.becomeFollower(m.Term, m.From)
  435. r.handleSnapshot(m)
  436. case pb.MsgVote:
  437. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
  438. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  439. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  440. case pb.MsgVoteResp:
  441. gr := r.poll(m.From, !m.Reject)
  442. log.Printf("raft: %x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
  443. switch r.q() {
  444. case gr:
  445. r.becomeLeader()
  446. r.bcastAppend()
  447. case len(r.votes) - gr:
  448. r.becomeFollower(r.Term, None)
  449. }
  450. }
  451. }
  452. func stepFollower(r *raft, m pb.Message) {
  453. switch m.Type {
  454. case pb.MsgProp:
  455. if r.lead == None {
  456. panic("no leader")
  457. }
  458. m.To = r.lead
  459. r.send(m)
  460. case pb.MsgApp:
  461. r.elapsed = 0
  462. r.lead = m.From
  463. r.handleAppendEntries(m)
  464. case pb.MsgHeartbeat:
  465. r.elapsed = 0
  466. r.lead = m.From
  467. r.handleHeartbeat(m)
  468. case pb.MsgSnap:
  469. r.elapsed = 0
  470. r.handleSnapshot(m)
  471. case pb.MsgVote:
  472. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  473. r.elapsed = 0
  474. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %x",
  475. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  476. r.Vote = m.From
  477. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  478. } else {
  479. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
  480. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  481. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  482. }
  483. }
  484. }
  485. func (r *raft) handleAppendEntries(m pb.Message) {
  486. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  487. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  488. } else {
  489. log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
  490. r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
  491. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
  492. }
  493. }
  494. func (r *raft) handleHeartbeat(m pb.Message) {
  495. r.raftLog.commitTo(m.Commit)
  496. }
  497. func (r *raft) handleSnapshot(m pb.Message) {
  498. sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
  499. if r.restore(m.Snapshot) {
  500. log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
  501. r.id, r.Commit, sindex, sterm)
  502. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  503. } else {
  504. log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
  505. r.id, r.Commit, sindex, sterm)
  506. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  507. }
  508. }
  509. // restore recovers the statemachine from a snapshot. It restores the log and the
  510. // configuration of statemachine.
  511. func (r *raft) restore(s pb.Snapshot) bool {
  512. if s.Metadata.Index <= r.raftLog.committed {
  513. return false
  514. }
  515. if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
  516. log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
  517. r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  518. r.raftLog.commitTo(s.Metadata.Index)
  519. return false
  520. }
  521. log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
  522. r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  523. r.raftLog.restore(s)
  524. r.prs = make(map[uint64]*progress)
  525. for _, n := range s.Metadata.ConfState.Nodes {
  526. match, next := uint64(0), uint64(r.raftLog.lastIndex())+1
  527. if n == r.id {
  528. match = next - 1
  529. } else {
  530. match = 0
  531. }
  532. r.setProgress(n, match, next)
  533. log.Printf("raft: %x restored progress of %x [%s]", r.id, n, r.prs[n])
  534. }
  535. return true
  536. }
  537. func (r *raft) needSnapshot(i uint64) bool {
  538. return i < r.raftLog.firstIndex()
  539. }
  540. // promotable indicates whether state machine can be promoted to leader,
  541. // which is true when its own id is in progress list.
  542. func (r *raft) promotable() bool {
  543. _, ok := r.prs[r.id]
  544. return ok
  545. }
  546. func (r *raft) addNode(id uint64) {
  547. if _, ok := r.prs[id]; ok {
  548. // Ignore any redundant addNode calls (which can happen because the
  549. // initial bootstrapping entries are applied twice).
  550. return
  551. }
  552. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  553. r.pendingConf = false
  554. }
  555. func (r *raft) removeNode(id uint64) {
  556. r.delProgress(id)
  557. r.pendingConf = false
  558. }
  559. func (r *raft) resetPendingConf() { r.pendingConf = false }
  560. func (r *raft) setProgress(id, match, next uint64) {
  561. r.prs[id] = &progress{next: next, match: match}
  562. }
  563. func (r *raft) delProgress(id uint64) {
  564. delete(r.prs, id)
  565. }
  566. func (r *raft) loadState(state pb.HardState) {
  567. if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
  568. log.Panicf("raft: %x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
  569. }
  570. r.raftLog.committed = state.Commit
  571. r.Term = state.Term
  572. r.Vote = state.Vote
  573. r.Commit = state.Commit
  574. }
  575. // isElectionTimeout returns true if r.elapsed is greater than the
  576. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  577. // Otherwise, it returns false.
  578. func (r *raft) isElectionTimeout() bool {
  579. d := r.elapsed - r.electionTimeout
  580. if d < 0 {
  581. return false
  582. }
  583. return d > r.rand.Int()%r.electionTimeout
  584. }