raft.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "sort"
  19. pb "github.com/coreos/etcd/raft/raftpb"
  20. )
  // None is the sentinel node ID that stands for "no node". newRaft rejects it
  // as a node's own ID, and raft.lead holds it while no leader is known.
  const None = uint64(0)

  // errNoLeader is the error value describing the absence of a leader.
  var errNoLeader = errors.New("no leader")
  // Possible values for StateType.
  const (
  	StateFollower StateType = iota
  	StateCandidate
  	StateLeader
  )

  // StateType represents the role of a node in a cluster.
  type StateType uint64

  // stmap holds the display name of every declared StateType, indexed by the
  // numeric value of the state.
  var stmap = [...]string{
  	StateFollower:  "StateFollower",
  	StateCandidate: "StateCandidate",
  	StateLeader:    "StateLeader",
  }

  // String returns the name of the state. A value outside the declared set is
  // rendered as "StateType(N)" instead of indexing past the end of stmap, so
  // formatting or marshaling an unexpected value never panics.
  func (st StateType) String() string {
  	if uint64(st) >= uint64(len(stmap)) {
  		return fmt.Sprintf("StateType(%d)", uint64(st))
  	}
  	return stmap[st]
  }

  // MarshalJSON encodes the state as a JSON string holding the result of
  // String. The returned error is always nil.
  func (st StateType) MarshalJSON() ([]byte, error) {
  	return []byte(fmt.Sprintf("%q", st.String())), nil
  }
  // progress records two log indexes for one node: match and next.
  // update sets them together; maybeDecrTo walks next backwards.
  type progress struct {
  	match uint64
  	next  uint64
  }

  // update sets match to n and next to the index right after it.
  func (pr *progress) update(n uint64) {
  	pr.match, pr.next = n, n+1
  }

  // maybeDecrTo handles a rejection that refers to index "to". It reports
  // false, leaving the progress untouched, when the rejection must be stale:
  // either a match has already been recorded or "to" is not next-1. Otherwise
  // it lowers next by one (never below 1) and reports true.
  func (pr *progress) maybeDecrTo(to uint64) bool {
  	stale := pr.match != 0 || pr.next-1 != to
  	if stale {
  		return false
  	}
  	pr.next--
  	if pr.next < 1 {
  		pr.next = 1
  	}
  	return true
  }

  // String renders the progress as "n=<next> m=<match>".
  func (pr *progress) String() string {
  	return fmt.Sprintf("n=%d m=%d", pr.next, pr.match)
  }
  // uint64Slice provides Len, Less and Swap over a []uint64 so it can be
  // handed to the sort package; Less orders values ascending.
  type uint64Slice []uint64

  // Len returns the number of elements.
  func (p uint64Slice) Len() int {
  	return len(p)
  }

  // Less reports whether element i is smaller than element j.
  func (p uint64Slice) Less(i, j int) bool {
  	return p[j] > p[i]
  }

  // Swap exchanges elements i and j.
  func (p uint64Slice) Swap(i, j int) {
  	tmp := p[i]
  	p[i] = p[j]
  	p[j] = tmp
  }
  71. type raft struct {
  72. pb.HardState
  73. id uint64
  74. // the log
  75. raftLog *raftLog
  76. prs map[uint64]*progress
  77. state StateType
  78. votes map[uint64]bool
  79. msgs []pb.Message
  80. // the leader id
  81. lead uint64
  82. // New configuration is ignored if there exists unapplied configuration.
  83. pendingConf bool
  84. elapsed int // number of ticks since the last msg
  85. heartbeatTimeout int
  86. electionTimeout int
  87. rand *rand.Rand
  88. tick func()
  89. step stepFunc
  90. }
  91. func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
  92. if id == None {
  93. panic("cannot use none id")
  94. }
  95. r := &raft{
  96. id: id,
  97. lead: None,
  98. raftLog: newLog(),
  99. prs: make(map[uint64]*progress),
  100. electionTimeout: election,
  101. heartbeatTimeout: heartbeat,
  102. }
  103. r.rand = rand.New(rand.NewSource(int64(id)))
  104. for _, p := range peers {
  105. r.prs[p] = &progress{}
  106. }
  107. r.becomeFollower(0, None)
  108. return r
  109. }
  110. func (r *raft) hasLeader() bool { return r.lead != None }
  111. func (r *raft) softState() *SoftState {
  112. return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
  113. }
  114. func (r *raft) String() string {
  115. s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)
  116. switch r.state {
  117. case StateFollower:
  118. s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
  119. case StateCandidate:
  120. s += fmt.Sprintf(` votes="%v"`, r.votes)
  121. case StateLeader:
  122. s += fmt.Sprintf(` prs="%v"`, r.prs)
  123. }
  124. return s
  125. }
  126. func (r *raft) poll(id uint64, v bool) (granted int) {
  127. if _, ok := r.votes[id]; !ok {
  128. r.votes[id] = v
  129. }
  130. for _, vv := range r.votes {
  131. if vv {
  132. granted++
  133. }
  134. }
  135. return granted
  136. }
  137. // send persists state to stable storage and then sends to its mailbox.
  138. func (r *raft) send(m pb.Message) {
  139. m.From = r.id
  140. // do not attach term to MsgProp
  141. // proposals are a way to forward to the leader and
  142. // should be treated as local message.
  143. if m.Type != pb.MsgProp {
  144. m.Term = r.Term
  145. }
  146. r.msgs = append(r.msgs, m)
  147. }
  148. // sendAppend sends RRPC, with entries to the given peer.
  149. func (r *raft) sendAppend(to uint64) {
  150. pr := r.prs[to]
  151. m := pb.Message{}
  152. m.To = to
  153. m.Index = pr.next - 1
  154. if r.needSnapshot(m.Index) {
  155. m.Type = pb.MsgSnap
  156. m.Snapshot = r.raftLog.snapshot
  157. } else {
  158. m.Type = pb.MsgApp
  159. m.LogTerm = r.raftLog.term(pr.next - 1)
  160. m.Entries = r.raftLog.entries(pr.next)
  161. m.Commit = r.raftLog.committed
  162. }
  163. r.send(m)
  164. }
  165. // sendHeartbeat sends an empty MsgApp
  166. func (r *raft) sendHeartbeat(to uint64) {
  167. m := pb.Message{
  168. To: to,
  169. Type: pb.MsgApp,
  170. }
  171. r.send(m)
  172. }
  173. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date
  174. // according to the progress recorded in r.prs.
  175. func (r *raft) bcastAppend() {
  176. for i := range r.prs {
  177. if i == r.id {
  178. continue
  179. }
  180. r.sendAppend(i)
  181. }
  182. }
  183. // bcastHeartbeat sends RRPC, without entries to all the peers.
  184. func (r *raft) bcastHeartbeat() {
  185. for i := range r.prs {
  186. if i == r.id {
  187. continue
  188. }
  189. r.sendHeartbeat(i)
  190. }
  191. }
  192. func (r *raft) maybeCommit() bool {
  193. // TODO(bmizerany): optimize.. Currently naive
  194. mis := make(uint64Slice, 0, len(r.prs))
  195. for i := range r.prs {
  196. mis = append(mis, r.prs[i].match)
  197. }
  198. sort.Sort(sort.Reverse(mis))
  199. mci := mis[r.q()-1]
  200. return r.raftLog.maybeCommit(mci, r.Term)
  201. }
  202. func (r *raft) reset(term uint64) {
  203. r.Term = term
  204. r.lead = None
  205. r.Vote = None
  206. r.elapsed = 0
  207. r.votes = make(map[uint64]bool)
  208. for i := range r.prs {
  209. r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
  210. if i == r.id {
  211. r.prs[i].match = r.raftLog.lastIndex()
  212. }
  213. }
  214. r.pendingConf = false
  215. }
  216. func (r *raft) q() int {
  217. return len(r.prs)/2 + 1
  218. }
  219. func (r *raft) appendEntry(e pb.Entry) {
  220. e.Term = r.Term
  221. e.Index = r.raftLog.lastIndex() + 1
  222. r.raftLog.append(r.raftLog.lastIndex(), e)
  223. r.prs[r.id].update(r.raftLog.lastIndex())
  224. r.maybeCommit()
  225. }
  226. // tickElection is run by followers and candidates after r.electionTimeout.
  227. func (r *raft) tickElection() {
  228. if !r.promotable() {
  229. r.elapsed = 0
  230. return
  231. }
  232. r.elapsed++
  233. if r.isElectionTimeout() {
  234. r.elapsed = 0
  235. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  236. }
  237. }
  238. // tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
  239. func (r *raft) tickHeartbeat() {
  240. r.elapsed++
  241. if r.elapsed > r.heartbeatTimeout {
  242. r.elapsed = 0
  243. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  244. }
  245. }
  246. func (r *raft) becomeFollower(term uint64, lead uint64) {
  247. r.step = stepFollower
  248. r.reset(term)
  249. r.tick = r.tickElection
  250. r.lead = lead
  251. r.state = StateFollower
  252. }
  253. func (r *raft) becomeCandidate() {
  254. // TODO(xiangli) remove the panic when the raft implementation is stable
  255. if r.state == StateLeader {
  256. panic("invalid transition [leader -> candidate]")
  257. }
  258. r.step = stepCandidate
  259. r.reset(r.Term + 1)
  260. r.tick = r.tickElection
  261. r.Vote = r.id
  262. r.state = StateCandidate
  263. }
  264. func (r *raft) becomeLeader() {
  265. // TODO(xiangli) remove the panic when the raft implementation is stable
  266. if r.state == StateFollower {
  267. panic("invalid transition [follower -> leader]")
  268. }
  269. r.step = stepLeader
  270. r.reset(r.Term)
  271. r.tick = r.tickHeartbeat
  272. r.lead = r.id
  273. r.state = StateLeader
  274. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  275. if e.Type != pb.EntryConfChange {
  276. continue
  277. }
  278. if r.pendingConf {
  279. panic("unexpected double uncommitted config entry")
  280. }
  281. r.pendingConf = true
  282. }
  283. r.appendEntry(pb.Entry{Data: nil})
  284. }
  285. func (r *raft) campaign() {
  286. r.becomeCandidate()
  287. if r.q() == r.poll(r.id, true) {
  288. r.becomeLeader()
  289. }
  290. for i := range r.prs {
  291. if i == r.id {
  292. continue
  293. }
  294. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
  295. }
  296. }
  297. func (r *raft) Step(m pb.Message) error {
  298. // TODO(bmizerany): this likely allocs - prevent that.
  299. defer func() { r.Commit = r.raftLog.committed }()
  300. if m.Type == pb.MsgHup {
  301. r.campaign()
  302. }
  303. switch {
  304. case m.Term == 0:
  305. // local message
  306. case m.Term > r.Term:
  307. lead := m.From
  308. if m.Type == pb.MsgVote {
  309. lead = None
  310. }
  311. r.becomeFollower(m.Term, lead)
  312. case m.Term < r.Term:
  313. // ignore
  314. return nil
  315. }
  316. r.step(r, m)
  317. return nil
  318. }
  319. func (r *raft) handleAppendEntries(m pb.Message) {
  320. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  321. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  322. } else {
  323. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
  324. }
  325. }
  326. func (r *raft) handleSnapshot(m pb.Message) {
  327. if r.restore(m.Snapshot) {
  328. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  329. } else {
  330. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  331. }
  332. }
  333. func (r *raft) resetPendingConf() {
  334. r.pendingConf = false
  335. }
  336. func (r *raft) addNode(id uint64) {
  337. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  338. r.pendingConf = false
  339. }
  340. func (r *raft) removeNode(id uint64) {
  341. r.delProgress(id)
  342. r.pendingConf = false
  343. }
  344. type stepFunc func(r *raft, m pb.Message)
  345. func stepLeader(r *raft, m pb.Message) {
  346. switch m.Type {
  347. case pb.MsgBeat:
  348. r.bcastHeartbeat()
  349. case pb.MsgProp:
  350. if len(m.Entries) != 1 {
  351. panic("unexpected length(entries) of a MsgProp")
  352. }
  353. e := m.Entries[0]
  354. if e.Type == pb.EntryConfChange {
  355. if r.pendingConf {
  356. return
  357. }
  358. r.pendingConf = true
  359. }
  360. r.appendEntry(e)
  361. r.bcastAppend()
  362. case pb.MsgAppResp:
  363. if m.Index == 0 {
  364. return
  365. }
  366. if m.Reject {
  367. if r.prs[m.From].maybeDecrTo(m.Index) {
  368. r.sendAppend(m.From)
  369. }
  370. } else {
  371. r.prs[m.From].update(m.Index)
  372. if r.maybeCommit() {
  373. r.bcastAppend()
  374. }
  375. }
  376. case pb.MsgVote:
  377. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  378. }
  379. }
  380. func stepCandidate(r *raft, m pb.Message) {
  381. switch m.Type {
  382. case pb.MsgProp:
  383. panic("no leader")
  384. case pb.MsgApp:
  385. r.becomeFollower(r.Term, m.From)
  386. r.handleAppendEntries(m)
  387. case pb.MsgSnap:
  388. r.becomeFollower(m.Term, m.From)
  389. r.handleSnapshot(m)
  390. case pb.MsgVote:
  391. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  392. case pb.MsgVoteResp:
  393. gr := r.poll(m.From, !m.Reject)
  394. switch r.q() {
  395. case gr:
  396. r.becomeLeader()
  397. r.bcastAppend()
  398. case len(r.votes) - gr:
  399. r.becomeFollower(r.Term, None)
  400. }
  401. }
  402. }
  403. func stepFollower(r *raft, m pb.Message) {
  404. switch m.Type {
  405. case pb.MsgProp:
  406. if r.lead == None {
  407. panic("no leader")
  408. }
  409. m.To = r.lead
  410. r.send(m)
  411. case pb.MsgApp:
  412. r.elapsed = 0
  413. r.lead = m.From
  414. r.handleAppendEntries(m)
  415. case pb.MsgSnap:
  416. r.elapsed = 0
  417. r.handleSnapshot(m)
  418. case pb.MsgVote:
  419. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  420. r.elapsed = 0
  421. r.Vote = m.From
  422. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  423. } else {
  424. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  425. }
  426. }
  427. }
  428. func (r *raft) compact(index uint64, nodes []uint64, d []byte) {
  429. if index > r.raftLog.applied {
  430. panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
  431. }
  432. r.raftLog.snap(d, index, r.raftLog.term(index), nodes)
  433. r.raftLog.compact(index)
  434. }
  435. // restore recovers the statemachine from a snapshot. It restores the log and the
  436. // configuration of statemachine.
  437. func (r *raft) restore(s pb.Snapshot) bool {
  438. if s.Index <= r.raftLog.committed {
  439. return false
  440. }
  441. r.raftLog.restore(s)
  442. r.prs = make(map[uint64]*progress)
  443. for _, n := range s.Nodes {
  444. if n == r.id {
  445. r.setProgress(n, r.raftLog.lastIndex(), r.raftLog.lastIndex()+1)
  446. } else {
  447. r.setProgress(n, 0, r.raftLog.lastIndex()+1)
  448. }
  449. }
  450. return true
  451. }
  452. func (r *raft) needSnapshot(i uint64) bool {
  453. if i < r.raftLog.offset {
  454. if r.raftLog.snapshot.Term == 0 {
  455. panic("need non-empty snapshot")
  456. }
  457. return true
  458. }
  459. return false
  460. }
  461. func (r *raft) nodes() []uint64 {
  462. nodes := make([]uint64, 0, len(r.prs))
  463. for k := range r.prs {
  464. nodes = append(nodes, k)
  465. }
  466. sort.Sort(uint64Slice(nodes))
  467. return nodes
  468. }
  469. func (r *raft) setProgress(id, match, next uint64) {
  470. r.prs[id] = &progress{next: next, match: match}
  471. }
  472. func (r *raft) delProgress(id uint64) {
  473. delete(r.prs, id)
  474. }
  475. // promotable indicates whether state machine can be promoted to leader,
  476. // which is true when its own id is in progress list.
  477. func (r *raft) promotable() bool {
  478. _, ok := r.prs[r.id]
  479. return ok
  480. }
  481. func (r *raft) loadEnts(ents []pb.Entry) {
  482. r.raftLog.load(ents)
  483. }
  484. func (r *raft) loadState(state pb.HardState) {
  485. r.raftLog.committed = state.Commit
  486. r.Term = state.Term
  487. r.Vote = state.Vote
  488. r.Commit = state.Commit
  489. }
  490. // isElectionTimeout returns true if r.elapsed is greater than the
  491. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  492. // Otherwise, it returns false.
  493. func (r *raft) isElectionTimeout() bool {
  494. d := r.elapsed - r.electionTimeout
  495. if d < 0 {
  496. return false
  497. }
  498. return d > r.rand.Int()%r.electionTimeout
  499. }