raft.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "sort"
  19. pb "github.com/coreos/etcd/raft/raftpb"
  20. )
  // None is the placeholder node ID that stands for "no leader".
  const None uint64 = 0

  // errNoLeader is the sentinel error carrying the message "no leader".
  var errNoLeader = errors.New("no leader")
  // StateType represents the role of a node in a cluster.
  type StateType uint64

  // Possible values for StateType.
  const (
  	StateFollower StateType = iota
  	StateCandidate
  	StateLeader
  )

  // stmap holds the display name of every StateType, indexed by its value.
  var stmap = [...]string{
  	"StateFollower",
  	"StateCandidate",
  	"StateLeader",
  }

  // String returns the name of the state. A value outside the declared
  // constants is rendered as "StateType(<n>)" instead of indexing past the end
  // of stmap, so printing a corrupted or future state never panics.
  func (st StateType) String() string {
  	if i := uint64(st); i < uint64(len(stmap)) {
  		return stmap[i]
  	}
  	return fmt.Sprintf("StateType(%d)", uint64(st))
  }

  // MarshalJSON encodes the state as its quoted name (see String).
  // The returned error is always nil.
  func (st StateType) MarshalJSON() ([]byte, error) {
  	return []byte(fmt.Sprintf("%q", st.String())), nil
  }
  // progress records two log indexes kept per peer in raft.prs:
  // match is set by update, and next is the index after which entries are
  // taken when an append is built for that peer.
  type progress struct {
  	match uint64
  	next  uint64
  }

  // update records n as the matched index and moves next to n + 1.
  func (pr *progress) update(n uint64) {
  	pr.match, pr.next = n, n+1
  }

  // maybeDecrTo handles a rejection that refers to index "to".
  // It returns false, leaving the progress untouched, when the rejection is
  // stale: either match is already non-zero or "to" is not next - 1.
  // Otherwise it decrements next (never below 1) and returns true.
  func (pr *progress) maybeDecrTo(to uint64) bool {
  	stale := pr.match != 0 || to != pr.next-1
  	if stale {
  		return false
  	}
  	pr.next--
  	if pr.next < 1 {
  		pr.next = 1
  	}
  	return true
  }

  // String renders the progress as "n=<next> m=<match>".
  func (pr *progress) String() string {
  	return fmt.Sprintf("n=%d m=%d", pr.next, pr.match)
  }
  // uint64Slice is a []uint64 that satisfies sort.Interface in ascending order.
  type uint64Slice []uint64

  // Len reports the number of elements.
  func (s uint64Slice) Len() int {
  	return len(s)
  }

  // Less reports whether the element at i is smaller than the element at j.
  func (s uint64Slice) Less(i, j int) bool {
  	return s[i] < s[j]
  }

  // Swap exchanges the elements at i and j.
  func (s uint64Slice) Swap(i, j int) {
  	s[j], s[i] = s[i], s[j]
  }
  72. type raft struct {
  73. pb.HardState
  74. id uint64
  75. // the log
  76. raftLog *raftLog
  77. prs map[uint64]*progress
  78. state StateType
  79. votes map[uint64]bool
  80. msgs []pb.Message
  81. // the leader id
  82. lead uint64
  83. // New configuration is ignored if there exists unapplied configuration.
  84. pendingConf bool
  85. elapsed int // number of ticks since the last msg
  86. heartbeatTimeout int
  87. electionTimeout int
  88. tick func()
  89. step stepFunc
  90. }
  91. func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
  92. if id == None {
  93. panic("cannot use none id")
  94. }
  95. rand.Seed(int64(id))
  96. r := &raft{
  97. id: id,
  98. lead: None,
  99. raftLog: newLog(),
  100. prs: make(map[uint64]*progress),
  101. electionTimeout: election,
  102. heartbeatTimeout: heartbeat,
  103. }
  104. for _, p := range peers {
  105. r.prs[p] = &progress{}
  106. }
  107. r.becomeFollower(0, None)
  108. return r
  109. }
  110. func (r *raft) hasLeader() bool { return r.lead != None }
  111. func (r *raft) softState() *SoftState {
  112. return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
  113. }
  114. func (r *raft) String() string {
  115. s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)
  116. switch r.state {
  117. case StateFollower:
  118. s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
  119. case StateCandidate:
  120. s += fmt.Sprintf(` votes="%v"`, r.votes)
  121. case StateLeader:
  122. s += fmt.Sprintf(` prs="%v"`, r.prs)
  123. }
  124. return s
  125. }
  126. func (r *raft) poll(id uint64, v bool) (granted int) {
  127. if _, ok := r.votes[id]; !ok {
  128. r.votes[id] = v
  129. }
  130. for _, vv := range r.votes {
  131. if vv {
  132. granted++
  133. }
  134. }
  135. return granted
  136. }
  137. // send persists state to stable storage and then sends to its mailbox.
  138. func (r *raft) send(m pb.Message) {
  139. m.From = r.id
  140. // do not attach term to msgProp
  141. // proposals are a way to forward to the leader and
  142. // should be treated as local message.
  143. if m.Type != pb.MsgProp {
  144. m.Term = r.Term
  145. }
  146. r.msgs = append(r.msgs, m)
  147. }
  148. // sendAppend sends RRPC, with entries to the given peer.
  149. func (r *raft) sendAppend(to uint64) {
  150. pr := r.prs[to]
  151. m := pb.Message{}
  152. m.To = to
  153. m.Index = pr.next - 1
  154. if r.needSnapshot(m.Index) {
  155. m.Type = pb.MsgSnap
  156. m.Snapshot = r.raftLog.snapshot
  157. } else {
  158. m.Type = pb.MsgApp
  159. m.LogTerm = r.raftLog.term(pr.next - 1)
  160. m.Entries = r.raftLog.entries(pr.next)
  161. m.Commit = r.raftLog.committed
  162. }
  163. r.send(m)
  164. }
  165. // sendHeartbeat sends an empty msgApp
  166. func (r *raft) sendHeartbeat(to uint64) {
  167. m := pb.Message{
  168. To: to,
  169. Type: pb.MsgApp,
  170. }
  171. r.send(m)
  172. }
  173. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to r.mis.
  174. func (r *raft) bcastAppend() {
  175. for i := range r.prs {
  176. if i == r.id {
  177. continue
  178. }
  179. r.sendAppend(i)
  180. }
  181. }
  182. // bcastHeartbeat sends RRPC, without entries to all the peers.
  183. func (r *raft) bcastHeartbeat() {
  184. for i := range r.prs {
  185. if i == r.id {
  186. continue
  187. }
  188. r.sendHeartbeat(i)
  189. }
  190. }
  191. func (r *raft) maybeCommit() bool {
  192. // TODO(bmizerany): optimize.. Currently naive
  193. mis := make(uint64Slice, 0, len(r.prs))
  194. for i := range r.prs {
  195. mis = append(mis, r.prs[i].match)
  196. }
  197. sort.Sort(sort.Reverse(mis))
  198. mci := mis[r.q()-1]
  199. return r.raftLog.maybeCommit(mci, r.Term)
  200. }
  201. func (r *raft) reset(term uint64) {
  202. r.Term = term
  203. r.lead = None
  204. r.Vote = None
  205. r.elapsed = 0
  206. r.votes = make(map[uint64]bool)
  207. for i := range r.prs {
  208. r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
  209. if i == r.id {
  210. r.prs[i].match = r.raftLog.lastIndex()
  211. }
  212. }
  213. r.pendingConf = false
  214. }
  215. func (r *raft) q() int {
  216. return len(r.prs)/2 + 1
  217. }
  218. func (r *raft) appendEntry(e pb.Entry) {
  219. e.Term = r.Term
  220. e.Index = r.raftLog.lastIndex() + 1
  221. r.raftLog.append(r.raftLog.lastIndex(), e)
  222. r.prs[r.id].update(r.raftLog.lastIndex())
  223. r.maybeCommit()
  224. }
  225. // tickElection is ran by followers and candidates after r.electionTimeout.
  226. func (r *raft) tickElection() {
  227. if !r.promotable() {
  228. r.elapsed = 0
  229. return
  230. }
  231. r.elapsed++
  232. if r.isElectionTimeout() {
  233. r.elapsed = 0
  234. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  235. }
  236. }
  237. // tickHeartbeat is ran by leaders to send a msgBeat after r.heartbeatTimeout.
  238. func (r *raft) tickHeartbeat() {
  239. r.elapsed++
  240. if r.elapsed > r.heartbeatTimeout {
  241. r.elapsed = 0
  242. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  243. }
  244. }
  245. func (r *raft) becomeFollower(term uint64, lead uint64) {
  246. r.step = stepFollower
  247. r.reset(term)
  248. r.tick = r.tickElection
  249. r.lead = lead
  250. r.state = StateFollower
  251. }
  252. func (r *raft) becomeCandidate() {
  253. // TODO(xiangli) remove the panic when the raft implementation is stable
  254. if r.state == StateLeader {
  255. panic("invalid transition [leader -> candidate]")
  256. }
  257. r.step = stepCandidate
  258. r.reset(r.Term + 1)
  259. r.tick = r.tickElection
  260. r.Vote = r.id
  261. r.state = StateCandidate
  262. }
  263. func (r *raft) becomeLeader() {
  264. // TODO(xiangli) remove the panic when the raft implementation is stable
  265. if r.state == StateFollower {
  266. panic("invalid transition [follower -> leader]")
  267. }
  268. r.step = stepLeader
  269. r.reset(r.Term)
  270. r.tick = r.tickHeartbeat
  271. r.lead = r.id
  272. r.state = StateLeader
  273. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  274. if e.Type != pb.EntryConfChange {
  275. continue
  276. }
  277. if r.pendingConf {
  278. panic("unexpected double uncommitted config entry")
  279. }
  280. r.pendingConf = true
  281. }
  282. r.appendEntry(pb.Entry{Data: nil})
  283. }
  284. func (r *raft) ReadMessages() []pb.Message {
  285. msgs := r.msgs
  286. r.msgs = make([]pb.Message, 0)
  287. return msgs
  288. }
  289. func (r *raft) campaign() {
  290. r.becomeCandidate()
  291. if r.q() == r.poll(r.id, true) {
  292. r.becomeLeader()
  293. }
  294. for i := range r.prs {
  295. if i == r.id {
  296. continue
  297. }
  298. lasti := r.raftLog.lastIndex()
  299. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)})
  300. }
  301. }
  302. func (r *raft) Step(m pb.Message) error {
  303. // TODO(bmizerany): this likely allocs - prevent that.
  304. defer func() { r.Commit = r.raftLog.committed }()
  305. if m.Type == pb.MsgHup {
  306. r.campaign()
  307. }
  308. switch {
  309. case m.Term == 0:
  310. // local message
  311. case m.Term > r.Term:
  312. lead := m.From
  313. if m.Type == pb.MsgVote {
  314. lead = None
  315. }
  316. r.becomeFollower(m.Term, lead)
  317. case m.Term < r.Term:
  318. // ignore
  319. return nil
  320. }
  321. r.step(r, m)
  322. return nil
  323. }
  324. func (r *raft) handleAppendEntries(m pb.Message) {
  325. if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
  326. mlastIndex := m.Index
  327. if len(m.Entries) != 0 {
  328. mlastIndex = m.Entries[len(m.Entries)-1].Index
  329. }
  330. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  331. } else {
  332. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
  333. }
  334. }
  335. func (r *raft) handleSnapshot(m pb.Message) {
  336. if r.restore(m.Snapshot) {
  337. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  338. } else {
  339. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  340. }
  341. }
  342. func (r *raft) resetPendingConf() {
  343. r.pendingConf = false
  344. }
  345. func (r *raft) addNode(id uint64) {
  346. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  347. r.pendingConf = false
  348. }
  349. func (r *raft) removeNode(id uint64) {
  350. r.delProgress(id)
  351. r.pendingConf = false
  352. }
  353. type stepFunc func(r *raft, m pb.Message)
  354. func stepLeader(r *raft, m pb.Message) {
  355. switch m.Type {
  356. case pb.MsgBeat:
  357. r.bcastHeartbeat()
  358. case pb.MsgProp:
  359. if len(m.Entries) != 1 {
  360. panic("unexpected length(entries) of a msgProp")
  361. }
  362. e := m.Entries[0]
  363. if e.Type == pb.EntryConfChange {
  364. if r.pendingConf {
  365. return
  366. }
  367. r.pendingConf = true
  368. }
  369. r.appendEntry(e)
  370. r.bcastAppend()
  371. case pb.MsgAppResp:
  372. if m.Index == 0 {
  373. return
  374. }
  375. if m.Reject {
  376. if r.prs[m.From].maybeDecrTo(m.Index) {
  377. r.sendAppend(m.From)
  378. }
  379. } else {
  380. r.prs[m.From].update(m.Index)
  381. if r.maybeCommit() {
  382. r.bcastAppend()
  383. }
  384. }
  385. case pb.MsgVote:
  386. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  387. }
  388. }
  389. func stepCandidate(r *raft, m pb.Message) {
  390. switch m.Type {
  391. case pb.MsgProp:
  392. panic("no leader")
  393. case pb.MsgApp:
  394. r.becomeFollower(r.Term, m.From)
  395. r.handleAppendEntries(m)
  396. case pb.MsgSnap:
  397. r.becomeFollower(m.Term, m.From)
  398. r.handleSnapshot(m)
  399. case pb.MsgVote:
  400. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  401. case pb.MsgVoteResp:
  402. gr := r.poll(m.From, !m.Reject)
  403. switch r.q() {
  404. case gr:
  405. r.becomeLeader()
  406. r.bcastAppend()
  407. case len(r.votes) - gr:
  408. r.becomeFollower(r.Term, None)
  409. }
  410. }
  411. }
  412. func stepFollower(r *raft, m pb.Message) {
  413. switch m.Type {
  414. case pb.MsgProp:
  415. if r.lead == None {
  416. panic("no leader")
  417. }
  418. m.To = r.lead
  419. r.send(m)
  420. case pb.MsgApp:
  421. r.elapsed = 0
  422. r.lead = m.From
  423. r.handleAppendEntries(m)
  424. case pb.MsgSnap:
  425. r.elapsed = 0
  426. r.handleSnapshot(m)
  427. case pb.MsgVote:
  428. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  429. r.elapsed = 0
  430. r.Vote = m.From
  431. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  432. } else {
  433. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  434. }
  435. }
  436. }
  437. func (r *raft) compact(index uint64, nodes []uint64, d []byte) {
  438. if index > r.raftLog.applied {
  439. panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
  440. }
  441. r.raftLog.snap(d, index, r.raftLog.term(index), nodes)
  442. r.raftLog.compact(index)
  443. }
  444. // restore recovers the statemachine from a snapshot. It restores the log and the
  445. // configuration of statemachine.
  446. func (r *raft) restore(s pb.Snapshot) bool {
  447. if s.Index <= r.raftLog.committed {
  448. return false
  449. }
  450. r.raftLog.restore(s)
  451. r.prs = make(map[uint64]*progress)
  452. for _, n := range s.Nodes {
  453. if n == r.id {
  454. r.setProgress(n, r.raftLog.lastIndex(), r.raftLog.lastIndex()+1)
  455. } else {
  456. r.setProgress(n, 0, r.raftLog.lastIndex()+1)
  457. }
  458. }
  459. return true
  460. }
  461. func (r *raft) needSnapshot(i uint64) bool {
  462. if i < r.raftLog.offset {
  463. if r.raftLog.snapshot.Term == 0 {
  464. panic("need non-empty snapshot")
  465. }
  466. return true
  467. }
  468. return false
  469. }
  470. func (r *raft) nodes() []uint64 {
  471. nodes := make([]uint64, 0, len(r.prs))
  472. for k := range r.prs {
  473. nodes = append(nodes, k)
  474. }
  475. return nodes
  476. }
  477. func (r *raft) setProgress(id, match, next uint64) {
  478. r.prs[id] = &progress{next: next, match: match}
  479. }
  480. func (r *raft) delProgress(id uint64) {
  481. delete(r.prs, id)
  482. }
  483. // promotable indicates whether state machine can be promoted to leader,
  484. // which is true when its own id is in progress list.
  485. func (r *raft) promotable() bool {
  486. _, ok := r.prs[r.id]
  487. return ok
  488. }
  489. func (r *raft) loadEnts(ents []pb.Entry) {
  490. r.raftLog.load(ents)
  491. }
  492. func (r *raft) loadState(state pb.HardState) {
  493. r.raftLog.committed = state.Commit
  494. r.Term = state.Term
  495. r.Vote = state.Vote
  496. r.Commit = state.Commit
  497. }
  498. // isElectionTimeout returns true if r.elapsed is greater than the
  499. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  500. // Otherwise, it returns false.
  501. func (r *raft) isElectionTimeout() bool {
  502. d := r.elapsed - r.electionTimeout
  503. if d < 0 {
  504. return false
  505. }
  506. return d > rand.Int()%r.electionTimeout
  507. }