raft.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "sort"
  19. pb "github.com/coreos/etcd/raft/raftpb"
  20. )
  // None is the zero node ID. It stands in for "no node", for example as the
  // leader value while no leader is known.
  const None = uint64(0)

  // errNoLeader is the sentinel error carrying the message "no leader".
  var errNoLeader = errors.New("no leader")
  // Possible values for StateType.
  const (
  	StateFollower StateType = iota
  	StateCandidate
  	StateLeader
  )

  // StateType represents the role of a node in a cluster.
  type StateType uint64

  // stmap holds the display name of each StateType, indexed by its value.
  var stmap = [...]string{
  	"StateFollower",
  	"StateCandidate",
  	"StateLeader",
  }

  // String returns the name of the state, e.g. "StateLeader". Values outside
  // the known range are rendered as "StateType(N)" rather than panicking on
  // the table lookup.
  func (st StateType) String() string {
  	if uint64(st) < uint64(len(stmap)) {
  		return stmap[st]
  	}
  	return fmt.Sprintf("StateType(%d)", uint64(st))
  }

  // MarshalJSON encodes the state as its quoted name, e.g. "StateLeader".
  // It never returns an error.
  func (st StateType) MarshalJSON() ([]byte, error) {
  	return []byte(fmt.Sprintf("%q", st.String())), nil
  }
  // progress is the leader's bookkeeping for one node's log. match is the
  // highest index recorded through update. next is the index of the next
  // entry to send to that node.
  type progress struct {
  	match, next uint64
  }

  // update records an acknowledgement up to index n. It never moves the
  // indices backwards: match is raised to n and next to n+1 when they are
  // smaller.
  func (pr *progress) update(n uint64) {
  	if n > pr.match {
  		pr.match = n
  	}
  	if want := n + 1; want > pr.next {
  		pr.next = want
  	}
  }

  // optimisticUpdate sets next to n+1 without touching match, so more entries
  // can be sent before n is acknowledged.
  func (pr *progress) optimisticUpdate(n uint64) {
  	pr.next = n + 1
  }

  // maybeDecrTo reacts to a rejection at index `to`. It returns false, leaving
  // the progress untouched, when the rejection is judged to be out of order.
  // Otherwise it moves next backwards and returns true.
  func (pr *progress) maybeDecrTo(to uint64) bool {
  	if pr.match == 0 {
  		// Not matched yet: only a rejection of exactly next-1 is current.
  		if to != pr.next-1 {
  			return false
  		}
  		// Step back by one, clamping a result of zero up to 1.
  		pr.next--
  		if pr.next == 0 {
  			pr.next = 1
  		}
  		return true
  	}
  	// Already matched: a rejection at or below match must be stale.
  	if to <= pr.match {
  		return false
  	}
  	// Retry directly from just past the known match.
  	pr.next = pr.match + 1
  	return true
  }

  // String formats the progress as "n=<next> m=<match>".
  func (pr *progress) String() string {
  	return fmt.Sprintf("n=%d m=%d", pr.next, pr.match)
  }
  82. type raft struct {
  83. pb.HardState
  84. id uint64
  85. // the log
  86. raftLog *raftLog
  87. prs map[uint64]*progress
  88. state StateType
  89. votes map[uint64]bool
  90. msgs []pb.Message
  91. // the leader id
  92. lead uint64
  93. // New configuration is ignored if there exists unapplied configuration.
  94. pendingConf bool
  95. elapsed int // number of ticks since the last msg
  96. heartbeatTimeout int
  97. electionTimeout int
  98. rand *rand.Rand
  99. tick func()
  100. step stepFunc
  101. }
  102. func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage) *raft {
  103. if id == None {
  104. panic("cannot use none id")
  105. }
  106. log := newLog(storage)
  107. hs, cs, err := storage.InitialState()
  108. if err != nil {
  109. panic(err) // TODO(bdarnell)
  110. }
  111. if len(cs.Nodes) > 0 {
  112. if len(peers) > 0 {
  113. // TODO(bdarnell): the peers argument is always nil except in
  114. // tests; the argument should be removed and these tests should be
  115. // updated to specify their nodes through a snapshot.
  116. panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
  117. }
  118. peers = cs.Nodes
  119. }
  120. r := &raft{
  121. id: id,
  122. lead: None,
  123. raftLog: log,
  124. prs: make(map[uint64]*progress),
  125. electionTimeout: election,
  126. heartbeatTimeout: heartbeat,
  127. }
  128. r.rand = rand.New(rand.NewSource(int64(id)))
  129. for _, p := range peers {
  130. r.prs[p] = &progress{next: 1}
  131. }
  132. if !isHardStateEqual(hs, emptyState) {
  133. r.loadState(hs)
  134. }
  135. r.becomeFollower(r.Term, None)
  136. return r
  137. }
  138. func (r *raft) hasLeader() bool { return r.lead != None }
  139. func (r *raft) leader() uint64 { return r.lead }
  140. func (r *raft) softState() *SoftState {
  141. return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
  142. }
  143. func (r *raft) String() string {
  144. s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)
  145. switch r.state {
  146. case StateFollower:
  147. s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
  148. case StateCandidate:
  149. s += fmt.Sprintf(` votes="%v"`, r.votes)
  150. case StateLeader:
  151. s += fmt.Sprintf(` prs="%v"`, r.prs)
  152. }
  153. return s
  154. }
  155. func (r *raft) poll(id uint64, v bool) (granted int) {
  156. if _, ok := r.votes[id]; !ok {
  157. r.votes[id] = v
  158. }
  159. for _, vv := range r.votes {
  160. if vv {
  161. granted++
  162. }
  163. }
  164. return granted
  165. }
  166. // send persists state to stable storage and then sends to its mailbox.
  167. func (r *raft) send(m pb.Message) {
  168. m.From = r.id
  169. // do not attach term to MsgProp
  170. // proposals are a way to forward to the leader and
  171. // should be treated as local message.
  172. if m.Type != pb.MsgProp {
  173. m.Term = r.Term
  174. }
  175. r.msgs = append(r.msgs, m)
  176. }
  177. // sendAppend sends RRPC, with entries to the given peer.
  178. func (r *raft) sendAppend(to uint64) {
  179. pr := r.prs[to]
  180. m := pb.Message{}
  181. m.To = to
  182. if r.needSnapshot(pr.next) {
  183. m.Type = pb.MsgSnap
  184. snapshot, err := r.raftLog.snapshot()
  185. if err != nil {
  186. panic(err) // TODO(bdarnell)
  187. }
  188. if IsEmptySnap(snapshot) {
  189. panic("need non-empty snapshot")
  190. }
  191. m.Snapshot = snapshot
  192. } else {
  193. m.Type = pb.MsgApp
  194. m.Index = pr.next - 1
  195. m.LogTerm = r.raftLog.term(pr.next - 1)
  196. m.Entries = r.raftLog.entries(pr.next)
  197. m.Commit = r.raftLog.committed
  198. // optimistically increase the next if the follower
  199. // has been matched.
  200. if n := len(m.Entries); pr.match != 0 && n != 0 {
  201. pr.optimisticUpdate(m.Entries[n-1].Index)
  202. }
  203. }
  204. r.send(m)
  205. }
  206. // sendHeartbeat sends an empty MsgApp
  207. func (r *raft) sendHeartbeat(to uint64) {
  208. // Attach the commit as min(to.matched, r.committed).
  209. // When the leader sends out heartbeat message,
  210. // the receiver(follower) might not be matched with the leader
  211. // or it might not have all the committed entries.
  212. // The leader MUST NOT forward the follower's commit to
  213. // an unmatched index.
  214. commit := min(r.prs[to].match, r.raftLog.committed)
  215. m := pb.Message{
  216. To: to,
  217. Type: pb.MsgApp,
  218. Commit: commit,
  219. }
  220. r.send(m)
  221. }
  222. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date
  223. // according to the progress recorded in r.prs.
  224. func (r *raft) bcastAppend() {
  225. for i := range r.prs {
  226. if i == r.id {
  227. continue
  228. }
  229. r.sendAppend(i)
  230. }
  231. }
  232. // bcastHeartbeat sends RRPC, without entries to all the peers.
  233. func (r *raft) bcastHeartbeat() {
  234. for i := range r.prs {
  235. if i == r.id {
  236. continue
  237. }
  238. r.sendHeartbeat(i)
  239. }
  240. }
  241. func (r *raft) maybeCommit() bool {
  242. // TODO(bmizerany): optimize.. Currently naive
  243. mis := make(uint64Slice, 0, len(r.prs))
  244. for i := range r.prs {
  245. mis = append(mis, r.prs[i].match)
  246. }
  247. sort.Sort(sort.Reverse(mis))
  248. mci := mis[r.q()-1]
  249. return r.raftLog.maybeCommit(mci, r.Term)
  250. }
  251. func (r *raft) reset(term uint64) {
  252. r.Term = term
  253. r.lead = None
  254. r.Vote = None
  255. r.elapsed = 0
  256. r.votes = make(map[uint64]bool)
  257. for i := range r.prs {
  258. r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
  259. if i == r.id {
  260. r.prs[i].match = r.raftLog.lastIndex()
  261. }
  262. }
  263. r.pendingConf = false
  264. }
  265. func (r *raft) q() int {
  266. return len(r.prs)/2 + 1
  267. }
  268. func (r *raft) appendEntry(e pb.Entry) {
  269. e.Term = r.Term
  270. e.Index = r.raftLog.lastIndex() + 1
  271. r.raftLog.append(r.raftLog.lastIndex(), e)
  272. r.prs[r.id].update(r.raftLog.lastIndex())
  273. r.maybeCommit()
  274. }
  275. // tickElection is run by followers and candidates after r.electionTimeout.
  276. func (r *raft) tickElection() {
  277. if !r.promotable() {
  278. r.elapsed = 0
  279. return
  280. }
  281. r.elapsed++
  282. if r.isElectionTimeout() {
  283. r.elapsed = 0
  284. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  285. }
  286. }
  287. // tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
  288. func (r *raft) tickHeartbeat() {
  289. r.elapsed++
  290. if r.elapsed > r.heartbeatTimeout {
  291. r.elapsed = 0
  292. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  293. }
  294. }
  295. func (r *raft) becomeFollower(term uint64, lead uint64) {
  296. r.step = stepFollower
  297. r.reset(term)
  298. r.tick = r.tickElection
  299. r.lead = lead
  300. r.state = StateFollower
  301. }
  302. func (r *raft) becomeCandidate() {
  303. // TODO(xiangli) remove the panic when the raft implementation is stable
  304. if r.state == StateLeader {
  305. panic("invalid transition [leader -> candidate]")
  306. }
  307. r.step = stepCandidate
  308. r.reset(r.Term + 1)
  309. r.tick = r.tickElection
  310. r.Vote = r.id
  311. r.state = StateCandidate
  312. }
  313. func (r *raft) becomeLeader() {
  314. // TODO(xiangli) remove the panic when the raft implementation is stable
  315. if r.state == StateFollower {
  316. panic("invalid transition [follower -> leader]")
  317. }
  318. r.step = stepLeader
  319. r.reset(r.Term)
  320. r.tick = r.tickHeartbeat
  321. r.lead = r.id
  322. r.state = StateLeader
  323. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  324. if e.Type != pb.EntryConfChange {
  325. continue
  326. }
  327. if r.pendingConf {
  328. panic("unexpected double uncommitted config entry")
  329. }
  330. r.pendingConf = true
  331. }
  332. r.appendEntry(pb.Entry{Data: nil})
  333. }
  334. func (r *raft) campaign() {
  335. r.becomeCandidate()
  336. if r.q() == r.poll(r.id, true) {
  337. r.becomeLeader()
  338. }
  339. for i := range r.prs {
  340. if i == r.id {
  341. continue
  342. }
  343. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
  344. }
  345. }
  346. func (r *raft) Step(m pb.Message) error {
  347. // TODO(bmizerany): this likely allocs - prevent that.
  348. defer func() { r.Commit = r.raftLog.committed }()
  349. if m.Type == pb.MsgHup {
  350. r.campaign()
  351. }
  352. switch {
  353. case m.Term == 0:
  354. // local message
  355. case m.Term > r.Term:
  356. lead := m.From
  357. if m.Type == pb.MsgVote {
  358. lead = None
  359. }
  360. r.becomeFollower(m.Term, lead)
  361. case m.Term < r.Term:
  362. // ignore
  363. return nil
  364. }
  365. r.step(r, m)
  366. return nil
  367. }
  368. func (r *raft) handleAppendEntries(m pb.Message) {
  369. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  370. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  371. } else {
  372. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
  373. }
  374. }
  375. func (r *raft) handleHeartbeat(m pb.Message) {
  376. r.raftLog.commitTo(m.Commit)
  377. }
  378. func (r *raft) handleSnapshot(m pb.Message) {
  379. if r.restore(m.Snapshot) {
  380. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  381. } else {
  382. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  383. }
  384. }
  385. func (r *raft) resetPendingConf() { r.pendingConf = false }
  386. func (r *raft) addNode(id uint64) {
  387. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  388. r.pendingConf = false
  389. }
  390. func (r *raft) removeNode(id uint64) {
  391. r.delProgress(id)
  392. r.pendingConf = false
  393. }
  394. type stepFunc func(r *raft, m pb.Message)
  395. func stepLeader(r *raft, m pb.Message) {
  396. switch m.Type {
  397. case pb.MsgBeat:
  398. r.bcastHeartbeat()
  399. case pb.MsgProp:
  400. if len(m.Entries) != 1 {
  401. panic("unexpected length(entries) of a MsgProp")
  402. }
  403. e := m.Entries[0]
  404. if e.Type == pb.EntryConfChange {
  405. if r.pendingConf {
  406. return
  407. }
  408. r.pendingConf = true
  409. }
  410. r.appendEntry(e)
  411. r.bcastAppend()
  412. case pb.MsgAppResp:
  413. if m.Index == 0 {
  414. return
  415. }
  416. if m.Reject {
  417. if r.prs[m.From].maybeDecrTo(m.Index) {
  418. r.sendAppend(m.From)
  419. }
  420. } else {
  421. r.prs[m.From].update(m.Index)
  422. if r.maybeCommit() {
  423. r.bcastAppend()
  424. }
  425. }
  426. case pb.MsgVote:
  427. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  428. }
  429. }
  430. func stepCandidate(r *raft, m pb.Message) {
  431. switch m.Type {
  432. case pb.MsgProp:
  433. panic("no leader")
  434. case pb.MsgApp:
  435. r.becomeFollower(r.Term, m.From)
  436. r.handleAppendEntries(m)
  437. case pb.MsgSnap:
  438. r.becomeFollower(m.Term, m.From)
  439. r.handleSnapshot(m)
  440. case pb.MsgVote:
  441. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  442. case pb.MsgVoteResp:
  443. gr := r.poll(m.From, !m.Reject)
  444. switch r.q() {
  445. case gr:
  446. r.becomeLeader()
  447. r.bcastAppend()
  448. case len(r.votes) - gr:
  449. r.becomeFollower(r.Term, None)
  450. }
  451. }
  452. }
  453. func stepFollower(r *raft, m pb.Message) {
  454. switch m.Type {
  455. case pb.MsgProp:
  456. if r.lead == None {
  457. panic("no leader")
  458. }
  459. m.To = r.lead
  460. r.send(m)
  461. case pb.MsgApp:
  462. r.elapsed = 0
  463. r.lead = m.From
  464. if m.LogTerm == 0 && m.Index == 0 && len(m.Entries) == 0 {
  465. r.handleHeartbeat(m)
  466. } else {
  467. r.handleAppendEntries(m)
  468. }
  469. case pb.MsgSnap:
  470. r.elapsed = 0
  471. r.handleSnapshot(m)
  472. case pb.MsgVote:
  473. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  474. r.elapsed = 0
  475. r.Vote = m.From
  476. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  477. } else {
  478. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  479. }
  480. }
  481. }
  482. // restore recovers the statemachine from a snapshot. It restores the log and the
  483. // configuration of statemachine.
  484. func (r *raft) restore(s pb.Snapshot) bool {
  485. if s.Metadata.Index <= r.raftLog.committed {
  486. return false
  487. }
  488. r.raftLog.restore(s)
  489. r.prs = make(map[uint64]*progress)
  490. for _, n := range s.Metadata.ConfState.Nodes {
  491. if n == r.id {
  492. r.setProgress(n, r.raftLog.lastIndex(), r.raftLog.lastIndex()+1)
  493. } else {
  494. r.setProgress(n, 0, r.raftLog.lastIndex()+1)
  495. }
  496. }
  497. return true
  498. }
  499. func (r *raft) needSnapshot(i uint64) bool {
  500. return i < r.raftLog.firstIndex()
  501. }
  502. func (r *raft) nodes() []uint64 {
  503. nodes := make([]uint64, 0, len(r.prs))
  504. for k := range r.prs {
  505. nodes = append(nodes, k)
  506. }
  507. sort.Sort(uint64Slice(nodes))
  508. return nodes
  509. }
  510. func (r *raft) setProgress(id, match, next uint64) {
  511. r.prs[id] = &progress{next: next, match: match}
  512. }
  513. func (r *raft) delProgress(id uint64) {
  514. delete(r.prs, id)
  515. }
  516. // promotable indicates whether state machine can be promoted to leader,
  517. // which is true when its own id is in progress list.
  518. func (r *raft) promotable() bool {
  519. _, ok := r.prs[r.id]
  520. return ok
  521. }
  522. func (r *raft) loadState(state pb.HardState) {
  523. r.raftLog.committed = state.Commit
  524. r.Term = state.Term
  525. r.Vote = state.Vote
  526. r.Commit = state.Commit
  527. }
  528. // isElectionTimeout returns true if r.elapsed is greater than the
  529. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  530. // Otherwise, it returns false.
  531. func (r *raft) isElectionTimeout() bool {
  532. d := r.elapsed - r.electionTimeout
  533. if d < 0 {
  534. return false
  535. }
  536. return d > r.rand.Int()%r.electionTimeout
  537. }