raft.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "log"
  18. "math/rand"
  19. "sort"
  20. "strings"
  21. pb "github.com/coreos/etcd/raft/raftpb"
  22. )
  // None is a placeholder node ID used when there is no leader.
  const None = uint64(0)

  // errNoLeader is the error value carrying the message "no leader".
  var errNoLeader = errors.New("no leader")
  // StateType represents the role of a node in a cluster.
  type StateType uint64

  // Possible values for StateType.
  const (
  	StateFollower StateType = iota
  	StateCandidate
  	StateLeader
  )

  // stmap holds the display name of each StateType, indexed by its numeric
  // value.
  var stmap = [...]string{
  	"StateFollower",
  	"StateCandidate",
  	"StateLeader",
  }

  // String returns the display name of st. A value that has no entry in stmap
  // is rendered as "StateType(N)" rather than panicking with an index out of
  // range, so an unexpected state can still be logged.
  func (st StateType) String() string {
  	if i := uint64(st); i < uint64(len(stmap)) {
  		return stmap[i]
  	}
  	return fmt.Sprintf("StateType(%d)", uint64(st))
  }
  // Progress holds the per-peer replication bookkeeping: the Match and Next
  // log indexes and a Wait counter that shouldWait consults.
  type Progress struct {
  	Match uint64
  	Next  uint64
  	Wait  int
  }

  // update clears the wait counter, then raises Match to n and Next to n+1
  // when they are currently lower. Neither field is decreased.
  func (pr *Progress) update(n uint64) {
  	pr.waitReset()
  	if n > pr.Match {
  		pr.Match = n
  	}
  	if next := n + 1; next > pr.Next {
  		pr.Next = next
  	}
  }

  // optimisticUpdate sets Next to n+1 unconditionally.
  func (pr *Progress) optimisticUpdate(n uint64) {
  	pr.Next = n + 1
  }

  // maybeDecrTo handles a rejection of index "rejected"; "last" is the hint
  // sent with the rejection. It always clears the wait counter. It returns
  // false, leaving Next untouched, when the rejection is stale; otherwise it
  // lowers Next and returns true.
  //
  // With a non-zero Match, a rejection at or below Match is stale and any
  // other rejection moves Next to Match+1. With a zero Match, the rejection
  // must be for Next-1, and Next becomes min(rejected, last+1), but never
  // less than 1.
  func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
  	pr.waitReset()
  	switch {
  	case pr.Match != 0 && rejected <= pr.Match:
  		return false
  	case pr.Match != 0:
  		pr.Next = pr.Match + 1
  		return true
  	case rejected != pr.Next-1:
  		return false
  	}
  	next := rejected
  	if last+1 < next {
  		next = last + 1
  	}
  	if next < 1 {
  		next = 1
  	}
  	pr.Next = next
  	return true
  }

  // waitDecr lowers the wait counter by i, clamping the result at zero.
  func (pr *Progress) waitDecr(i int) {
  	if pr.Wait -= i; pr.Wait < 0 {
  		pr.Wait = 0
  	}
  }

  // waitSet sets the wait counter to w.
  func (pr *Progress) waitSet(w int) {
  	pr.Wait = w
  }

  // waitReset sets the wait counter to zero.
  func (pr *Progress) waitReset() {
  	pr.Wait = 0
  }

  // shouldWait reports whether the wait counter is still positive while Match
  // is zero.
  func (pr *Progress) shouldWait() bool {
  	return pr.Wait > 0 && pr.Match == 0
  }

  // String formats the progress as "next = N, match = M, wait = W".
  func (pr *Progress) String() string {
  	return fmt.Sprintf("next = %d, match = %d, wait = %v", pr.Next, pr.Match, pr.Wait)
  }
  91. type raft struct {
  92. pb.HardState
  93. id uint64
  94. // the log
  95. raftLog *raftLog
  96. prs map[uint64]*Progress
  97. state StateType
  98. votes map[uint64]bool
  99. msgs []pb.Message
  100. // the leader id
  101. lead uint64
  102. // New configuration is ignored if there exists unapplied configuration.
  103. pendingConf bool
  104. elapsed int // number of ticks since the last msg
  105. heartbeatTimeout int
  106. electionTimeout int
  107. rand *rand.Rand
  108. tick func()
  109. step stepFunc
  110. }
  111. func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
  112. if id == None {
  113. panic("cannot use none id")
  114. }
  115. raftlog := newLog(storage)
  116. hs, cs, err := storage.InitialState()
  117. if err != nil {
  118. panic(err) // TODO(bdarnell)
  119. }
  120. if len(cs.Nodes) > 0 {
  121. if len(peers) > 0 {
  122. // TODO(bdarnell): the peers argument is always nil except in
  123. // tests; the argument should be removed and these tests should be
  124. // updated to specify their nodes through a snapshot.
  125. panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
  126. }
  127. peers = cs.Nodes
  128. }
  129. r := &raft{
  130. id: id,
  131. lead: None,
  132. raftLog: raftlog,
  133. prs: make(map[uint64]*Progress),
  134. electionTimeout: election,
  135. heartbeatTimeout: heartbeat,
  136. }
  137. r.rand = rand.New(rand.NewSource(int64(id)))
  138. for _, p := range peers {
  139. r.prs[p] = &Progress{Next: 1}
  140. }
  141. if !isHardStateEqual(hs, emptyState) {
  142. r.loadState(hs)
  143. }
  144. r.becomeFollower(r.Term, None)
  145. nodesStrs := make([]string, 0)
  146. for _, n := range r.nodes() {
  147. nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
  148. }
  149. log.Printf("raft: newRaft %x [peers: [%s], term: %d, commit: %d, lastindex: %d, lastterm: %d]",
  150. r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm())
  151. return r
  152. }
  153. func (r *raft) hasLeader() bool { return r.lead != None }
  154. func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
  155. func (r *raft) q() int { return len(r.prs)/2 + 1 }
  156. func (r *raft) nodes() []uint64 {
  157. nodes := make([]uint64, 0, len(r.prs))
  158. for k := range r.prs {
  159. nodes = append(nodes, k)
  160. }
  161. sort.Sort(uint64Slice(nodes))
  162. return nodes
  163. }
  164. // send persists state to stable storage and then sends to its mailbox.
  165. func (r *raft) send(m pb.Message) {
  166. m.From = r.id
  167. // do not attach term to MsgProp
  168. // proposals are a way to forward to the leader and
  169. // should be treated as local message.
  170. if m.Type != pb.MsgProp {
  171. m.Term = r.Term
  172. }
  173. r.msgs = append(r.msgs, m)
  174. }
  175. // sendAppend sends RRPC, with entries to the given peer.
  176. func (r *raft) sendAppend(to uint64) {
  177. pr := r.prs[to]
  178. if pr.shouldWait() {
  179. log.Printf("raft: %x ignored sending %s to %x [%s]", r.id, pb.MsgApp, to, pr)
  180. return
  181. }
  182. m := pb.Message{}
  183. m.To = to
  184. if r.needSnapshot(pr.Next) {
  185. m.Type = pb.MsgSnap
  186. snapshot, err := r.raftLog.snapshot()
  187. if err != nil {
  188. panic(err) // TODO(bdarnell)
  189. }
  190. if IsEmptySnap(snapshot) {
  191. panic("need non-empty snapshot")
  192. }
  193. m.Snapshot = snapshot
  194. sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
  195. log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
  196. r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
  197. pr.waitSet(r.electionTimeout)
  198. } else {
  199. m.Type = pb.MsgApp
  200. m.Index = pr.Next - 1
  201. m.LogTerm = r.raftLog.term(pr.Next - 1)
  202. m.Entries = r.raftLog.entries(pr.Next)
  203. m.Commit = r.raftLog.committed
  204. // optimistically increase the next if the follower
  205. // has been matched.
  206. if n := len(m.Entries); pr.Match != 0 && n != 0 {
  207. pr.optimisticUpdate(m.Entries[n-1].Index)
  208. } else if pr.Match == 0 {
  209. // TODO (xiangli): better way to find out if the follower is in good path or not
  210. // a follower might be in bad path even if match != 0, since we optimistically
  211. // increase the next.
  212. pr.waitSet(r.heartbeatTimeout)
  213. }
  214. }
  215. r.send(m)
  216. }
  217. // sendHeartbeat sends an empty MsgApp
  218. func (r *raft) sendHeartbeat(to uint64) {
  219. // Attach the commit as min(to.matched, r.committed).
  220. // When the leader sends out heartbeat message,
  221. // the receiver(follower) might not be matched with the leader
  222. // or it might not have all the committed entries.
  223. // The leader MUST NOT forward the follower's commit to
  224. // an unmatched index.
  225. commit := min(r.prs[to].Match, r.raftLog.committed)
  226. m := pb.Message{
  227. To: to,
  228. Type: pb.MsgHeartbeat,
  229. Commit: commit,
  230. }
  231. r.send(m)
  232. }
  233. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date
  234. // according to the progress recorded in r.prs.
  235. func (r *raft) bcastAppend() {
  236. for i := range r.prs {
  237. if i == r.id {
  238. continue
  239. }
  240. r.sendAppend(i)
  241. }
  242. }
  243. // bcastHeartbeat sends RRPC, without entries to all the peers.
  244. func (r *raft) bcastHeartbeat() {
  245. for i := range r.prs {
  246. if i == r.id {
  247. continue
  248. }
  249. r.sendHeartbeat(i)
  250. r.prs[i].waitDecr(r.heartbeatTimeout)
  251. }
  252. }
  253. func (r *raft) maybeCommit() bool {
  254. // TODO(bmizerany): optimize.. Currently naive
  255. mis := make(uint64Slice, 0, len(r.prs))
  256. for i := range r.prs {
  257. mis = append(mis, r.prs[i].Match)
  258. }
  259. sort.Sort(sort.Reverse(mis))
  260. mci := mis[r.q()-1]
  261. return r.raftLog.maybeCommit(mci, r.Term)
  262. }
  263. func (r *raft) reset(term uint64) {
  264. r.Term = term
  265. r.lead = None
  266. r.Vote = None
  267. r.elapsed = 0
  268. r.votes = make(map[uint64]bool)
  269. for i := range r.prs {
  270. r.prs[i] = &Progress{Next: r.raftLog.lastIndex() + 1}
  271. if i == r.id {
  272. r.prs[i].Match = r.raftLog.lastIndex()
  273. }
  274. }
  275. r.pendingConf = false
  276. }
  277. func (r *raft) appendEntry(es ...pb.Entry) {
  278. li := r.raftLog.lastIndex()
  279. for i := range es {
  280. es[i].Term = r.Term
  281. es[i].Index = li + 1 + uint64(i)
  282. }
  283. r.raftLog.append(es...)
  284. r.prs[r.id].update(r.raftLog.lastIndex())
  285. r.maybeCommit()
  286. }
  287. // tickElection is run by followers and candidates after r.electionTimeout.
  288. func (r *raft) tickElection() {
  289. if !r.promotable() {
  290. r.elapsed = 0
  291. return
  292. }
  293. r.elapsed++
  294. if r.isElectionTimeout() {
  295. r.elapsed = 0
  296. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  297. }
  298. }
  299. // tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
  300. func (r *raft) tickHeartbeat() {
  301. r.elapsed++
  302. if r.elapsed >= r.heartbeatTimeout {
  303. r.elapsed = 0
  304. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  305. }
  306. }
  307. func (r *raft) becomeFollower(term uint64, lead uint64) {
  308. r.step = stepFollower
  309. r.reset(term)
  310. r.tick = r.tickElection
  311. r.lead = lead
  312. r.state = StateFollower
  313. log.Printf("raft: %x became follower at term %d", r.id, r.Term)
  314. }
  315. func (r *raft) becomeCandidate() {
  316. // TODO(xiangli) remove the panic when the raft implementation is stable
  317. if r.state == StateLeader {
  318. panic("invalid transition [leader -> candidate]")
  319. }
  320. r.step = stepCandidate
  321. r.reset(r.Term + 1)
  322. r.tick = r.tickElection
  323. r.Vote = r.id
  324. r.state = StateCandidate
  325. log.Printf("raft: %x became candidate at term %d", r.id, r.Term)
  326. }
  327. func (r *raft) becomeLeader() {
  328. // TODO(xiangli) remove the panic when the raft implementation is stable
  329. if r.state == StateFollower {
  330. panic("invalid transition [follower -> leader]")
  331. }
  332. r.step = stepLeader
  333. r.reset(r.Term)
  334. r.tick = r.tickHeartbeat
  335. r.lead = r.id
  336. r.state = StateLeader
  337. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  338. if e.Type != pb.EntryConfChange {
  339. continue
  340. }
  341. if r.pendingConf {
  342. panic("unexpected double uncommitted config entry")
  343. }
  344. r.pendingConf = true
  345. }
  346. r.appendEntry(pb.Entry{Data: nil})
  347. log.Printf("raft: %x became leader at term %d", r.id, r.Term)
  348. }
  349. func (r *raft) campaign() {
  350. r.becomeCandidate()
  351. if r.q() == r.poll(r.id, true) {
  352. r.becomeLeader()
  353. return
  354. }
  355. for i := range r.prs {
  356. if i == r.id {
  357. continue
  358. }
  359. log.Printf("raft: %x [logterm: %d, index: %d] sent vote request to %x at term %d",
  360. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
  361. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
  362. }
  363. }
  364. func (r *raft) poll(id uint64, v bool) (granted int) {
  365. if v {
  366. log.Printf("raft: %x received vote from %x at term %d", r.id, id, r.Term)
  367. } else {
  368. log.Printf("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term)
  369. }
  370. if _, ok := r.votes[id]; !ok {
  371. r.votes[id] = v
  372. }
  373. for _, vv := range r.votes {
  374. if vv {
  375. granted++
  376. }
  377. }
  378. return granted
  379. }
  380. func (r *raft) Step(m pb.Message) error {
  381. if m.Type == pb.MsgHup {
  382. log.Printf("raft: %x is starting a new election at term %d", r.id, r.Term)
  383. r.campaign()
  384. r.Commit = r.raftLog.committed
  385. return nil
  386. }
  387. switch {
  388. case m.Term == 0:
  389. // local message
  390. case m.Term > r.Term:
  391. lead := m.From
  392. if m.Type == pb.MsgVote {
  393. lead = None
  394. }
  395. log.Printf("raft: %x [term: %d] received a %s message with higher term from %x [term: %d]",
  396. r.id, r.Term, m.Type, m.From, m.Term)
  397. r.becomeFollower(m.Term, lead)
  398. case m.Term < r.Term:
  399. // ignore
  400. log.Printf("raft: %x [term: %d] ignored a %s message with lower term from %x [term: %d]",
  401. r.id, r.Term, m.Type, m.From, m.Term)
  402. return nil
  403. }
  404. r.step(r, m)
  405. r.Commit = r.raftLog.committed
  406. return nil
  407. }
  408. type stepFunc func(r *raft, m pb.Message)
  409. func stepLeader(r *raft, m pb.Message) {
  410. switch m.Type {
  411. case pb.MsgBeat:
  412. r.bcastHeartbeat()
  413. case pb.MsgProp:
  414. if len(m.Entries) == 0 {
  415. log.Panicf("raft: %x stepped empty MsgProp", r.id)
  416. }
  417. for i, e := range m.Entries {
  418. if e.Type == pb.EntryConfChange {
  419. if r.pendingConf {
  420. m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
  421. }
  422. r.pendingConf = true
  423. }
  424. }
  425. r.appendEntry(m.Entries...)
  426. r.bcastAppend()
  427. case pb.MsgAppResp:
  428. if m.Reject {
  429. log.Printf("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d",
  430. r.id, m.RejectHint, m.From, m.Index)
  431. if r.prs[m.From].maybeDecrTo(m.Index, m.RejectHint) {
  432. log.Printf("raft: %x decreased progress of %x to [%s]", r.id, m.From, r.prs[m.From])
  433. r.sendAppend(m.From)
  434. }
  435. } else {
  436. r.prs[m.From].update(m.Index)
  437. if r.maybeCommit() {
  438. r.bcastAppend()
  439. }
  440. }
  441. case pb.MsgHeartbeatResp:
  442. if r.prs[m.From].Match < r.raftLog.lastIndex() {
  443. r.sendAppend(m.From)
  444. }
  445. case pb.MsgVote:
  446. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
  447. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  448. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  449. }
  450. }
  451. func stepCandidate(r *raft, m pb.Message) {
  452. switch m.Type {
  453. case pb.MsgProp:
  454. log.Printf("raft: %x no leader at term %d; dropping proposal", r.id, r.Term)
  455. return
  456. case pb.MsgApp:
  457. r.becomeFollower(r.Term, m.From)
  458. r.handleAppendEntries(m)
  459. case pb.MsgHeartbeat:
  460. r.becomeFollower(r.Term, m.From)
  461. r.handleHeartbeat(m)
  462. case pb.MsgSnap:
  463. r.becomeFollower(m.Term, m.From)
  464. r.handleSnapshot(m)
  465. case pb.MsgVote:
  466. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
  467. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  468. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  469. case pb.MsgVoteResp:
  470. gr := r.poll(m.From, !m.Reject)
  471. log.Printf("raft: %x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
  472. switch r.q() {
  473. case gr:
  474. r.becomeLeader()
  475. r.bcastAppend()
  476. case len(r.votes) - gr:
  477. r.becomeFollower(r.Term, None)
  478. }
  479. }
  480. }
  481. func stepFollower(r *raft, m pb.Message) {
  482. switch m.Type {
  483. case pb.MsgProp:
  484. if r.lead == None {
  485. log.Printf("raft: %x no leader at term %d; dropping proposal", r.id, r.Term)
  486. return
  487. }
  488. m.To = r.lead
  489. r.send(m)
  490. case pb.MsgApp:
  491. r.elapsed = 0
  492. r.lead = m.From
  493. r.handleAppendEntries(m)
  494. case pb.MsgHeartbeat:
  495. r.elapsed = 0
  496. r.lead = m.From
  497. r.handleHeartbeat(m)
  498. case pb.MsgSnap:
  499. r.elapsed = 0
  500. r.handleSnapshot(m)
  501. case pb.MsgVote:
  502. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  503. r.elapsed = 0
  504. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
  505. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  506. r.Vote = m.From
  507. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  508. } else {
  509. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
  510. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  511. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  512. }
  513. }
  514. }
  515. func (r *raft) handleAppendEntries(m pb.Message) {
  516. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  517. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  518. } else {
  519. log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
  520. r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
  521. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
  522. }
  523. }
  524. func (r *raft) handleHeartbeat(m pb.Message) {
  525. r.raftLog.commitTo(m.Commit)
  526. r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp})
  527. }
  528. func (r *raft) handleSnapshot(m pb.Message) {
  529. sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
  530. if r.restore(m.Snapshot) {
  531. log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
  532. r.id, r.Commit, sindex, sterm)
  533. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  534. } else {
  535. log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
  536. r.id, r.Commit, sindex, sterm)
  537. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  538. }
  539. }
  540. // restore recovers the state machine from a snapshot. It restores the log and the
  541. // configuration of state machine.
  542. func (r *raft) restore(s pb.Snapshot) bool {
  543. if s.Metadata.Index <= r.raftLog.committed {
  544. return false
  545. }
  546. if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
  547. log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
  548. r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  549. r.raftLog.commitTo(s.Metadata.Index)
  550. return false
  551. }
  552. log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
  553. r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  554. r.raftLog.restore(s)
  555. r.prs = make(map[uint64]*Progress)
  556. for _, n := range s.Metadata.ConfState.Nodes {
  557. match, next := uint64(0), uint64(r.raftLog.lastIndex())+1
  558. if n == r.id {
  559. match = next - 1
  560. } else {
  561. match = 0
  562. }
  563. r.setProgress(n, match, next)
  564. log.Printf("raft: %x restored progress of %x [%s]", r.id, n, r.prs[n])
  565. }
  566. return true
  567. }
  568. func (r *raft) needSnapshot(i uint64) bool {
  569. return i < r.raftLog.firstIndex()
  570. }
  571. // promotable indicates whether state machine can be promoted to leader,
  572. // which is true when its own id is in progress list.
  573. func (r *raft) promotable() bool {
  574. _, ok := r.prs[r.id]
  575. return ok
  576. }
  577. func (r *raft) addNode(id uint64) {
  578. if _, ok := r.prs[id]; ok {
  579. // Ignore any redundant addNode calls (which can happen because the
  580. // initial bootstrapping entries are applied twice).
  581. return
  582. }
  583. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  584. r.pendingConf = false
  585. }
  586. func (r *raft) removeNode(id uint64) {
  587. r.delProgress(id)
  588. r.pendingConf = false
  589. }
  590. func (r *raft) resetPendingConf() { r.pendingConf = false }
  591. func (r *raft) setProgress(id, match, next uint64) {
  592. r.prs[id] = &Progress{Next: next, Match: match}
  593. }
  594. func (r *raft) delProgress(id uint64) {
  595. delete(r.prs, id)
  596. }
  597. func (r *raft) loadState(state pb.HardState) {
  598. if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
  599. log.Panicf("raft: %x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
  600. }
  601. r.raftLog.committed = state.Commit
  602. r.Term = state.Term
  603. r.Vote = state.Vote
  604. r.Commit = state.Commit
  605. }
  606. // isElectionTimeout returns true if r.elapsed is greater than the
  607. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  608. // Otherwise, it returns false.
  609. func (r *raft) isElectionTimeout() bool {
  610. d := r.elapsed - r.electionTimeout
  611. if d < 0 {
  612. return false
  613. }
  614. return d > r.rand.Int()%r.electionTimeout
  615. }