raft.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package raft
  15. import (
  16. "bytes"
  17. "errors"
  18. "fmt"
  19. "math"
  20. "math/rand"
  21. "sort"
  22. "strings"
  23. "sync"
  24. "time"
  25. pb "github.com/coreos/etcd/raft/raftpb"
  26. )
  27. // None is a placeholder node ID used when there is no leader.
  28. const None uint64 = 0
  29. const noLimit = math.MaxUint64
// Possible values for StateType.
// The declaration order fixes each constant's numeric value (iota) and must
// stay in step with the stmap name table used by StateType.String.
const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
	StatePreCandidate
	// numStates equals the number of states declared above; it is a count,
	// not a state, and has no entry in stmap.
	numStates
)
// ReadOnlyOption specifies how read only requests are processed.
type ReadOnlyOption int

const (
	// ReadOnlySafe guarantees the linearizability of the read only request by
	// communicating with the quorum. It is the default and suggested option.
	ReadOnlySafe ReadOnlyOption = iota
	// ReadOnlyLeaseBased ensures linearizability of the read only request by
	// relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, leader might keep the lease longer than it
	// should (clock can move backward/pause without any bound). ReadIndex is not safe
	// in that case.
	ReadOnlyLeaseBased
)
// Possible values for CampaignType.
const (
	// campaignPreElection represents the first phase of a normal election when
	// Config.PreVote is true.
	campaignPreElection CampaignType = "CampaignPreElection"
	// campaignElection represents a normal (time-based) election (the second phase
	// of the election when Config.PreVote is true).
	campaignElection CampaignType = "CampaignElection"
	// campaignTransfer represents a campaign started as part of a leader
	// transfer. campaign attaches it as the Context of the vote requests it
	// sends, and Step lets requests carrying it bypass the leader-lease check.
	campaignTransfer CampaignType = "CampaignTransfer"
)
  61. // lockedRand is a small wrapper around rand.Rand to provide
  62. // synchronization. Only the methods needed by the code are exposed
  63. // (e.g. Intn).
  64. type lockedRand struct {
  65. mu sync.Mutex
  66. rand *rand.Rand
  67. }
  68. func (r *lockedRand) Intn(n int) int {
  69. r.mu.Lock()
  70. v := r.rand.Intn(n)
  71. r.mu.Unlock()
  72. return v
  73. }
  74. var globalRand = &lockedRand{
  75. rand: rand.New(rand.NewSource(time.Now().UnixNano())),
  76. }
// CampaignType represents the type of campaigning.
// It is a string rather than a uint64 because strings are simpler to
// compare and to fill into raft entries.
type CampaignType string
  81. // StateType represents the role of a node in a cluster.
  82. type StateType uint64
  83. var stmap = [...]string{
  84. "StateFollower",
  85. "StateCandidate",
  86. "StateLeader",
  87. "StatePreCandidate",
  88. }
  89. func (st StateType) String() string {
  90. return stmap[uint64(st)]
  91. }
// Config contains the parameters to start a raft.
type Config struct {
	// ID is the identity of the local raft. ID cannot be 0.
	ID uint64

	// peers contains the IDs of all nodes (including self) in the raft cluster. It
	// should only be set when starting a new raft cluster. Restarting raft from
	// previous configuration will panic if peers is set. peers is private and only
	// used for testing right now.
	peers []uint64

	// ElectionTick is the number of Node.Tick invocations that must pass between
	// elections. That is, if a follower does not receive any message from the
	// leader of current term before ElectionTick has elapsed, it will become
	// candidate and start an election. ElectionTick must be greater than
	// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
	// unnecessary leader switching.
	ElectionTick int
	// HeartbeatTick is the number of Node.Tick invocations that must pass between
	// heartbeats. That is, a leader sends heartbeat messages to maintain its
	// leadership every HeartbeatTick ticks.
	HeartbeatTick int

	// Storage is the storage for raft. raft generates entries and states to be
	// stored in storage. raft reads the persisted entries and states out of
	// Storage when it needs. raft reads out the previous state and configuration
	// out of storage when restarting.
	Storage Storage
	// Applied is the last applied index. It should only be set when restarting
	// raft. raft will not return entries to the application smaller or equal to
	// Applied. If Applied is unset when restarting, raft might return previous
	// applied entries. This is a very application dependent configuration.
	Applied uint64

	// MaxSizePerMsg limits the max size of each append message. Smaller value
	// lowers the raft recovery cost (initial probing and message lost during normal
	// operation). On the other side, it might affect the throughput during normal
	// replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per
	// message.
	MaxSizePerMsg uint64
	// MaxInflightMsgs limits the max number of in-flight append messages during
	// optimistic replication phase. The application transportation layer usually
	// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
	// overflowing that sending buffer. TODO (xiangli): feedback to application to
	// limit the proposal rate?
	MaxInflightMsgs int

	// CheckQuorum specifies if the leader should check quorum activity. Leader
	// steps down when quorum is not active for an electionTimeout.
	CheckQuorum bool

	// PreVote enables the Pre-Vote algorithm described in raft thesis section
	// 9.6. This prevents disruption when a node that has been partitioned away
	// rejoins the cluster.
	PreVote bool

	// ReadOnlyOption specifies how the read only request is processed.
	//
	// ReadOnlySafe guarantees the linearizability of the read only request by
	// communicating with the quorum. It is the default and suggested option.
	//
	// ReadOnlyLeaseBased ensures linearizability of the read only request by
	// relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, leader might keep the lease longer than it
	// should (clock can move backward/pause without any bound). ReadIndex is not safe
	// in that case.
	// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
	ReadOnlyOption ReadOnlyOption

	// Logger is the logger used for raft log. For multinode which can host
	// multiple raft groups, each raft group can have its own logger.
	// validate substitutes the package default logger when this is nil.
	Logger Logger

	// DisableProposalForwarding set to true means that followers will drop
	// proposals, rather than forwarding them to the leader. One use case for
	// this feature would be in a situation where the Raft leader is used to
	// compute the data of a proposal, for example, adding a timestamp from a
	// hybrid logical clock to data in a monotonically increasing way. Forwarding
	// should be disabled to prevent a follower with an inaccurate hybrid
	// logical clock from assigning the timestamp and then forwarding the data
	// to the leader.
	DisableProposalForwarding bool
}
  166. func (c *Config) validate() error {
  167. if c.ID == None {
  168. return errors.New("cannot use none as id")
  169. }
  170. if c.HeartbeatTick <= 0 {
  171. return errors.New("heartbeat tick must be greater than 0")
  172. }
  173. if c.ElectionTick <= c.HeartbeatTick {
  174. return errors.New("election tick must be greater than heartbeat tick")
  175. }
  176. if c.Storage == nil {
  177. return errors.New("storage cannot be nil")
  178. }
  179. if c.MaxInflightMsgs <= 0 {
  180. return errors.New("max inflight messages must be greater than 0")
  181. }
  182. if c.Logger == nil {
  183. c.Logger = raftLogger
  184. }
  185. if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
  186. return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
  187. }
  188. return nil
  189. }
