raft.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "log"
  18. "math/rand"
  19. "sort"
  20. "strings"
  21. pb "github.com/coreos/etcd/raft/raftpb"
  22. )
  // None is the placeholder node ID used when there is no leader.
  const None = uint64(0)

  // errNoLeader is a package-level error value whose message is "no leader".
  var errNoLeader = errors.New("no leader")
  // StateType represents the role of a node in a cluster.
  type StateType uint64

  // Possible values for StateType.
  const (
  	StateFollower StateType = iota
  	StateCandidate
  	StateLeader
  )

  // stmap holds the display name of each declared StateType, indexed by the
  // numeric value of the constant.
  var stmap = [...]string{
  	"StateFollower",
  	"StateCandidate",
  	"StateLeader",
  }

  // String returns the name of the state. A value outside the declared
  // constants is rendered as "StateType(<n>)" instead of indexing past the end
  // of stmap and panicking.
  func (st StateType) String() string {
  	if i := uint64(st); i < uint64(len(stmap)) {
  		return stmap[i]
  	}
  	return fmt.Sprintf("StateType(%d)", uint64(st))
  }

  // MarshalJSON returns the state's name formatted with %q, i.e. wrapped in
  // double quotes. The returned error is always nil.
  func (st StateType) MarshalJSON() ([]byte, error) {
  	return []byte(fmt.Sprintf("%q", st.String())), nil
  }
  // progress holds two log indexes tracked per peer: match and next. In this
  // file the leader sends entries starting at next and uses match when
  // computing the commit index.
  type progress struct {
  	match uint64
  	next  uint64
  }

  // update raises match to n and next to n+1. A field that is already at or
  // beyond the new value is left unchanged.
  func (pr *progress) update(n uint64) {
  	if n > pr.match {
  		pr.match = n
  	}
  	if following := n + 1; following > pr.next {
  		pr.next = following
  	}
  }

  // optimisticUpdate sets next to n+1 unconditionally and leaves match alone.
  func (pr *progress) optimisticUpdate(n uint64) {
  	pr.next = 1 + n
  }

  // maybeDecrTo handles a rejection for index "to". It returns false when the
  // rejection is judged stale and leaves the progress untouched; otherwise it
  // lowers next and returns true.
  func (pr *progress) maybeDecrTo(to uint64) bool {
  	switch {
  	case pr.match != 0 && to <= pr.match:
  		// Already matched at or beyond "to": the rejection is stale.
  		return false
  	case pr.match != 0:
  		// Fall straight back to the entry after the last known match.
  		pr.next = pr.match + 1
  		return true
  	case to != pr.next-1:
  		// The rejection does not refer to the entry just before next: stale.
  		return false
  	}
  	// Step back by one entry, but never below index 1.
  	pr.next--
  	if pr.next < 1 {
  		pr.next = 1
  	}
  	return true
  }

  // String formats the progress as "n=<next> m=<match>".
  func (pr *progress) String() string {
  	n, m := pr.next, pr.match
  	return fmt.Sprintf("n=%d m=%d", n, m)
  }
  84. type raft struct {
  85. pb.HardState
  86. id uint64
  87. // the log
  88. raftLog *raftLog
  89. prs map[uint64]*progress
  90. state StateType
  91. votes map[uint64]bool
  92. msgs []pb.Message
  93. // the leader id
  94. lead uint64
  95. // New configuration is ignored if there exists unapplied configuration.
  96. pendingConf bool
  97. elapsed int // number of ticks since the last msg
  98. heartbeatTimeout int
  99. electionTimeout int
  100. rand *rand.Rand
  101. tick func()
  102. step stepFunc
  103. }
  104. func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
  105. if id == None {
  106. panic("cannot use none id")
  107. }
  108. log := newLog(storage)
  109. hs, cs, err := storage.InitialState()
  110. if err != nil {
  111. panic(err) // TODO(bdarnell)
  112. }
  113. if len(cs.Nodes) > 0 {
  114. if len(peers) > 0 {
  115. // TODO(bdarnell): the peers argument is always nil except in
  116. // tests; the argument should be removed and these tests should be
  117. // updated to specify their nodes through a snapshot.
  118. panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
  119. }
  120. peers = cs.Nodes
  121. }
  122. r := &raft{
  123. id: id,
  124. lead: None,
  125. raftLog: log,
  126. prs: make(map[uint64]*progress),
  127. electionTimeout: election,
  128. heartbeatTimeout: heartbeat,
  129. }
  130. r.rand = rand.New(rand.NewSource(int64(id)))
  131. for _, p := range peers {
  132. r.prs[p] = &progress{next: 1}
  133. }
  134. if !isHardStateEqual(hs, emptyState) {
  135. r.loadState(hs)
  136. }
  137. r.becomeFollower(r.Term, None)
  138. nodesStrs := make([]string, 0)
  139. for _, n := range r.nodes() {
  140. nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
  141. }
  142. fmt.Printf("raft: newRaft %x [peers: [%s], term: %d, commit: %d, lastindex: %d, lastterm: %d]",
  143. r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm())
  144. return r
  145. }
  146. func (r *raft) hasLeader() bool { return r.lead != None }
  147. func (r *raft) leader() uint64 { return r.lead }
  148. func (r *raft) softState() *SoftState {
  149. return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
  150. }
  151. func (r *raft) String() string {
  152. s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)
  153. switch r.state {
  154. case StateFollower:
  155. s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
  156. case StateCandidate:
  157. s += fmt.Sprintf(` votes="%v"`, r.votes)
  158. case StateLeader:
  159. s += fmt.Sprintf(` prs="%v"`, r.prs)
  160. }
  161. return s
  162. }
  163. func (r *raft) poll(id uint64, v bool) (granted int) {
  164. if v {
  165. log.Printf("raft: %x received vote from %x at term %d", r.id, id, r.Term)
  166. } else {
  167. log.Printf("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term)
  168. }
  169. if _, ok := r.votes[id]; !ok {
  170. r.votes[id] = v
  171. }
  172. for _, vv := range r.votes {
  173. if vv {
  174. granted++
  175. }
  176. }
  177. return granted
  178. }
  179. // send persists state to stable storage and then sends to its mailbox.
  180. func (r *raft) send(m pb.Message) {
  181. m.From = r.id
  182. // do not attach term to MsgProp
  183. // proposals are a way to forward to the leader and
  184. // should be treated as local message.
  185. if m.Type != pb.MsgProp {
  186. m.Term = r.Term
  187. }
  188. r.msgs = append(r.msgs, m)
  189. }
  190. // sendAppend sends RRPC, with entries to the given peer.
  191. func (r *raft) sendAppend(to uint64) {
  192. pr := r.prs[to]
  193. m := pb.Message{}
  194. m.To = to
  195. if r.needSnapshot(pr.next) {
  196. m.Type = pb.MsgSnap
  197. snapshot, err := r.raftLog.snapshot()
  198. if err != nil {
  199. panic(err) // TODO(bdarnell)
  200. }
  201. if IsEmptySnap(snapshot) {
  202. panic("need non-empty snapshot")
  203. }
  204. m.Snapshot = snapshot
  205. sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
  206. log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [match: %d, next: %d]",
  207. r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr.match, pr.next)
  208. } else {
  209. m.Type = pb.MsgApp
  210. m.Index = pr.next - 1
  211. m.LogTerm = r.raftLog.term(pr.next - 1)
  212. m.Entries = r.raftLog.entries(pr.next)
  213. m.Commit = r.raftLog.committed
  214. // optimistically increase the next if the follower
  215. // has been matched.
  216. if n := len(m.Entries); pr.match != 0 && n != 0 {
  217. pr.optimisticUpdate(m.Entries[n-1].Index)
  218. }
  219. }
  220. r.send(m)
  221. }
  222. // sendHeartbeat sends an empty MsgApp
  223. func (r *raft) sendHeartbeat(to uint64) {
  224. // Attach the commit as min(to.matched, r.committed).
  225. // When the leader sends out heartbeat message,
  226. // the receiver(follower) might not be matched with the leader
  227. // or it might not have all the committed entries.
  228. // The leader MUST NOT forward the follower's commit to
  229. // an unmatched index.
  230. commit := min(r.prs[to].match, r.raftLog.committed)
  231. m := pb.Message{
  232. To: to,
  233. Type: pb.MsgHeartbeat,
  234. Commit: commit,
  235. }
  236. r.send(m)
  237. }
  238. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date
  239. // according to the progress recorded in r.prs.
  240. func (r *raft) bcastAppend() {
  241. for i := range r.prs {
  242. if i == r.id {
  243. continue
  244. }
  245. r.sendAppend(i)
  246. }
  247. }
  248. // bcastHeartbeat sends RRPC, without entries to all the peers.
  249. func (r *raft) bcastHeartbeat() {
  250. for i := range r.prs {
  251. if i == r.id {
  252. continue
  253. }
  254. r.sendHeartbeat(i)
  255. }
  256. }
  257. func (r *raft) maybeCommit() bool {
  258. // TODO(bmizerany): optimize.. Currently naive
  259. mis := make(uint64Slice, 0, len(r.prs))
  260. for i := range r.prs {
  261. mis = append(mis, r.prs[i].match)
  262. }
  263. sort.Sort(sort.Reverse(mis))
  264. mci := mis[r.q()-1]
  265. return r.raftLog.maybeCommit(mci, r.Term)
  266. }
  267. func (r *raft) reset(term uint64) {
  268. r.Term = term
  269. r.lead = None
  270. r.Vote = None
  271. r.elapsed = 0
  272. r.votes = make(map[uint64]bool)
  273. for i := range r.prs {
  274. r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
  275. if i == r.id {
  276. r.prs[i].match = r.raftLog.lastIndex()
  277. }
  278. }
  279. r.pendingConf = false
  280. }
  281. func (r *raft) q() int {
  282. return len(r.prs)/2 + 1
  283. }
  284. func (r *raft) appendEntry(e pb.Entry) {
  285. e.Term = r.Term
  286. e.Index = r.raftLog.lastIndex() + 1
  287. r.raftLog.append(e)
  288. r.prs[r.id].update(r.raftLog.lastIndex())
  289. r.maybeCommit()
  290. }
  291. // tickElection is run by followers and candidates after r.electionTimeout.
  292. func (r *raft) tickElection() {
  293. if !r.promotable() {
  294. r.elapsed = 0
  295. return
  296. }
  297. r.elapsed++
  298. if r.isElectionTimeout() {
  299. r.elapsed = 0
  300. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  301. }
  302. }
  303. // tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
  304. func (r *raft) tickHeartbeat() {
  305. r.elapsed++
  306. if r.elapsed > r.heartbeatTimeout {
  307. r.elapsed = 0
  308. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  309. }
  310. }
  311. func (r *raft) becomeFollower(term uint64, lead uint64) {
  312. r.step = stepFollower
  313. r.reset(term)
  314. r.tick = r.tickElection
  315. r.lead = lead
  316. r.state = StateFollower
  317. log.Printf("raft: %x became follower at term %d", r.id, r.Term)
  318. }
  319. func (r *raft) becomeCandidate() {
  320. // TODO(xiangli) remove the panic when the raft implementation is stable
  321. if r.state == StateLeader {
  322. panic("invalid transition [leader -> candidate]")
  323. }
  324. r.step = stepCandidate
  325. r.reset(r.Term + 1)
  326. r.tick = r.tickElection
  327. r.Vote = r.id
  328. r.state = StateCandidate
  329. log.Printf("raft: %x became candidate at term %d", r.id, r.Term)
  330. }
  331. func (r *raft) becomeLeader() {
  332. // TODO(xiangli) remove the panic when the raft implementation is stable
  333. if r.state == StateFollower {
  334. panic("invalid transition [follower -> leader]")
  335. }
  336. r.step = stepLeader
  337. r.reset(r.Term)
  338. r.tick = r.tickHeartbeat
  339. r.lead = r.id
  340. r.state = StateLeader
  341. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  342. if e.Type != pb.EntryConfChange {
  343. continue
  344. }
  345. if r.pendingConf {
  346. panic("unexpected double uncommitted config entry")
  347. }
  348. r.pendingConf = true
  349. }
  350. r.appendEntry(pb.Entry{Data: nil})
  351. log.Printf("raft: %x became leader at term %d", r.id, r.Term)
  352. }
  353. func (r *raft) campaign() {
  354. r.becomeCandidate()
  355. if r.q() == r.poll(r.id, true) {
  356. r.becomeLeader()
  357. return
  358. }
  359. for i := range r.prs {
  360. if i == r.id {
  361. continue
  362. }
  363. log.Printf("raft: %x [logterm: %d, index: %d] sent vote request to %x at term %d",
  364. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
  365. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
  366. }
  367. }
  368. func (r *raft) Step(m pb.Message) error {
  369. // TODO(bmizerany): this likely allocs - prevent that.
  370. defer func() { r.Commit = r.raftLog.committed }()
  371. if m.Type == pb.MsgHup {
  372. log.Printf("raft: %x is starting a new election at term %d", r.id, r.Term)
  373. r.campaign()
  374. return nil
  375. }
  376. switch {
  377. case m.Term == 0:
  378. // local message
  379. case m.Term > r.Term:
  380. lead := m.From
  381. if m.Type == pb.MsgVote {
  382. lead = None
  383. }
  384. log.Printf("raft: %x [term: %d] received a %s message with higher term from %x [term: %d]",
  385. r.id, r.Term, m.Type, m.From, m.Term)
  386. r.becomeFollower(m.Term, lead)
  387. case m.Term < r.Term:
  388. // ignore
  389. log.Printf("raft: %x [term: %d] ignored a %s message with lower term from %x [term: %d]",
  390. r.id, r.Term, m.Type, m.From, m.Term)
  391. return nil
  392. }
  393. r.step(r, m)
  394. return nil
  395. }
  396. func (r *raft) handleAppendEntries(m pb.Message) {
  397. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  398. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  399. } else {
  400. log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
  401. r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
  402. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
  403. }
  404. }
  405. func (r *raft) handleHeartbeat(m pb.Message) {
  406. r.raftLog.commitTo(m.Commit)
  407. }
  408. func (r *raft) handleSnapshot(m pb.Message) {
  409. sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
  410. if r.restore(m.Snapshot) {
  411. log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]",
  412. r.id, r.Commit, sindex, sterm)
  413. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  414. } else {
  415. log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]",
  416. r.id, r.Commit, sindex, sterm)
  417. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  418. }
  419. }
  420. func (r *raft) resetPendingConf() { r.pendingConf = false }
  421. func (r *raft) addNode(id uint64) {
  422. if _, ok := r.prs[id]; ok {
  423. // Ignore any redundant addNode calls (which can happen because the
  424. // initial bootstrapping entries are applied twice).
  425. return
  426. }
  427. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  428. r.pendingConf = false
  429. }
  430. func (r *raft) removeNode(id uint64) {
  431. r.delProgress(id)
  432. r.pendingConf = false
  433. }
  434. type stepFunc func(r *raft, m pb.Message)
  435. func stepLeader(r *raft, m pb.Message) {
  436. switch m.Type {
  437. case pb.MsgBeat:
  438. r.bcastHeartbeat()
  439. case pb.MsgProp:
  440. if len(m.Entries) != 1 {
  441. panic("unexpected length(entries) of a MsgProp")
  442. }
  443. e := m.Entries[0]
  444. if e.Type == pb.EntryConfChange {
  445. if r.pendingConf {
  446. return
  447. }
  448. r.pendingConf = true
  449. }
  450. r.appendEntry(e)
  451. r.bcastAppend()
  452. case pb.MsgAppResp:
  453. if m.Reject {
  454. log.Printf("raft: %x received msgApp rejection from %x for index %d",
  455. r.id, m.From, m.Index)
  456. if r.prs[m.From].maybeDecrTo(m.Index) {
  457. r.sendAppend(m.From)
  458. }
  459. } else {
  460. r.prs[m.From].update(m.Index)
  461. if r.maybeCommit() {
  462. r.bcastAppend()
  463. }
  464. }
  465. case pb.MsgVote:
  466. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
  467. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  468. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  469. }
  470. }
  471. func stepCandidate(r *raft, m pb.Message) {
  472. switch m.Type {
  473. case pb.MsgProp:
  474. panic("no leader")
  475. case pb.MsgApp:
  476. r.becomeFollower(r.Term, m.From)
  477. r.handleAppendEntries(m)
  478. case pb.MsgHeartbeat:
  479. r.becomeFollower(r.Term, m.From)
  480. r.handleHeartbeat(m)
  481. case pb.MsgSnap:
  482. r.becomeFollower(m.Term, m.From)
  483. r.handleSnapshot(m)
  484. case pb.MsgVote:
  485. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
  486. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  487. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  488. case pb.MsgVoteResp:
  489. gr := r.poll(m.From, !m.Reject)
  490. log.Printf("raft: %x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
  491. switch r.q() {
  492. case gr:
  493. r.becomeLeader()
  494. r.bcastAppend()
  495. case len(r.votes) - gr:
  496. r.becomeFollower(r.Term, None)
  497. }
  498. }
  499. }
  500. func stepFollower(r *raft, m pb.Message) {
  501. switch m.Type {
  502. case pb.MsgProp:
  503. if r.lead == None {
  504. panic("no leader")
  505. }
  506. m.To = r.lead
  507. r.send(m)
  508. case pb.MsgApp:
  509. r.elapsed = 0
  510. r.lead = m.From
  511. r.handleAppendEntries(m)
  512. case pb.MsgHeartbeat:
  513. r.elapsed = 0
  514. r.lead = m.From
  515. r.handleHeartbeat(m)
  516. case pb.MsgSnap:
  517. r.elapsed = 0
  518. r.handleSnapshot(m)
  519. case pb.MsgVote:
  520. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  521. r.elapsed = 0
  522. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %x",
  523. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  524. r.Vote = m.From
  525. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  526. } else {
  527. log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
  528. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
  529. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  530. }
  531. }
  532. }
  533. // restore recovers the statemachine from a snapshot. It restores the log and the
  534. // configuration of statemachine.
  535. func (r *raft) restore(s pb.Snapshot) bool {
  536. if s.Metadata.Index <= r.raftLog.committed {
  537. return false
  538. }
  539. if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
  540. log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
  541. r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  542. r.raftLog.commitTo(s.Metadata.Index)
  543. return false
  544. }
  545. log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
  546. r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  547. r.raftLog.restore(s)
  548. r.prs = make(map[uint64]*progress)
  549. for _, n := range s.Metadata.ConfState.Nodes {
  550. match, next := uint64(0), uint64(r.raftLog.lastIndex())+1
  551. if n == r.id {
  552. match = next - 1
  553. } else {
  554. match = 0
  555. }
  556. log.Printf("raft: %x restored progress of %x [match: %d, next: %d]", r.id, n, match, next)
  557. r.setProgress(n, match, next)
  558. }
  559. return true
  560. }
  561. func (r *raft) needSnapshot(i uint64) bool {
  562. return i < r.raftLog.firstIndex()
  563. }
  564. func (r *raft) nodes() []uint64 {
  565. nodes := make([]uint64, 0, len(r.prs))
  566. for k := range r.prs {
  567. nodes = append(nodes, k)
  568. }
  569. sort.Sort(uint64Slice(nodes))
  570. return nodes
  571. }
  572. func (r *raft) setProgress(id, match, next uint64) {
  573. r.prs[id] = &progress{next: next, match: match}
  574. }
  575. func (r *raft) delProgress(id uint64) {
  576. delete(r.prs, id)
  577. }
  578. // promotable indicates whether state machine can be promoted to leader,
  579. // which is true when its own id is in progress list.
  580. func (r *raft) promotable() bool {
  581. _, ok := r.prs[r.id]
  582. return ok
  583. }
  584. func (r *raft) loadState(state pb.HardState) {
  585. if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
  586. log.Panicf("raft: %x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
  587. }
  588. r.raftLog.committed = state.Commit
  589. r.Term = state.Term
  590. r.Vote = state.Vote
  591. r.Commit = state.Commit
  592. }
  593. // isElectionTimeout returns true if r.elapsed is greater than the
  594. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  595. // Otherwise, it returns false.
  596. func (r *raft) isElectionTimeout() bool {
  597. d := r.elapsed - r.electionTimeout
  598. if d < 0 {
  599. return false
  600. }
  601. return d > r.rand.Int()%r.electionTimeout
  602. }