raft.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "sort"
  19. pb "github.com/coreos/etcd/raft/raftpb"
  20. )
  // None is the placeholder node ID that stands for "no node"; it is used
  // when there is no leader.
  const None = uint64(0)

  // errNoLeader is a sentinel error carrying the message "no leader".
  var errNoLeader = errors.New("no leader")
  // StateType represents the role of a node in a cluster.
  type StateType uint64

  // Possible values for StateType.
  const (
  	StateFollower StateType = iota
  	StateCandidate
  	StateLeader
  )

  // stmap holds the display name of every known StateType, indexed by the
  // numeric value of the state.
  var stmap = [...]string{
  	"StateFollower",
  	"StateCandidate",
  	"StateLeader",
  }

  // String returns the name of the state. A value outside the known range is
  // rendered as "StateType(N)" instead of panicking with an out-of-range
  // index, so that printing or marshaling a corrupted value stays safe.
  func (st StateType) String() string {
  	if uint64(st) >= uint64(len(stmap)) {
  		return fmt.Sprintf("StateType(%d)", uint64(st))
  	}
  	return stmap[uint64(st)]
  }

  // MarshalJSON encodes the state as a quoted JSON string holding its name.
  // It never returns an error.
  func (st StateType) MarshalJSON() ([]byte, error) {
  	return []byte(fmt.Sprintf("%q", st.String())), nil
  }
  // progress tracks, for one peer, the highest log index known to match
  // (match) and the index of the next entry to send (next).
  type progress struct {
  	match, next uint64
  }

  // update records n as the matched index and moves next to the entry
  // right after it.
  func (pr *progress) update(n uint64) {
  	pr.match, pr.next = n, n+1
  }

  // maybeDecrTo handles a rejection that refers to index "to". It returns
  // false, leaving the progress untouched, when the rejection is stale;
  // otherwise it steps next back by one (never below 1) and returns true.
  func (pr *progress) maybeDecrTo(to uint64) bool {
  	// A rejection is stale once the peer has matched, or when it does not
  	// refer to the entry just before next.
  	stale := pr.match != 0 || to != pr.next-1
  	if stale {
  		return false
  	}

  	pr.next--
  	if pr.next < 1 {
  		pr.next = 1
  	}
  	return true
  }

  // String renders the progress as "n=<next> m=<match>".
  func (pr *progress) String() string {
  	return fmt.Sprintf("n=%d m=%d", pr.next, pr.match)
  }
  // uint64Slice is a []uint64 that satisfies sort.Interface with ascending
  // order.
  type uint64Slice []uint64

  // Len reports the number of elements.
  func (s uint64Slice) Len() int {
  	return len(s)
  }

  // Less reports whether element i is smaller than element j.
  func (s uint64Slice) Less(i, j int) bool {
  	return s[i] < s[j]
  }

  // Swap exchanges elements i and j.
  func (s uint64Slice) Swap(i, j int) {
  	s[j], s[i] = s[i], s[j]
  }
  71. type raft struct {
  72. pb.HardState
  73. id uint64
  74. // the log
  75. raftLog *raftLog
  76. prs map[uint64]*progress
  77. state StateType
  78. votes map[uint64]bool
  79. msgs []pb.Message
  80. // the leader id
  81. lead uint64
  82. // New configuration is ignored if there exists unapplied configuration.
  83. pendingConf bool
  84. elapsed int // number of ticks since the last msg
  85. heartbeatTimeout int
  86. electionTimeout int
  87. rand *rand.Rand
  88. tick func()
  89. step stepFunc
  90. }
  91. func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
  92. if id == None {
  93. panic("cannot use none id")
  94. }
  95. r := &raft{
  96. id: id,
  97. lead: None,
  98. raftLog: newLog(),
  99. prs: make(map[uint64]*progress),
  100. electionTimeout: election,
  101. heartbeatTimeout: heartbeat,
  102. }
  103. r.rand = rand.New(rand.NewSource(int64(id)))
  104. for _, p := range peers {
  105. r.prs[p] = &progress{}
  106. }
  107. r.becomeFollower(0, None)
  108. return r
  109. }
  110. func (r *raft) hasLeader() bool { return r.lead != None }
  111. func (r *raft) softState() *SoftState {
  112. return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
  113. }
  114. func (r *raft) String() string {
  115. s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)
  116. switch r.state {
  117. case StateFollower:
  118. s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
  119. case StateCandidate:
  120. s += fmt.Sprintf(` votes="%v"`, r.votes)
  121. case StateLeader:
  122. s += fmt.Sprintf(` prs="%v"`, r.prs)
  123. }
  124. return s
  125. }
  126. func (r *raft) poll(id uint64, v bool) (granted int) {
  127. if _, ok := r.votes[id]; !ok {
  128. r.votes[id] = v
  129. }
  130. for _, vv := range r.votes {
  131. if vv {
  132. granted++
  133. }
  134. }
  135. return granted
  136. }
  137. // send persists state to stable storage and then sends to its mailbox.
  138. func (r *raft) send(m pb.Message) {
  139. m.From = r.id
  140. // do not attach term to msgProp
  141. // proposals are a way to forward to the leader and
  142. // should be treated as local message.
  143. if m.Type != pb.MsgProp {
  144. m.Term = r.Term
  145. }
  146. r.msgs = append(r.msgs, m)
  147. }
  148. // sendAppend sends RRPC, with entries to the given peer.
  149. func (r *raft) sendAppend(to uint64) {
  150. pr := r.prs[to]
  151. m := pb.Message{}
  152. m.To = to
  153. m.Index = pr.next - 1
  154. if r.needSnapshot(m.Index) {
  155. m.Type = pb.MsgSnap
  156. m.Snapshot = r.raftLog.snapshot
  157. } else {
  158. m.Type = pb.MsgApp
  159. m.LogTerm = r.raftLog.term(pr.next - 1)
  160. m.Entries = r.raftLog.entries(pr.next)
  161. m.Commit = r.raftLog.committed
  162. }
  163. r.send(m)
  164. }
  165. // sendHeartbeat sends an empty msgApp
  166. func (r *raft) sendHeartbeat(to uint64) {
  167. m := pb.Message{
  168. To: to,
  169. Type: pb.MsgApp,
  170. }
  171. r.send(m)
  172. }
  173. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to r.mis.
  174. func (r *raft) bcastAppend() {
  175. for i := range r.prs {
  176. if i == r.id {
  177. continue
  178. }
  179. r.sendAppend(i)
  180. }
  181. }
  182. // bcastHeartbeat sends RRPC, without entries to all the peers.
  183. func (r *raft) bcastHeartbeat() {
  184. for i := range r.prs {
  185. if i == r.id {
  186. continue
  187. }
  188. r.sendHeartbeat(i)
  189. }
  190. }
  191. func (r *raft) maybeCommit() bool {
  192. // TODO(bmizerany): optimize.. Currently naive
  193. mis := make(uint64Slice, 0, len(r.prs))
  194. for i := range r.prs {
  195. mis = append(mis, r.prs[i].match)
  196. }
  197. sort.Sort(sort.Reverse(mis))
  198. mci := mis[r.q()-1]
  199. return r.raftLog.maybeCommit(mci, r.Term)
  200. }
  201. func (r *raft) reset(term uint64) {
  202. r.Term = term
  203. r.lead = None
  204. r.Vote = None
  205. r.elapsed = 0
  206. r.votes = make(map[uint64]bool)
  207. for i := range r.prs {
  208. r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
  209. if i == r.id {
  210. r.prs[i].match = r.raftLog.lastIndex()
  211. }
  212. }
  213. r.pendingConf = false
  214. }
  215. func (r *raft) q() int {
  216. return len(r.prs)/2 + 1
  217. }
  218. func (r *raft) appendEntry(e pb.Entry) {
  219. e.Term = r.Term
  220. e.Index = r.raftLog.lastIndex() + 1
  221. r.raftLog.append(r.raftLog.lastIndex(), e)
  222. r.prs[r.id].update(r.raftLog.lastIndex())
  223. r.maybeCommit()
  224. }
  225. // tickElection is ran by followers and candidates after r.electionTimeout.
  226. func (r *raft) tickElection() {
  227. if !r.promotable() {
  228. r.elapsed = 0
  229. return
  230. }
  231. r.elapsed++
  232. if r.isElectionTimeout() {
  233. r.elapsed = 0
  234. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  235. }
  236. }
  237. // tickHeartbeat is ran by leaders to send a msgBeat after r.heartbeatTimeout.
  238. func (r *raft) tickHeartbeat() {
  239. r.elapsed++
  240. if r.elapsed > r.heartbeatTimeout {
  241. r.elapsed = 0
  242. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  243. }
  244. }
  245. func (r *raft) becomeFollower(term uint64, lead uint64) {
  246. r.step = stepFollower
  247. r.reset(term)
  248. r.tick = r.tickElection
  249. r.lead = lead
  250. r.state = StateFollower
  251. }
  252. func (r *raft) becomeCandidate() {
  253. // TODO(xiangli) remove the panic when the raft implementation is stable
  254. if r.state == StateLeader {
  255. panic("invalid transition [leader -> candidate]")
  256. }
  257. r.step = stepCandidate
  258. r.reset(r.Term + 1)
  259. r.tick = r.tickElection
  260. r.Vote = r.id
  261. r.state = StateCandidate
  262. }
  263. func (r *raft) becomeLeader() {
  264. // TODO(xiangli) remove the panic when the raft implementation is stable
  265. if r.state == StateFollower {
  266. panic("invalid transition [follower -> leader]")
  267. }
  268. r.step = stepLeader
  269. r.reset(r.Term)
  270. r.tick = r.tickHeartbeat
  271. r.lead = r.id
  272. r.state = StateLeader
  273. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  274. if e.Type != pb.EntryConfChange {
  275. continue
  276. }
  277. if r.pendingConf {
  278. panic("unexpected double uncommitted config entry")
  279. }
  280. r.pendingConf = true
  281. }
  282. r.appendEntry(pb.Entry{Data: nil})
  283. }
  284. func (r *raft) campaign() {
  285. r.becomeCandidate()
  286. if r.q() == r.poll(r.id, true) {
  287. r.becomeLeader()
  288. }
  289. for i := range r.prs {
  290. if i == r.id {
  291. continue
  292. }
  293. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
  294. }
  295. }
  296. func (r *raft) Step(m pb.Message) error {
  297. // TODO(bmizerany): this likely allocs - prevent that.
  298. defer func() { r.Commit = r.raftLog.committed }()
  299. if m.Type == pb.MsgHup {
  300. r.campaign()
  301. }
  302. switch {
  303. case m.Term == 0:
  304. // local message
  305. case m.Term > r.Term:
  306. lead := m.From
  307. if m.Type == pb.MsgVote {
  308. lead = None
  309. }
  310. r.becomeFollower(m.Term, lead)
  311. case m.Term < r.Term:
  312. // ignore
  313. return nil
  314. }
  315. r.step(r, m)
  316. return nil
  317. }
  318. func (r *raft) handleAppendEntries(m pb.Message) {
  319. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  320. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  321. } else {
  322. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
  323. }
  324. }
  325. func (r *raft) handleSnapshot(m pb.Message) {
  326. if r.restore(m.Snapshot) {
  327. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  328. } else {
  329. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  330. }
  331. }
  332. func (r *raft) resetPendingConf() {
  333. r.pendingConf = false
  334. }
  335. func (r *raft) addNode(id uint64) {
  336. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  337. r.pendingConf = false
  338. }
  339. func (r *raft) removeNode(id uint64) {
  340. r.delProgress(id)
  341. r.pendingConf = false
  342. }
  343. type stepFunc func(r *raft, m pb.Message)
  344. func stepLeader(r *raft, m pb.Message) {
  345. switch m.Type {
  346. case pb.MsgBeat:
  347. r.bcastHeartbeat()
  348. case pb.MsgProp:
  349. if len(m.Entries) != 1 {
  350. panic("unexpected length(entries) of a msgProp")
  351. }
  352. e := m.Entries[0]
  353. if e.Type == pb.EntryConfChange {
  354. if r.pendingConf {
  355. return
  356. }
  357. r.pendingConf = true
  358. }
  359. r.appendEntry(e)
  360. r.bcastAppend()
  361. case pb.MsgAppResp:
  362. if m.Index == 0 {
  363. return
  364. }
  365. if m.Reject {
  366. if r.prs[m.From].maybeDecrTo(m.Index) {
  367. r.sendAppend(m.From)
  368. }
  369. } else {
  370. r.prs[m.From].update(m.Index)
  371. if r.maybeCommit() {
  372. r.bcastAppend()
  373. }
  374. }
  375. case pb.MsgVote:
  376. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  377. }
  378. }
  379. func stepCandidate(r *raft, m pb.Message) {
  380. switch m.Type {
  381. case pb.MsgProp:
  382. panic("no leader")
  383. case pb.MsgApp:
  384. r.becomeFollower(r.Term, m.From)
  385. r.handleAppendEntries(m)
  386. case pb.MsgSnap:
  387. r.becomeFollower(m.Term, m.From)
  388. r.handleSnapshot(m)
  389. case pb.MsgVote:
  390. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  391. case pb.MsgVoteResp:
  392. gr := r.poll(m.From, !m.Reject)
  393. switch r.q() {
  394. case gr:
  395. r.becomeLeader()
  396. r.bcastAppend()
  397. case len(r.votes) - gr:
  398. r.becomeFollower(r.Term, None)
  399. }
  400. }
  401. }
  402. func stepFollower(r *raft, m pb.Message) {
  403. switch m.Type {
  404. case pb.MsgProp:
  405. if r.lead == None {
  406. panic("no leader")
  407. }
  408. m.To = r.lead
  409. r.send(m)
  410. case pb.MsgApp:
  411. r.elapsed = 0
  412. r.lead = m.From
  413. r.handleAppendEntries(m)
  414. case pb.MsgSnap:
  415. r.elapsed = 0
  416. r.handleSnapshot(m)
  417. case pb.MsgVote:
  418. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  419. r.elapsed = 0
  420. r.Vote = m.From
  421. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  422. } else {
  423. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  424. }
  425. }
  426. }
  427. func (r *raft) compact(index uint64, nodes []uint64, d []byte) {
  428. if index > r.raftLog.applied {
  429. panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
  430. }
  431. r.raftLog.snap(d, index, r.raftLog.term(index), nodes)
  432. r.raftLog.compact(index)
  433. }
  434. // restore recovers the statemachine from a snapshot. It restores the log and the
  435. // configuration of statemachine.
  436. func (r *raft) restore(s pb.Snapshot) bool {
  437. if s.Index <= r.raftLog.committed {
  438. return false
  439. }
  440. r.raftLog.restore(s)
  441. r.prs = make(map[uint64]*progress)
  442. for _, n := range s.Nodes {
  443. if n == r.id {
  444. r.setProgress(n, r.raftLog.lastIndex(), r.raftLog.lastIndex()+1)
  445. } else {
  446. r.setProgress(n, 0, r.raftLog.lastIndex()+1)
  447. }
  448. }
  449. return true
  450. }
  451. func (r *raft) needSnapshot(i uint64) bool {
  452. if i < r.raftLog.offset {
  453. if r.raftLog.snapshot.Term == 0 {
  454. panic("need non-empty snapshot")
  455. }
  456. return true
  457. }
  458. return false
  459. }
  460. func (r *raft) nodes() []uint64 {
  461. nodes := make([]uint64, 0, len(r.prs))
  462. for k := range r.prs {
  463. nodes = append(nodes, k)
  464. }
  465. return nodes
  466. }
  467. func (r *raft) setProgress(id, match, next uint64) {
  468. r.prs[id] = &progress{next: next, match: match}
  469. }
  470. func (r *raft) delProgress(id uint64) {
  471. delete(r.prs, id)
  472. }
  473. // promotable indicates whether state machine can be promoted to leader,
  474. // which is true when its own id is in progress list.
  475. func (r *raft) promotable() bool {
  476. _, ok := r.prs[r.id]
  477. return ok
  478. }
  479. func (r *raft) loadEnts(ents []pb.Entry) {
  480. r.raftLog.load(ents)
  481. }
  482. func (r *raft) loadState(state pb.HardState) {
  483. r.raftLog.committed = state.Commit
  484. r.Term = state.Term
  485. r.Vote = state.Vote
  486. r.Commit = state.Commit
  487. }
  488. // isElectionTimeout returns true if r.elapsed is greater than the
  489. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  490. // Otherwise, it returns false.
  491. func (r *raft) isElectionTimeout() bool {
  492. d := r.elapsed - r.electionTimeout
  493. if d < 0 {
  494. return false
  495. }
  496. return d > r.rand.Int()%r.electionTimeout
  497. }