raft.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package raft
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "sort"
  19. pb "github.com/coreos/etcd/raft/raftpb"
  20. )
  21. // None is a placeholder node ID used when there is no leader.
  22. const None uint64 = 0
  23. var errNoLeader = errors.New("no leader")
  24. // Possible values for StateType.
  25. const (
  26. StateFollower StateType = iota
  27. StateCandidate
  28. StateLeader
  29. )
  30. // StateType represents the role of a node in a cluster.
  31. type StateType uint64
  32. var stmap = [...]string{
  33. "StateFollower",
  34. "StateCandidate",
  35. "StateLeader",
  36. }
  37. func (st StateType) String() string {
  38. return stmap[uint64(st)]
  39. }
  40. func (st StateType) MarshalJSON() ([]byte, error) {
  41. return []byte(fmt.Sprintf("%q", st.String())), nil
  42. }
  43. type progress struct {
  44. match, next uint64
  45. }
  46. func (pr *progress) update(n uint64) {
  47. pr.match = n
  48. pr.next = n + 1
  49. }
  50. // maybeDecrTo returns false if the given to index comes from an out of order message.
  51. // Otherwise it decreases the progress next index and returns true.
  52. func (pr *progress) maybeDecrTo(to uint64) bool {
  53. // the rejection must be stale if the
  54. // progress has matched with follower
  55. // or "to" does not match next - 1
  56. if pr.match != 0 || pr.next-1 != to {
  57. return false
  58. }
  59. if pr.next--; pr.next < 1 {
  60. pr.next = 1
  61. }
  62. return true
  63. }
  64. func (pr *progress) String() string {
  65. return fmt.Sprintf("n=%d m=%d", pr.next, pr.match)
  66. }
  67. // uint64Slice implements sort interface
  68. type uint64Slice []uint64
  69. func (p uint64Slice) Len() int { return len(p) }
  70. func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
  71. func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
  72. type raft struct {
  73. pb.HardState
  74. id uint64
  75. // the log
  76. raftLog *raftLog
  77. prs map[uint64]*progress
  78. state StateType
  79. votes map[uint64]bool
  80. msgs []pb.Message
  81. // the leader id
  82. lead uint64
  83. // New configuration is ignored if there exists unapplied configuration.
  84. pendingConf bool
  85. elapsed int // number of ticks since the last msg
  86. heartbeatTimeout int
  87. electionTimeout int
  88. rand *rand.Rand
  89. tick func()
  90. step stepFunc
  91. }
  92. func newRaft(id uint64, peers []uint64, election, heartbeat int) *raft {
  93. if id == None {
  94. panic("cannot use none id")
  95. }
  96. r := &raft{
  97. id: id,
  98. lead: None,
  99. raftLog: newLog(),
  100. prs: make(map[uint64]*progress),
  101. electionTimeout: election,
  102. heartbeatTimeout: heartbeat,
  103. }
  104. r.rand = rand.New(rand.NewSource(int64(id)))
  105. for _, p := range peers {
  106. r.prs[p] = &progress{}
  107. }
  108. r.becomeFollower(0, None)
  109. return r
  110. }
  111. func (r *raft) hasLeader() bool { return r.lead != None }
  112. func (r *raft) softState() *SoftState {
  113. return &SoftState{Lead: r.lead, RaftState: r.state, Nodes: r.nodes()}
  114. }
  115. func (r *raft) String() string {
  116. s := fmt.Sprintf(`state=%v term=%d`, r.state, r.Term)
  117. switch r.state {
  118. case StateFollower:
  119. s += fmt.Sprintf(" vote=%v lead=%v", r.Vote, r.lead)
  120. case StateCandidate:
  121. s += fmt.Sprintf(` votes="%v"`, r.votes)
  122. case StateLeader:
  123. s += fmt.Sprintf(` prs="%v"`, r.prs)
  124. }
  125. return s
  126. }
  127. func (r *raft) poll(id uint64, v bool) (granted int) {
  128. if _, ok := r.votes[id]; !ok {
  129. r.votes[id] = v
  130. }
  131. for _, vv := range r.votes {
  132. if vv {
  133. granted++
  134. }
  135. }
  136. return granted
  137. }
  138. // send persists state to stable storage and then sends to its mailbox.
  139. func (r *raft) send(m pb.Message) {
  140. m.From = r.id
  141. // do not attach term to msgProp
  142. // proposals are a way to forward to the leader and
  143. // should be treated as local message.
  144. if m.Type != pb.MsgProp {
  145. m.Term = r.Term
  146. }
  147. r.msgs = append(r.msgs, m)
  148. }
  149. // sendAppend sends RRPC, with entries to the given peer.
  150. func (r *raft) sendAppend(to uint64) {
  151. pr := r.prs[to]
  152. m := pb.Message{}
  153. m.To = to
  154. m.Index = pr.next - 1
  155. if r.needSnapshot(m.Index) {
  156. m.Type = pb.MsgSnap
  157. m.Snapshot = r.raftLog.snapshot
  158. } else {
  159. m.Type = pb.MsgApp
  160. m.LogTerm = r.raftLog.term(pr.next - 1)
  161. m.Entries = r.raftLog.entries(pr.next)
  162. m.Commit = r.raftLog.committed
  163. }
  164. r.send(m)
  165. }
  166. // sendHeartbeat sends an empty msgApp
  167. func (r *raft) sendHeartbeat(to uint64) {
  168. m := pb.Message{
  169. To: to,
  170. Type: pb.MsgApp,
  171. }
  172. r.send(m)
  173. }
  174. // bcastAppend sends RRPC, with entries to all peers that are not up-to-date according to r.mis.
  175. func (r *raft) bcastAppend() {
  176. for i := range r.prs {
  177. if i == r.id {
  178. continue
  179. }
  180. r.sendAppend(i)
  181. }
  182. }
  183. // bcastHeartbeat sends RRPC, without entries to all the peers.
  184. func (r *raft) bcastHeartbeat() {
  185. for i := range r.prs {
  186. if i == r.id {
  187. continue
  188. }
  189. r.sendHeartbeat(i)
  190. }
  191. }
  192. func (r *raft) maybeCommit() bool {
  193. // TODO(bmizerany): optimize.. Currently naive
  194. mis := make(uint64Slice, 0, len(r.prs))
  195. for i := range r.prs {
  196. mis = append(mis, r.prs[i].match)
  197. }
  198. sort.Sort(sort.Reverse(mis))
  199. mci := mis[r.q()-1]
  200. return r.raftLog.maybeCommit(mci, r.Term)
  201. }
  202. func (r *raft) reset(term uint64) {
  203. r.Term = term
  204. r.lead = None
  205. r.Vote = None
  206. r.elapsed = 0
  207. r.votes = make(map[uint64]bool)
  208. for i := range r.prs {
  209. r.prs[i] = &progress{next: r.raftLog.lastIndex() + 1}
  210. if i == r.id {
  211. r.prs[i].match = r.raftLog.lastIndex()
  212. }
  213. }
  214. r.pendingConf = false
  215. }
  216. func (r *raft) q() int {
  217. return len(r.prs)/2 + 1
  218. }
  219. func (r *raft) appendEntry(e pb.Entry) {
  220. e.Term = r.Term
  221. e.Index = r.raftLog.lastIndex() + 1
  222. r.raftLog.append(r.raftLog.lastIndex(), e)
  223. r.prs[r.id].update(r.raftLog.lastIndex())
  224. r.maybeCommit()
  225. }
  226. // tickElection is ran by followers and candidates after r.electionTimeout.
  227. func (r *raft) tickElection() {
  228. if !r.promotable() {
  229. r.elapsed = 0
  230. return
  231. }
  232. r.elapsed++
  233. if r.isElectionTimeout() {
  234. r.elapsed = 0
  235. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  236. }
  237. }
  238. // tickHeartbeat is ran by leaders to send a msgBeat after r.heartbeatTimeout.
  239. func (r *raft) tickHeartbeat() {
  240. r.elapsed++
  241. if r.elapsed > r.heartbeatTimeout {
  242. r.elapsed = 0
  243. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  244. }
  245. }
  246. func (r *raft) becomeFollower(term uint64, lead uint64) {
  247. r.step = stepFollower
  248. r.reset(term)
  249. r.tick = r.tickElection
  250. r.lead = lead
  251. r.state = StateFollower
  252. }
  253. func (r *raft) becomeCandidate() {
  254. // TODO(xiangli) remove the panic when the raft implementation is stable
  255. if r.state == StateLeader {
  256. panic("invalid transition [leader -> candidate]")
  257. }
  258. r.step = stepCandidate
  259. r.reset(r.Term + 1)
  260. r.tick = r.tickElection
  261. r.Vote = r.id
  262. r.state = StateCandidate
  263. }
  264. func (r *raft) becomeLeader() {
  265. // TODO(xiangli) remove the panic when the raft implementation is stable
  266. if r.state == StateFollower {
  267. panic("invalid transition [follower -> leader]")
  268. }
  269. r.step = stepLeader
  270. r.reset(r.Term)
  271. r.tick = r.tickHeartbeat
  272. r.lead = r.id
  273. r.state = StateLeader
  274. for _, e := range r.raftLog.entries(r.raftLog.committed + 1) {
  275. if e.Type != pb.EntryConfChange {
  276. continue
  277. }
  278. if r.pendingConf {
  279. panic("unexpected double uncommitted config entry")
  280. }
  281. r.pendingConf = true
  282. }
  283. r.appendEntry(pb.Entry{Data: nil})
  284. }
  285. func (r *raft) campaign() {
  286. r.becomeCandidate()
  287. if r.q() == r.poll(r.id, true) {
  288. r.becomeLeader()
  289. }
  290. for i := range r.prs {
  291. if i == r.id {
  292. continue
  293. }
  294. r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
  295. }
  296. }
  297. func (r *raft) Step(m pb.Message) error {
  298. // TODO(bmizerany): this likely allocs - prevent that.
  299. defer func() { r.Commit = r.raftLog.committed }()
  300. if m.Type == pb.MsgHup {
  301. r.campaign()
  302. }
  303. switch {
  304. case m.Term == 0:
  305. // local message
  306. case m.Term > r.Term:
  307. lead := m.From
  308. if m.Type == pb.MsgVote {
  309. lead = None
  310. }
  311. r.becomeFollower(m.Term, lead)
  312. case m.Term < r.Term:
  313. // ignore
  314. return nil
  315. }
  316. r.step(r, m)
  317. return nil
  318. }
  319. func (r *raft) handleAppendEntries(m pb.Message) {
  320. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  321. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  322. } else {
  323. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
  324. }
  325. }
  326. func (r *raft) handleSnapshot(m pb.Message) {
  327. if r.restore(m.Snapshot) {
  328. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  329. } else {
  330. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  331. }
  332. }
  333. func (r *raft) resetPendingConf() {
  334. r.pendingConf = false
  335. }
  336. func (r *raft) addNode(id uint64) {
  337. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  338. r.pendingConf = false
  339. }
  340. func (r *raft) removeNode(id uint64) {
  341. r.delProgress(id)
  342. r.pendingConf = false
  343. }
  344. type stepFunc func(r *raft, m pb.Message)
  345. func stepLeader(r *raft, m pb.Message) {
  346. switch m.Type {
  347. case pb.MsgBeat:
  348. r.bcastHeartbeat()
  349. case pb.MsgProp:
  350. if len(m.Entries) != 1 {
  351. panic("unexpected length(entries) of a msgProp")
  352. }
  353. e := m.Entries[0]
  354. if e.Type == pb.EntryConfChange {
  355. if r.pendingConf {
  356. return
  357. }
  358. r.pendingConf = true
  359. }
  360. r.appendEntry(e)
  361. r.bcastAppend()
  362. case pb.MsgAppResp:
  363. if m.Index == 0 {
  364. return
  365. }
  366. if m.Reject {
  367. if r.prs[m.From].maybeDecrTo(m.Index) {
  368. r.sendAppend(m.From)
  369. }
  370. } else {
  371. r.prs[m.From].update(m.Index)
  372. if r.maybeCommit() {
  373. r.bcastAppend()
  374. }
  375. }
  376. case pb.MsgVote:
  377. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  378. }
  379. }
  380. func stepCandidate(r *raft, m pb.Message) {
  381. switch m.Type {
  382. case pb.MsgProp:
  383. panic("no leader")
  384. case pb.MsgApp:
  385. r.becomeFollower(r.Term, m.From)
  386. r.handleAppendEntries(m)
  387. case pb.MsgSnap:
  388. r.becomeFollower(m.Term, m.From)
  389. r.handleSnapshot(m)
  390. case pb.MsgVote:
  391. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  392. case pb.MsgVoteResp:
  393. gr := r.poll(m.From, !m.Reject)
  394. switch r.q() {
  395. case gr:
  396. r.becomeLeader()
  397. r.bcastAppend()
  398. case len(r.votes) - gr:
  399. r.becomeFollower(r.Term, None)
  400. }
  401. }
  402. }
  403. func stepFollower(r *raft, m pb.Message) {
  404. switch m.Type {
  405. case pb.MsgProp:
  406. if r.lead == None {
  407. panic("no leader")
  408. }
  409. m.To = r.lead
  410. r.send(m)
  411. case pb.MsgApp:
  412. r.elapsed = 0
  413. r.lead = m.From
  414. r.handleAppendEntries(m)
  415. case pb.MsgSnap:
  416. r.elapsed = 0
  417. r.handleSnapshot(m)
  418. case pb.MsgVote:
  419. if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  420. r.elapsed = 0
  421. r.Vote = m.From
  422. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
  423. } else {
  424. r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
  425. }
  426. }
  427. }
  428. func (r *raft) compact(index uint64, nodes []uint64, d []byte) {
  429. if index > r.raftLog.applied {
  430. panic(fmt.Sprintf("raft: compact index (%d) exceeds applied index (%d)", index, r.raftLog.applied))
  431. }
  432. r.raftLog.snap(d, index, r.raftLog.term(index), nodes)
  433. r.raftLog.compact(index)
  434. }
  435. // restore recovers the statemachine from a snapshot. It restores the log and the
  436. // configuration of statemachine.
  437. func (r *raft) restore(s pb.Snapshot) bool {
  438. if s.Index <= r.raftLog.committed {
  439. return false
  440. }
  441. r.raftLog.restore(s)
  442. r.prs = make(map[uint64]*progress)
  443. for _, n := range s.Nodes {
  444. if n == r.id {
  445. r.setProgress(n, r.raftLog.lastIndex(), r.raftLog.lastIndex()+1)
  446. } else {
  447. r.setProgress(n, 0, r.raftLog.lastIndex()+1)
  448. }
  449. }
  450. return true
  451. }
  452. func (r *raft) needSnapshot(i uint64) bool {
  453. if i < r.raftLog.offset {
  454. if r.raftLog.snapshot.Term == 0 {
  455. panic("need non-empty snapshot")
  456. }
  457. return true
  458. }
  459. return false
  460. }
  461. func (r *raft) nodes() []uint64 {
  462. nodes := make([]uint64, 0, len(r.prs))
  463. for k := range r.prs {
  464. nodes = append(nodes, k)
  465. }
  466. return nodes
  467. }
  468. func (r *raft) setProgress(id, match, next uint64) {
  469. r.prs[id] = &progress{next: next, match: match}
  470. }
  471. func (r *raft) delProgress(id uint64) {
  472. delete(r.prs, id)
  473. }
  474. // promotable indicates whether state machine can be promoted to leader,
  475. // which is true when its own id is in progress list.
  476. func (r *raft) promotable() bool {
  477. _, ok := r.prs[r.id]
  478. return ok
  479. }
  480. func (r *raft) loadEnts(ents []pb.Entry) {
  481. r.raftLog.load(ents)
  482. }
  483. func (r *raft) loadState(state pb.HardState) {
  484. r.raftLog.committed = state.Commit
  485. r.Term = state.Term
  486. r.Vote = state.Vote
  487. r.Commit = state.Commit
  488. }
  489. // isElectionTimeout returns true if r.elapsed is greater than the
  490. // randomized election timeout in (electiontimeout, 2 * electiontimeout - 1).
  491. // Otherwise, it returns false.
  492. func (r *raft) isElectionTimeout() bool {
  493. d := r.elapsed - r.electionTimeout
  494. if d < 0 {
  495. return false
  496. }
  497. return d > r.rand.Int()%r.electionTimeout
  498. }