// raft holds the state of a single raft participant: its identity, term and
// vote, its log, the replication progress it tracks for every peer, and the
// timers and step/tick functions that depend on its current role.
type raft struct {
	// id is this node's ID (Config.ID).
	id uint64

	// Term and Vote are the current term and the node voted for in it; both
	// are reported through hardState.
	Term uint64
	Vote uint64

	readStates []ReadState

	// the log
	raftLog *raftLog

	// maxInflight and maxMsgSize are copied from Config.MaxInflightMsgs and
	// Config.MaxSizePerMsg.
	maxInflight int
	maxMsgSize  uint64
	// prs maps each node ID in the cluster (including this node) to its
	// replication progress.
	prs map[uint64]*Progress

	// state is this node's current role.
	state StateType

	// votes records, per responding node ID, whether it granted the vote in
	// the current campaign.
	votes map[uint64]bool

	// msgs is the outbound mailbox that send appends to.
	msgs []pb.Message

	// the leader id
	lead uint64
	// leadTransferee is id of the leader transfer target when its value is not zero.
	// Follow the procedure defined in raft thesis 3.10.
	leadTransferee uint64
	// New configuration is ignored if there exists unapplied configuration.
	pendingConf bool

	readOnly *readOnly

	// number of ticks since it reached last electionTimeout when it is leader
	// or candidate.
	// number of ticks since it reached last electionTimeout or received a
	// valid message from current leader when it is a follower.
	electionElapsed int

	// number of ticks since it reached last heartbeatTimeout.
	// only leader keeps heartbeatElapsed.
	heartbeatElapsed int

	// checkQuorum and preVote are copied from Config.CheckQuorum and
	// Config.PreVote.
	checkQuorum bool
	preVote     bool

	// heartbeatTimeout and electionTimeout are copied from
	// Config.HeartbeatTick and Config.ElectionTick.
	heartbeatTimeout int
	electionTimeout  int
	// randomizedElectionTimeout is a random number between
	// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
	// when raft changes its state to follower or candidate.
	randomizedElectionTimeout int
	disableProposalForwarding bool

	// tick and step are the role-specific tick and message handlers; the
	// become* methods install them.
	tick func()
	step stepFunc

	logger Logger
}
  232. func newRaft(c *Config) *raft {
  233. if err := c.validate(); err != nil {
  234. panic(err.Error())
  235. }
  236. raftlog := newLog(c.Storage, c.Logger)
  237. hs, cs, err := c.Storage.InitialState()
  238. if err != nil {
  239. panic(err) // TODO(bdarnell)
  240. }
  241. peers := c.peers
  242. if len(cs.Nodes) > 0 {
  243. if len(peers) > 0 {
  244. // TODO(bdarnell): the peers argument is always nil except in
  245. // tests; the argument should be removed and these tests should be
  246. // updated to specify their nodes through a snapshot.
  247. panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
  248. }
  249. peers = cs.Nodes
  250. }
  251. r := &raft{
  252. id: c.ID,
  253. lead: None,
  254. raftLog: raftlog,
  255. maxMsgSize: c.MaxSizePerMsg,
  256. maxInflight: c.MaxInflightMsgs,
  257. prs: make(map[uint64]*Progress),
  258. electionTimeout: c.ElectionTick,
  259. heartbeatTimeout: c.HeartbeatTick,
  260. logger: c.Logger,
  261. checkQuorum: c.CheckQuorum,
  262. preVote: c.PreVote,
  263. readOnly: newReadOnly(c.ReadOnlyOption),
  264. disableProposalForwarding: c.DisableProposalForwarding,
  265. }
  266. for _, p := range peers {
  267. r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
  268. }
  269. if !isHardStateEqual(hs, emptyState) {
  270. r.loadState(hs)
  271. }
  272. if c.Applied > 0 {
  273. raftlog.appliedTo(c.Applied)
  274. }
  275. r.becomeFollower(r.Term, None)
  276. var nodesStrs []string
  277. for _, n := range r.nodes() {
  278. nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
  279. }
  280. r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
  281. r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
  282. return r
  283. }
  284. func (r *raft) hasLeader() bool { return r.lead != None }
  285. func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
  286. func (r *raft) hardState() pb.HardState {
  287. return pb.HardState{
  288. Term: r.Term,
  289. Vote: r.Vote,
  290. Commit: r.raftLog.committed,
  291. }
  292. }
  293. func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
  294. func (r *raft) nodes() []uint64 {
  295. nodes := make([]uint64, 0, len(r.prs))
  296. for id := range r.prs {
  297. nodes = append(nodes, id)
  298. }
  299. sort.Sort(uint64Slice(nodes))
  300. return nodes
  301. }
  302. // send persists state to stable storage and then sends to its mailbox.
  303. func (r *raft) send(m pb.Message) {
  304. m.From = r.id
  305. if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
  306. if m.Term == 0 {
  307. // All {pre-,}campaign messages need to have the term set when
  308. // sending.
  309. // - MsgVote: m.Term is the term the node is campaigning for,
  310. // non-zero as we increment the term when campaigning.
  311. // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
  312. // granted, non-zero for the same reason MsgVote is
  313. // - MsgPreVote: m.Term is the term the node will campaign,
  314. // non-zero as we use m.Term to indicate the next term we'll be
  315. // campaigning for
  316. // - MsgPreVoteResp: m.Term is the term received in the original
  317. // MsgPreVote if the pre-vote was granted, non-zero for the
  318. // same reasons MsgPreVote is
  319. panic(fmt.Sprintf("term should be set when sending %s", m.Type))
  320. }
  321. } else {
  322. if m.Term != 0 {
  323. panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
  324. }
  325. // do not attach term to MsgProp, MsgReadIndex
  326. // proposals are a way to forward to the leader and
  327. // should be treated as local message.
  328. // MsgReadIndex is also forwarded to leader.
  329. if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
  330. m.Term = r.Term
  331. }
  332. }
  333. r.msgs = append(r.msgs, m)
  334. }
  335. // sendAppend sends RPC, with entries to the given peer.
  336. func (r *raft) sendAppend(to uint64) {
  337. pr := r.prs[to]
  338. if pr.IsPaused() {
  339. return
  340. }
  341. m := pb.Message{}
  342. m.To = to
  343. term, errt := r.raftLog.term(pr.Next - 1)
  344. ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
  345. if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
  346. if !pr.RecentActive {
  347. r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
  348. return
  349. }
  350. m.Type = pb.MsgSnap
  351. snapshot, err := r.raftLog.snapshot()
  352. if err != nil {
  353. if err == ErrSnapshotTemporarilyUnavailable {
  354. r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
  355. return
  356. }
  357. panic(err) // TODO(bdarnell)
  358. }
  359. if IsEmptySnap(snapshot) {
  360. panic("need non-empty snapshot")
  361. }
  362. m.Snapshot = snapshot
  363. sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
  364. r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
  365. r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
  366. pr.becomeSnapshot(sindex)
  367. r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
  368. } else {
  369. m.Type = pb.MsgApp
  370. m.Index = pr.Next - 1
  371. m.LogTerm = term
  372. m.Entries = ents
  373. m.Commit = r.raftLog.committed
  374. if n := len(m.Entries); n != 0 {
  375. switch pr.State {
  376. // optimistically increase the next when in ProgressStateReplicate
  377. case ProgressStateReplicate:
  378. last := m.Entries[n-1].Index
  379. pr.optimisticUpdate(last)
  380. pr.ins.add(last)
  381. case ProgressStateProbe:
  382. pr.pause()
  383. default:
  384. r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
  385. }
  386. }
  387. }
  388. r.send(m)
  389. }
  390. // sendHeartbeat sends an empty MsgApp
  391. func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
  392. // Attach the commit as min(to.matched, r.committed).
  393. // When the leader sends out heartbeat message,
  394. // the receiver(follower) might not be matched with the leader
  395. // or it might not have all the committed entries.
  396. // The leader MUST NOT forward the follower's commit to
  397. // an unmatched index.
  398. commit := min(r.prs[to].Match, r.raftLog.committed)
  399. m := pb.Message{
  400. To: to,
  401. Type: pb.MsgHeartbeat,
  402. Commit: commit,
  403. Context: ctx,
  404. }
  405. r.send(m)
  406. }
  407. // bcastAppend sends RPC, with entries to all peers that are not up-to-date
  408. // according to the progress recorded in r.prs.
  409. func (r *raft) bcastAppend() {
  410. for id := range r.prs {
  411. if id == r.id {
  412. continue
  413. }
  414. r.sendAppend(id)
  415. }
  416. }
  417. // bcastHeartbeat sends RPC, without entries to all the peers.
  418. func (r *raft) bcastHeartbeat() {
  419. lastCtx := r.readOnly.lastPendingRequestCtx()
  420. if len(lastCtx) == 0 {
  421. r.bcastHeartbeatWithCtx(nil)
  422. } else {
  423. r.bcastHeartbeatWithCtx([]byte(lastCtx))
  424. }
  425. }
  426. func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
  427. for id := range r.prs {
  428. if id == r.id {
  429. continue
  430. }
  431. r.sendHeartbeat(id, ctx)
  432. }
  433. }
  434. // maybeCommit attempts to advance the commit index. Returns true if
  435. // the commit index changed (in which case the caller should call
  436. // r.bcastAppend).
  437. func (r *raft) maybeCommit() bool {
  438. // TODO(bmizerany): optimize.. Currently naive
  439. mis := make(uint64Slice, 0, len(r.prs))
  440. for id := range r.prs {
  441. mis = append(mis, r.prs[id].Match)
  442. }
  443. sort.Sort(sort.Reverse(mis))
  444. mci := mis[r.quorum()-1]
  445. return r.raftLog.maybeCommit(mci, r.Term)
  446. }
  447. func (r *raft) reset(term uint64) {
  448. if r.Term != term {
  449. r.Term = term
  450. r.Vote = None
  451. }
  452. r.lead = None
  453. r.electionElapsed = 0
  454. r.heartbeatElapsed = 0
  455. r.resetRandomizedElectionTimeout()
  456. r.abortLeaderTransfer()
  457. r.votes = make(map[uint64]bool)
  458. for id := range r.prs {
  459. r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)}
  460. if id == r.id {
  461. r.prs[id].Match = r.raftLog.lastIndex()
  462. }
  463. }
  464. r.pendingConf = false
  465. r.readOnly = newReadOnly(r.readOnly.option)
  466. }
  467. func (r *raft) appendEntry(es ...pb.Entry) {
  468. li := r.raftLog.lastIndex()
  469. for i := range es {
  470. es[i].Term = r.Term
  471. es[i].Index = li + 1 + uint64(i)
  472. }
  473. r.raftLog.append(es...)
  474. r.prs[r.id].maybeUpdate(r.raftLog.lastIndex())
  475. // Regardless of maybeCommit's return, our caller will call bcastAppend.
  476. r.maybeCommit()
  477. }
  478. // tickElection is run by followers and candidates after r.electionTimeout.
  479. func (r *raft) tickElection() {
  480. r.electionElapsed++
  481. if r.promotable() && r.pastElectionTimeout() {
  482. r.electionElapsed = 0
  483. r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
  484. }
  485. }
  486. // tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
  487. func (r *raft) tickHeartbeat() {
  488. r.heartbeatElapsed++
  489. r.electionElapsed++
  490. if r.electionElapsed >= r.electionTimeout {
  491. r.electionElapsed = 0
  492. if r.checkQuorum {
  493. r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
  494. }
  495. // If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
  496. if r.state == StateLeader && r.leadTransferee != None {
  497. r.abortLeaderTransfer()
  498. }
  499. }
  500. if r.state != StateLeader {
  501. return
  502. }
  503. if r.heartbeatElapsed >= r.heartbeatTimeout {
  504. r.heartbeatElapsed = 0
  505. r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
  506. }
  507. }
// becomeFollower switches this node to the follower role at the given term
// and records lead as its leader (None when the leader is unknown).
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower
	r.reset(term)
	r.tick = r.tickElection
	// reset cleared r.lead, so the leader must be assigned after it.
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
// becomeCandidate switches this node to the candidate role: it advances to
// the next term and votes for itself. It panics when called on a leader.
func (r *raft) becomeCandidate() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateLeader {
		panic("invalid transition [leader -> candidate]")
	}
	r.step = stepCandidate
	r.reset(r.Term + 1)
	r.tick = r.tickElection
	// reset cleared the vote for the new term; record the self-vote after it.
	r.Vote = r.id
	r.state = StateCandidate
	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
// becomePreCandidate switches this node to the pre-candidate role and
// starts a fresh vote tally. It panics when called on a leader.
func (r *raft) becomePreCandidate() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateLeader {
		panic("invalid transition [leader -> pre-candidate]")
	}
	// Becoming a pre-candidate changes our step functions and state,
	// but doesn't change anything else. In particular it does not increase
	// r.Term or change r.Vote.
	r.step = stepCandidate
	r.votes = make(map[uint64]bool)
	r.tick = r.tickElection
	r.state = StatePreCandidate
	r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
}
  542. func (r *raft) becomeLeader() {
  543. // TODO(xiangli) remove the panic when the raft implementation is stable
  544. if r.state == StateFollower {
  545. panic("invalid transition [follower -> leader]")
  546. }
  547. r.step = stepLeader
  548. r.reset(r.Term)
  549. r.tick = r.tickHeartbeat
  550. r.lead = r.id
  551. r.state = StateLeader
  552. ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
  553. if err != nil {
  554. r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
  555. }
  556. nconf := numOfPendingConf(ents)
  557. if nconf > 1 {
  558. panic("unexpected multiple uncommitted config entry")
  559. }
  560. if nconf == 1 {
  561. r.pendingConf = true
  562. }
  563. r.appendEntry(pb.Entry{Data: nil})
  564. r.logger.Infof("%x became leader at term %d", r.id, r.Term)
  565. }
  566. func (r *raft) campaign(t CampaignType) {
  567. var term uint64
  568. var voteMsg pb.MessageType
  569. if t == campaignPreElection {
  570. r.becomePreCandidate()
  571. voteMsg = pb.MsgPreVote
  572. // PreVote RPCs are sent for the next term before we've incremented r.Term.
  573. term = r.Term + 1
  574. } else {
  575. r.becomeCandidate()
  576. voteMsg = pb.MsgVote
  577. term = r.Term
  578. }
  579. if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
  580. // We won the election after voting for ourselves (which must mean that
  581. // this is a single-node cluster). Advance to the next state.
  582. if t == campaignPreElection {
  583. r.campaign(campaignElection)
  584. } else {
  585. r.becomeLeader()
  586. }
  587. return
  588. }
  589. for id := range r.prs {
  590. if id == r.id {
  591. continue
  592. }
  593. r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
  594. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
  595. var ctx []byte
  596. if t == campaignTransfer {
  597. ctx = []byte(t)
  598. }
  599. r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
  600. }
  601. }
  602. func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) {
  603. if v {
  604. r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
  605. } else {
  606. r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
  607. }
  608. if _, ok := r.votes[id]; !ok {
  609. r.votes[id] = v
  610. }
  611. for _, vv := range r.votes {
  612. if vv {
  613. granted++
  614. }
  615. }
  616. return granted
  617. }
  618. func (r *raft) Step(m pb.Message) error {
  619. // Handle the message term, which may result in our stepping down to a follower.
  620. switch {
  621. case m.Term == 0:
  622. // local message
  623. case m.Term > r.Term:
  624. if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
  625. force := bytes.Equal(m.Context, []byte(campaignTransfer))
  626. inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
  627. if !force && inLease {
  628. // If a server receives a RequestVote request within the minimum election timeout
  629. // of hearing from a current leader, it does not update its term or grant its vote
  630. r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
  631. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
  632. return nil
  633. }
  634. }
  635. switch {
  636. case m.Type == pb.MsgPreVote:
  637. // Never change our term in response to a PreVote
  638. case m.Type == pb.MsgPreVoteResp && !m.Reject:
  639. // We send pre-vote requests with a term in our future. If the
  640. // pre-vote is granted, we will increment our term when we get a
  641. // quorum. If it is not, the term comes from the node that
  642. // rejected our vote so we should become a follower at the new
  643. // term.
  644. default:
  645. r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
  646. r.id, r.Term, m.Type, m.From, m.Term)
  647. if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
  648. r.becomeFollower(m.Term, m.From)
  649. } else {
  650. r.becomeFollower(m.Term, None)
  651. }
  652. }
  653. case m.Term < r.Term:
  654. if r.checkQuorum && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
  655. // We have received messages from a leader at a lower term. It is possible
  656. // that these messages were simply delayed in the network, but this could
  657. // also mean that this node has advanced its term number during a network
  658. // partition, and it is now unable to either win an election or to rejoin
  659. // the majority on the old term. If checkQuorum is false, this will be
  660. // handled by incrementing term numbers in response to MsgVote with a
  661. // higher term, but if checkQuorum is true we may not advance the term on
  662. // MsgVote and must generate other messages to advance the term. The net
  663. // result of these two features is to minimize the disruption caused by
  664. // nodes that have been removed from the cluster's configuration: a
  665. // removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
  666. // but it will not receive MsgApp or MsgHeartbeat, so it will not create
  667. // disruptive term increases
  668. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
  669. } else {
  670. // ignore other cases
  671. r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
  672. r.id, r.Term, m.Type, m.From, m.Term)
  673. }
  674. return nil
  675. }
  676. switch m.Type {
  677. case pb.MsgHup:
  678. if r.state != StateLeader {
  679. ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
  680. if err != nil {
  681. r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
  682. }
  683. if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
  684. r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
  685. return nil
  686. }
  687. r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
  688. if r.preVote {
  689. r.campaign(campaignPreElection)
  690. } else {
  691. r.campaign(campaignElection)
  692. }
  693. } else {
  694. r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
  695. }
  696. case pb.MsgVote, pb.MsgPreVote:
  697. // The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should
  698. // always equal r.Term.
  699. if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
  700. r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
  701. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
  702. // When responding to Msg{Pre,}Vote messages we include the term
  703. // from the message, not the local term. To see why consider the
  704. // case where a single node was previously partitioned away and
  705. // its local term is now out of date. If we include the local term
  706. // (recall that for pre-votes we don't update the local term), the
  707. // (pre-)campaigning node on the other end will proceed to ignore
  708. // the message (it ignores all out of date messages).
  709. // The term in the original message and current local term are the
  710. // same in the case of regular votes, but different for pre-votes.
  711. r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
  712. if m.Type == pb.MsgVote {
  713. // Only record real votes.
  714. r.electionElapsed = 0
  715. r.Vote = m.From
  716. }
  717. } else {
  718. r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
  719. r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
  720. r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
  721. }
  722. default:
  723. r.step(r, m)
  724. }
  725. return nil
  726. }
  // stepFunc is the signature shared by the state-specific message handlers
  // defined in this file (stepLeader, stepCandidate and stepFollower): each
  // receives the raft state machine and the message to process.
  727. type stepFunc func(r *raft, m pb.Message)
  728. func stepLeader(r *raft, m pb.Message) {
  729. // These message types do not require any progress for m.From.
  730. switch m.Type {
  731. case pb.MsgBeat:
  732. r.bcastHeartbeat()
  733. return
  734. case pb.MsgCheckQuorum:
  735. if !r.checkQuorumActive() {
  736. r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
  737. r.becomeFollower(r.Term, None)
  738. }
  739. return
  740. case pb.MsgProp:
  741. if len(m.Entries) == 0 {
  742. r.logger.Panicf("%x stepped empty MsgProp", r.id)
  743. }
  744. if _, ok := r.prs[r.id]; !ok {
  745. // If we are not currently a member of the range (i.e. this node
  746. // was removed from the configuration while serving as leader),
  747. // drop any new proposals.
  748. return
  749. }
  750. if r.leadTransferee != None {
  751. r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
  752. return
  753. }
  754. for i, e := range m.Entries {
  755. if e.Type == pb.EntryConfChange {
  756. if r.pendingConf {
  757. r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())
  758. m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
  759. }
  760. r.pendingConf = true
  761. }
  762. }
  763. r.appendEntry(m.Entries...)
  764. r.bcastAppend()
  765. return
  766. case pb.MsgReadIndex:
  767. if r.quorum() > 1 {
  768. if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
  769. // Reject read only request when this leader has not committed any log entry at its term.
  770. return
  771. }
  772. // thinking: use an interally defined context instead of the user given context.
  773. // We can express this in terms of the term and index instead of a user-supplied value.
  774. // This would allow multiple reads to piggyback on the same message.
  775. switch r.readOnly.option {
  776. case ReadOnlySafe:
  777. r.readOnly.addRequest(r.raftLog.committed, m)
  778. r.bcastHeartbeatWithCtx(m.Entries[0].Data)
  779. case ReadOnlyLeaseBased:
  780. ri := r.raftLog.committed
  781. if m.From == None || m.From == r.id { // from local member
  782. r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
  783. } else {
  784. r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
  785. }
  786. }
  787. } else {
  788. r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
  789. }
  790. return
  791. }
  792. // All other message types require a progress for m.From (pr).
  793. pr, prOk := r.prs[m.From]
  794. if !prOk {
  795. r.logger.Debugf("%x no progress available for %x", r.id, m.From)
  796. return
  797. }
  798. switch m.Type {
  799. case pb.MsgAppResp:
  800. pr.RecentActive = true
  801. if m.Reject {
  802. r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
  803. r.id, m.RejectHint, m.From, m.Index)
  804. if pr.maybeDecrTo(m.Index, m.RejectHint) {
  805. r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
  806. if pr.State == ProgressStateReplicate {
  807. pr.becomeProbe()
  808. }
  809. r.sendAppend(m.From)
  810. }
  811. } else {
  812. oldPaused := pr.IsPaused()
  813. if pr.maybeUpdate(m.Index) {
  814. switch {
  815. case pr.State == ProgressStateProbe:
  816. pr.becomeReplicate()
  817. case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
  818. r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
  819. pr.becomeProbe()
  820. case pr.State == ProgressStateReplicate:
  821. pr.ins.freeTo(m.Index)
  822. }
  823. if r.maybeCommit() {
  824. r.bcastAppend()
  825. } else if oldPaused {
  826. // update() reset the wait state on this node. If we had delayed sending
  827. // an update before, send it now.
  828. r.sendAppend(m.From)
  829. }
  830. // Transfer leadership is in progress.
  831. if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
  832. r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
  833. r.sendTimeoutNow(m.From)
  834. }
  835. }
  836. }
  837. case pb.MsgHeartbeatResp:
  838. pr.RecentActive = true
  839. pr.resume()
  840. // free one slot for the full inflights window to allow progress.
  841. if pr.State == ProgressStateReplicate && pr.ins.full() {
  842. pr.ins.freeFirstOne()
  843. }
  844. if pr.Match < r.raftLog.lastIndex() {
  845. r.sendAppend(m.From)
  846. }
  847. if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
  848. return
  849. }
  850. ackCount := r.readOnly.recvAck(m)
  851. if ackCount < r.quorum() {
  852. return
  853. }
  854. rss := r.readOnly.advance(m)
  855. for _, rs := range rss {
  856. req := rs.req
  857. if req.From == None || req.From == r.id { // from local member
  858. r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
  859. } else {
  860. r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
  861. }
  862. }
  863. case pb.MsgSnapStatus:
  864. if pr.State != ProgressStateSnapshot {
  865. return
  866. }
  867. if !m.Reject {
  868. pr.becomeProbe()
  869. r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
  870. } else {
  871. pr.snapshotFailure()
  872. pr.becomeProbe()
  873. r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
  874. }
  875. // If snapshot finish, wait for the msgAppResp from the remote node before sending
  876. // out the next msgApp.
  877. // If snapshot failure, wait for a heartbeat interval before next try
  878. pr.pause()
  879. case pb.MsgUnreachable:
  880. // During optimistic replication, if the remote becomes unreachable,
  881. // there is huge probability that a MsgApp is lost.
  882. if pr.State == ProgressStateReplicate {
  883. pr.becomeProbe()
  884. }
  885. r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
  886. case pb.MsgTransferLeader:
  887. leadTransferee := m.From
  888. lastLeadTransferee := r.leadTransferee
  889. if lastLeadTransferee != None {
  890. if lastLeadTransferee == leadTransferee {
  891. r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
  892. r.id, r.Term, leadTransferee, leadTransferee)
  893. return
  894. }
  895. r.abortLeaderTransfer()
  896. r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
  897. }
  898. if leadTransferee == r.id {
  899. r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
  900. return
  901. }
  902. // Transfer leadership to third party.
  903. r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
  904. // Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
  905. r.electionElapsed = 0
  906. r.leadTransferee = leadTransferee
  907. if pr.Match == r.raftLog.lastIndex() {
  908. r.sendTimeoutNow(leadTransferee)
  909. r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
  910. } else {
  911. r.sendAppend(leadTransferee)
  912. }
  913. }
  914. }
  915. // stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
  916. // whether they respond to MsgVoteResp or MsgPreVoteResp.
  917. func stepCandidate(r *raft, m pb.Message) {
  918. // Only handle vote responses corresponding to our candidacy (while in
  919. // StateCandidate, we may get stale MsgPreVoteResp messages in this term from
  920. // our pre-candidate state).
  921. var myVoteRespType pb.MessageType
  922. if r.state == StatePreCandidate {
  923. myVoteRespType = pb.MsgPreVoteResp
  924. } else {
  925. myVoteRespType = pb.MsgVoteResp
  926. }
  927. switch m.Type {
  928. case pb.MsgProp:
  929. r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
  930. return
  931. case pb.MsgApp:
  932. r.becomeFollower(r.Term, m.From)
  933. r.handleAppendEntries(m)
  934. case pb.MsgHeartbeat:
  935. r.becomeFollower(r.Term, m.From)
  936. r.handleHeartbeat(m)
  937. case pb.MsgSnap:
  938. r.becomeFollower(m.Term, m.From)
  939. r.handleSnapshot(m)
  940. case myVoteRespType:
  941. gr := r.poll(m.From, m.Type, !m.Reject)
  942. r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
  943. switch r.quorum() {
  944. case gr:
  945. if r.state == StatePreCandidate {
  946. r.campaign(campaignElection)
  947. } else {
  948. r.becomeLeader()
  949. r.bcastAppend()
  950. }
  951. case len(r.votes) - gr:
  952. r.becomeFollower(r.Term, None)
  953. }
  954. case pb.MsgTimeoutNow:
  955. r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
  956. }
  957. }
  958. func stepFollower(r *raft, m pb.Message) {
  959. switch m.Type {
  960. case pb.MsgProp:
  961. if r.lead == None {
  962. r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
  963. return
  964. } else if r.disableProposalForwarding {
  965. r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
  966. return
  967. }
  968. m.To = r.lead
  969. r.send(m)
  970. case pb.MsgApp:
  971. r.electionElapsed = 0
  972. r.lead = m.From
  973. r.handleAppendEntries(m)
  974. case pb.MsgHeartbeat:
  975. r.electionElapsed = 0
  976. r.lead = m.From
  977. r.handleHeartbeat(m)
  978. case pb.MsgSnap:
  979. r.electionElapsed = 0
  980. r.lead = m.From
  981. r.handleSnapshot(m)
  982. case pb.MsgTransferLeader:
  983. if r.lead == None {
  984. r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
  985. return
  986. }
  987. m.To = r.lead
  988. r.send(m)
  989. case pb.MsgTimeoutNow:
  990. if r.promotable() {
  991. r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
  992. // Leadership transfers never use pre-vote even if r.preVote is true; we
  993. // know we are not recovering from a partition so there is no need for the
  994. // extra round trip.
  995. r.campaign(campaignTransfer)
  996. } else {
  997. r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
  998. }
  999. case pb.MsgReadIndex:
  1000. if r.lead == None {
  1001. r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
  1002. return
  1003. }
  1004. m.To = r.lead
  1005. r.send(m)
  1006. case pb.MsgReadIndexResp:
  1007. if len(m.Entries) != 1 {
  1008. r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
  1009. return
  1010. }
  1011. r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
  1012. }
  1013. }
  1014. func (r *raft) handleAppendEntries(m pb.Message) {
  1015. if m.Index < r.raftLog.committed {
  1016. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  1017. return
  1018. }
  1019. if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  1020. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  1021. } else {
  1022. r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
  1023. r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
  1024. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
  1025. }
  1026. }
  1027. func (r *raft) handleHeartbeat(m pb.Message) {
  1028. r.raftLog.commitTo(m.Commit)
  1029. r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
  1030. }
  1031. func (r *raft) handleSnapshot(m pb.Message) {
  1032. sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
  1033. if r.restore(m.Snapshot) {
  1034. r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
  1035. r.id, r.raftLog.committed, sindex, sterm)
  1036. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  1037. } else {
  1038. r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
  1039. r.id, r.raftLog.committed, sindex, sterm)
  1040. r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  1041. }
  1042. }
  1043. // restore recovers the state machine from a snapshot. It restores the log and the
  1044. // configuration of state machine.
  1045. func (r *raft) restore(s pb.Snapshot) bool {
  1046. if s.Metadata.Index <= r.raftLog.committed {
  1047. return false
  1048. }
  1049. if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
  1050. r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
  1051. r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  1052. r.raftLog.commitTo(s.Metadata.Index)
  1053. return false
  1054. }
  1055. r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
  1056. r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  1057. r.raftLog.restore(s)
  1058. r.prs = make(map[uint64]*Progress)
  1059. for _, n := range s.Metadata.ConfState.Nodes {
  1060. match, next := uint64(0), r.raftLog.lastIndex()+1
  1061. if n == r.id {
  1062. match = next - 1
  1063. }
  1064. r.setProgress(n, match, next)
  1065. r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n])
  1066. }
  1067. return true
  1068. }
  1069. // promotable indicates whether state machine can be promoted to leader,
  1070. // which is true when its own id is in progress list.
  1071. func (r *raft) promotable() bool {
  1072. _, ok := r.prs[r.id]
  1073. return ok
  1074. }
  1075. func (r *raft) addNode(id uint64) {
  1076. r.pendingConf = false
  1077. if _, ok := r.prs[id]; ok {
  1078. // Ignore any redundant addNode calls (which can happen because the
  1079. // initial bootstrapping entries are applied twice).
  1080. return
  1081. }
  1082. r.setProgress(id, 0, r.raftLog.lastIndex()+1)
  1083. // When a node is first added, we should mark it as recently active.
  1084. // Otherwise, CheckQuorum may cause us to step down if it is invoked
  1085. // before the added node has a chance to communicate with us.
  1086. r.prs[id].RecentActive = true
  1087. }
  1088. func (r *raft) removeNode(id uint64) {
  1089. r.delProgress(id)
  1090. r.pendingConf = false
  1091. // do not try to commit or abort transferring if there is no nodes in the cluster.
  1092. if len(r.prs) == 0 {
  1093. return
  1094. }
  1095. // The quorum size is now smaller, so see if any pending entries can
  1096. // be committed.
  1097. if r.maybeCommit() {
  1098. r.bcastAppend()
  1099. }
  1100. // If the removed node is the leadTransferee, then abort the leadership transferring.
  1101. if r.state == StateLeader && r.leadTransferee == id {
  1102. r.abortLeaderTransfer()
  1103. }
  1104. }
  1105. func (r *raft) resetPendingConf() { r.pendingConf = false }
  1106. func (r *raft) setProgress(id, match, next uint64) {
  1107. r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
  1108. }
  // delProgress removes the progress entry for id from r.prs. Deleting an id
  // that is not present is a no-op.
  1109. func (r *raft) delProgress(id uint64) {
  1110. delete(r.prs, id)
  1111. }
  1112. func (r *raft) loadState(state pb.HardState) {
  1113. if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
  1114. r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
  1115. }
  1116. r.raftLog.committed = state.Commit
  1117. r.Term = state.Term
  1118. r.Vote = state.Vote
  1119. }
  1120. // pastElectionTimeout returns true iff r.electionElapsed is greater
  1121. // than or equal to the randomized election timeout in
  1122. // [electiontimeout, 2 * electiontimeout - 1].
  1123. func (r *raft) pastElectionTimeout() bool {
  1124. return r.electionElapsed >= r.randomizedElectionTimeout
  1125. }
  1126. func (r *raft) resetRandomizedElectionTimeout() {
  1127. r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
  1128. }
  1129. // checkQuorumActive returns true if the quorum is active from
  1130. // the view of the local raft state machine. Otherwise, it returns
  1131. // false.
  1132. // checkQuorumActive also resets all RecentActive to false.
  1133. func (r *raft) checkQuorumActive() bool {
  1134. var act int
  1135. for id := range r.prs {
  1136. if id == r.id { // self is always active
  1137. act++
  1138. continue
  1139. }
  1140. if r.prs[id].RecentActive {
  1141. act++
  1142. }
  1143. r.prs[id].RecentActive = false
  1144. }
  1145. return act >= r.quorum()
  1146. }
  1147. func (r *raft) sendTimeoutNow(to uint64) {
  1148. r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
  1149. }
  // abortLeaderTransfer cancels any in-progress leadership transfer by
  // resetting r.leadTransferee to None.
  1150. func (r *raft) abortLeaderTransfer() {
  1151. r.leadTransferee = None
  1152. }
  1153. func numOfPendingConf(ents []pb.Entry) int {
  1154. n := 0
  1155. for i := range ents {
  1156. if ents[i].Type == pb.EntryConfChange {
  1157. n++
  1158. }
  1159. }
  1160. return n
  1161. }