package raft

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io/ioutil"
	"os"
	"path"
	"sort"
	"sync"
	"time"
)

//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------

const (
	Stopped      = "stopped"
	Initialized  = "initialized"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)

const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)

const (
	// DefaultHeartbeatInterval is the interval that the leader will send
	// AppendEntriesRequests to followers to maintain leadership.
	DefaultHeartbeatInterval = 50 * time.Millisecond

	DefaultElectionTimeout = 150 * time.Millisecond
)

// ElectionTimeoutThresholdPercent specifies the threshold at which the server
// will dispatch warning events that the heartbeat RTT is too close to the
// election timeout.
const ElectionTimeoutThresholdPercent = 0.8

//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------

var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")
var StopError = errors.New("raft: Has been stopped")

//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server interface {
	Name() string
	Context() interface{}
	StateMachine() StateMachine
	Leader() string
	State() string
	Path() string
	LogPath() string
	SnapshotPath(lastIndex uint64, lastTerm uint64) string
	Term() uint64
	CommitIndex() uint64
	VotedFor() string
	MemberCount() int
	QuorumSize() int
	IsLogEmpty() bool
	LogEntries() []*LogEntry
	LastCommandName() string
	GetState() string
	ElectionTimeout() time.Duration
	SetElectionTimeout(duration time.Duration)
	HeartbeatInterval() time.Duration
	SetHeartbeatInterval(duration time.Duration)
	Transporter() Transporter
	SetTransporter(t Transporter)
	AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
	RequestVote(req *RequestVoteRequest) *RequestVoteResponse
	RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
	SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
	AddPeer(name string, connectionString string) error
	RemovePeer(name string) error
	Peers() map[string]*Peer
	Init() error
	Start() error
	Stop()
	Running() bool
	Do(command Command) (interface{}, error)
	TakeSnapshot() error
	LoadSnapshot() error
	AddEventListener(string, EventListener)
	FlushCommitIndex()
}

type server struct {
	*eventDispatcher

	name        string
	path        string
	state       string
	transporter Transporter
	context     interface{}
	currentTerm uint64
	votedFor    string

	log    *Log
	leader string
	peers  map[string]*Peer
	mutex  sync.RWMutex

	syncedPeer map[string]bool

	stopped           chan bool
	c                 chan *ev
	electionTimeout   time.Duration
	heartbeatInterval time.Duration

	snapshot *Snapshot

	// pendingSnapshot is an unfinished snapshot.
	// After the pending snapshot is saved to disk,
	// it is promoted to snapshot and then reset to nil.
	pendingSnapshot *Snapshot
	stateMachine            StateMachine
	maxLogEntriesPerRequest uint64

	connectionString string

	routineGroup sync.WaitGroup
}

// An internal event to be processed by the server's event loop.
type ev struct {
	target      interface{}
	returnValue interface{}
	c           chan error
}

//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// Creates a new server with a log at the given path. transporter must
// not be nil. stateMachine can be nil if snapshotting and log
// compaction are to be disabled. context can be anything (including nil)
// and is not used by the raft package except to be returned by
// Server.Context(). connectionString can be anything.
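//
// A minimal usage sketch (httpTransporter and kvStore are hypothetical
// Transporter and StateMachine implementations, not part of this package):
//
//	s, err := NewServer("node1", "/var/raft/node1", httpTransporter, kvStore, nil, "http://localhost:4001")
//	if err != nil {
//		panic(err)
//	}
//	if err := s.Start(); err != nil {
//		panic(err)
//	}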
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
	if name == "" {
		return nil, errors.New("raft.Server: Name cannot be blank")
	}
	if transporter == nil {
		panic("raft: Transporter required")
	}

	s := &server{
		name:                    name,
		path:                    path,
		transporter:             transporter,
		stateMachine:            stateMachine,
		context:                 ctx,
		state:                   Stopped,
		peers:                   make(map[string]*Peer),
		log:                     newLog(),
		c:                       make(chan *ev, 256),
		electionTimeout:         DefaultElectionTimeout,
		heartbeatInterval:       DefaultHeartbeatInterval,
		maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
		connectionString:        connectionString,
	}
	s.eventDispatcher = newEventDispatcher(s)

	// Setup apply function.
	s.log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
		// Dispatch commit event.
		s.DispatchEvent(newEvent(CommitEventType, e, nil))

		// Apply command to the state machine.
		switch c := c.(type) {
		case CommandApply:
			return c.Apply(&context{
				server:       s,
				currentTerm:  s.currentTerm,
				currentIndex: s.log.internalCurrentIndex(),
				commitIndex:  s.log.commitIndex,
			})
		case deprecatedCommandApply:
			return c.Apply(s)
		default:
			return nil, fmt.Errorf("Command does not implement Apply()")
		}
	}

	return s, nil
}

//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

//--------------------------------------
// General
//--------------------------------------

// Retrieves the name of the server.
func (s *server) Name() string {
	return s.name
}

// Retrieves the storage path for the server.
func (s *server) Path() string {
	return s.path
}

// The name of the current leader.
func (s *server) Leader() string {
	return s.leader
}

// Retrieves a copy of the peer data.
func (s *server) Peers() map[string]*Peer {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	peers := make(map[string]*Peer)
	for name, peer := range s.peers {
		peers[name] = peer.clone()
	}
	return peers
}

// Retrieves the object that transports requests.
func (s *server) Transporter() Transporter {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.transporter
}

func (s *server) SetTransporter(t Transporter) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.transporter = t
}

// Retrieves the context passed into the constructor.
func (s *server) Context() interface{} {
	return s.context
}

// Retrieves the state machine passed into the constructor.
func (s *server) StateMachine() StateMachine {
	return s.stateMachine
}

// Retrieves the log path for the server.
func (s *server) LogPath() string {
	return path.Join(s.path, "log")
}

// Retrieves the current state of the server.
func (s *server) State() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state
}

// Sets the state of the server.
func (s *server) setState(state string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Temporarily store previous values.
	prevState := s.state
	prevLeader := s.leader

	// Update state and leader.
	s.state = state
	if state == Leader {
		s.leader = s.Name()
		s.syncedPeer = make(map[string]bool)
	}

	// Dispatch state and leader change events.
	s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))

	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}

// Retrieves the current term of the server.
func (s *server) Term() uint64 {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.currentTerm
}

// Retrieves the current commit index of the server.
func (s *server) CommitIndex() uint64 {
	s.log.mutex.RLock()
	defer s.log.mutex.RUnlock()
	return s.log.commitIndex
}

// Retrieves the name of the candidate this server voted for in this term.
func (s *server) VotedFor() string {
	return s.votedFor
}

// Retrieves whether the server's log has no entries.
func (s *server) IsLogEmpty() bool {
	return s.log.isEmpty()
}

// A list of all the log entries. This should only be used for debugging purposes.
func (s *server) LogEntries() []*LogEntry {
	s.log.mutex.RLock()
	defer s.log.mutex.RUnlock()
	return s.log.entries
}

// A reference to the command name of the last entry.
func (s *server) LastCommandName() string {
	return s.log.lastCommandName()
}

// Get the state of the server for debugging.
func (s *server) GetState() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommittedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
}

// Check if the server is promotable: a server with a non-empty log may
// promote itself to candidate.
func (s *server) promotable() bool {
	return s.log.currentIndex() > 0
}

//--------------------------------------
// Membership
//--------------------------------------

// Retrieves the number of member servers in the consensus.
func (s *server) MemberCount() int {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return len(s.peers) + 1
}

// Retrieves the number of servers required to make a quorum.
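// For example, a 5-member cluster (this server plus 4 peers) requires
// (5 / 2) + 1 = 3 servers for a quorum.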
func (s *server) QuorumSize() int {
	return (s.MemberCount() / 2) + 1
}

//--------------------------------------
// Election timeout
//--------------------------------------

// Retrieves the election timeout.
func (s *server) ElectionTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.electionTimeout
}

// Sets the election timeout.
func (s *server) SetElectionTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.electionTimeout = duration
}

//--------------------------------------
// Heartbeat interval
//--------------------------------------

// Retrieves the heartbeat interval.
func (s *server) HeartbeatInterval() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.heartbeatInterval
}

// Sets the heartbeat interval.
func (s *server) SetHeartbeatInterval(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.heartbeatInterval = duration
	for _, peer := range s.peers {
		peer.setHeartbeatInterval(duration)
	}
}

//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Initialization
//--------------------------------------

// Register the built-in commands.
func init() {
	RegisterCommand(&NOPCommand{})
	RegisterCommand(&DefaultJoinCommand{})
	RegisterCommand(&DefaultLeaveCommand{})
}

// Start the raft server.
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit the entry.
func (s *server) Start() error {
	// Exit if the server is already running.
	if s.Running() {
		return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
	}

	if err := s.Init(); err != nil {
		return err
	}

	// stopped needs to be allocated each time the server starts
	// because it is closed at Stop.
	s.stopped = make(chan bool)
	s.setState(Follower)

	// If no log entries exist then
	// 1. wait for AEs from another node
	// 2. wait for a self-join command
	// to set itself promotable
	if !s.promotable() {
		s.debugln("start as a new raft server")

		// If log entries exist then allow promotion to candidate
		// if no AEs received.
	} else {
		s.debugln("start from previous saved state")
	}

	debugln(s.GetState())

	s.routineGroup.Add(1)
	go func() {
		defer s.routineGroup.Done()
		s.loop()
	}()

	return nil
}

// Init initializes the raft server.
// If there is no previous log file under the given path, Init() will create an empty log file.
// Otherwise, Init() will load in the log entries from the log file.
func (s *server) Init() error {
	if s.Running() {
		return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
	}

	// The server has been initialized, or it was stopped after being
	// initialized. If the log has been initialized, we know that the server
	// was stopped after running.
	if s.state == Initialized || s.log.initialized {
		s.state = Initialized
		return nil
	}

	// Create snapshot directory if it does not exist.
	err := os.Mkdir(path.Join(s.path, "snapshot"), 0700)
	if err != nil && !os.IsExist(err) {
		s.debugln("raft: Snapshot dir error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	if err := s.readConf(); err != nil {
		s.debugln("raft: Conf file error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Initialize the log and load it up.
	if err := s.log.open(s.LogPath()); err != nil {
		s.debugln("raft: Log error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Update the term to the last term in the log.
	_, s.currentTerm = s.log.lastInfo()

	s.state = Initialized
	return nil
}

// Shuts down the server.
func (s *server) Stop() {
	if s.State() == Stopped {
		return
	}

	close(s.stopped)

	// Make sure all goroutines have stopped before we close the log.
	s.routineGroup.Wait()

	s.log.close()
	s.setState(Stopped)
}

// Checks if the server is currently running.
func (s *server) Running() bool {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return (s.state != Stopped && s.state != Initialized)
}

//--------------------------------------
// Term
//--------------------------------------

// Updates the current term on the server. This is only used when a larger
// external term is found.
func (s *server) updateCurrentTerm(term uint64, leaderName string) {
	_assert(term > s.currentTerm,
		"updateCurrentTerm: update is called when term is not larger than currentTerm")

	// Store previous values temporarily.
	prevTerm := s.currentTerm
	prevLeader := s.leader

	// Set currentTerm = T and convert to follower (§5.1).
	// Stop heartbeats before stepping down.
	if s.state == Leader {
		for _, peer := range s.peers {
			peer.stopHeartbeat(false)
		}
	}

	if s.state != Follower {
		s.setState(Follower)
	}

	// Update the term and clear the vote.
	s.mutex.Lock()
	s.currentTerm = term
	s.leader = leaderName
	s.votedFor = ""
	s.mutex.Unlock()

	// Dispatch change events.
	s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))

	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}

//--------------------------------------
// Event Loop
//--------------------------------------

//               ________
//            --|Snapshot|                 timeout
//            |  --------                  ______
// recover    |       ^                   |      |
// snapshot / |       |snapshot           |      |
// higher     |       |                   v      |     recv majority votes
// term       |    --------    timeout    -----------                        -----------
//            |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
//                 --------               -----------                        -----------
//                     ^          higher term/ |                         higher term |
//                     |            new leader |                                     |
//                     |_______________________|____________________________________ |

// The main event loop for the server.
func (s *server) loop() {
	defer s.debugln("server.loop.end")

	state := s.State()

	for state != Stopped {
		s.debugln("server.loop.run ", state)
		switch state {
		case Follower:
			s.followerLoop()
		case Candidate:
			s.candidateLoop()
		case Leader:
			s.leaderLoop()
		case Snapshotting:
			s.snapshotLoop()
		}
		state = s.State()
	}
}

// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
	if !s.Running() {
		return nil, StopError
	}

	event := &ev{target: value, c: make(chan error, 1)}
	select {
	case s.c <- event:
	case <-s.stopped:
		return nil, StopError
	}
	select {
	case <-s.stopped:
		return nil, StopError
	case err := <-event.c:
		return event.returnValue, err
	}
}
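
// Sends an event to the event loop without waiting for it to be processed.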
func (s *server) sendAsync(value interface{}) {
	if !s.Running() {
		return
	}

	event := &ev{target: value, c: make(chan error, 1)}
	// Try a non-blocking send first.
	// In most cases this should not block,
	// which avoids creating unnecessary goroutines.
	select {
	case s.c <- event:
		return
	default:
	}

	s.routineGroup.Add(1)
	go func() {
		defer s.routineGroup.Done()
		select {
		case s.c <- event:
		case <-s.stopped:
		}
	}()
}

// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if election timeout elapses without either:
//   1. Receiving valid AppendEntries RPC, or
//   2. Granting vote to candidate
func (s *server) followerLoop() {
	since := time.Now()
	electionTimeout := s.ElectionTimeout()
	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

	for s.State() == Follower {
		var err error
		update := false
		select {
		case <-s.stopped:
			s.setState(Stopped)
			return

		case e := <-s.c:
			switch req := e.target.(type) {
			case JoinCommand:
				// If no log entries exist and a self-join command is issued
				// then immediately become leader and commit the entry.
				if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
					s.debugln("selfjoin and promote to leader")
					s.setState(Leader)
					s.processCommand(req, e)
				} else {
					err = NotLeaderError
				}
			case *AppendEntriesRequest:
				// If heartbeats get too close to the election timeout then send an event.
				elapsedTime := time.Since(since)
				if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
					s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
				}
				e.returnValue, update = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, update = s.processRequestVoteRequest(req)
			case *SnapshotRequest:
				e.returnValue = s.processSnapshotRequest(req)
			default:
				err = NotLeaderError
			}
			// Callback to event.
			e.c <- err

		case <-timeoutChan:
			// Only a promotable follower (one with log entries) may
			// become a candidate.
			if s.promotable() {
				s.setState(Candidate)
			} else {
				update = true
			}
		}

		// Reset the election timeout whenever a valid AppendEntries RPC
		// was received or a vote was granted.
		if update {
			since = time.Now()
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		}
	}
}

// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
	// Clear leader value.
	prevLeader := s.leader
	s.leader = ""
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}

	lastLogIndex, lastLogTerm := s.log.lastInfo()
	doVote := true
	votesGranted := 0
	var timeoutChan <-chan time.Time
	var respChan chan *RequestVoteResponse

	for s.State() == Candidate {
		if doVote {
			// Increment current term, vote for self.
			s.currentTerm++
			s.votedFor = s.name

			// Send RequestVote RPCs to all other servers.
			respChan = make(chan *RequestVoteResponse, len(s.peers))
			for _, peer := range s.peers {
				s.routineGroup.Add(1)
				go func(peer *Peer) {
					defer s.routineGroup.Done()
					peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
				}(peer)
			}

			// Wait for either:
			//   * Votes received from majority of servers: become leader
			//   * AppendEntries RPC received from new leader: step down
			//   * Election timeout elapses without election resolution: increment term, start new election
			//   * Discover higher term: step down (§5.1)
			votesGranted = 1
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
			doVote = false
		}

		// If we received enough votes then stop waiting for more votes
		// and return from the candidate loop.
		if votesGranted == s.QuorumSize() {
			s.debugln("server.candidate.recv.enough.votes")
			s.setState(Leader)
			return
		}

		// Collect votes from peers.
		select {
		case <-s.stopped:
			s.setState(Stopped)
			return

		case resp := <-respChan:
			if success := s.processVoteResponse(resp); success {
				s.debugln("server.candidate.vote.granted: ", votesGranted)
				votesGranted++
			}

		case e := <-s.c:
			var err error
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			}
			// Callback to event.
			e.c <- err

		case <-timeoutChan:
			doVote = true
		}
	}
}

// The event loop that is run when the server is in a Leader state.
func (s *server) leaderLoop() {
	logIndex, _ := s.log.lastInfo()

	// Update the peers' prevLogIndex to the leader's lastLogIndex and start heartbeats.
	s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
	for _, peer := range s.peers {
		peer.setPrevLogIndex(logIndex)
		peer.startHeartbeat()
	}

	// Commit a NOP after the server becomes leader. From the Raft paper:
	// "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
	// each server; repeat during idle periods to prevent election timeouts
	// (§5.2)". The heartbeats started above do the "idle" period work.
	s.routineGroup.Add(1)
	go func() {
		defer s.routineGroup.Done()
		s.Do(NOPCommand{})
	}()

	// Begin to collect responses from followers.
	for s.State() == Leader {
		var err error
		select {
		case <-s.stopped:
			// Stop all peers before stopping.
			for _, peer := range s.peers {
				peer.stopHeartbeat(false)
			}
			s.setState(Stopped)
			return

		case e := <-s.c:
			switch req := e.target.(type) {
			case Command:
				s.processCommand(req, e)
				continue
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *AppendEntriesResponse:
				s.processAppendEntriesResponse(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			}

			// Callback to event.
			e.c <- err
		}
	}

	s.syncedPeer = nil
}
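
// The event loop that is run when the server is in a Snapshotting state.
// It serves append entries, request vote, and snapshot recovery requests;
// commands are rejected with NotLeaderError.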
func (s *server) snapshotLoop() {
	for s.State() == Snapshotting {
		var err error
		select {
		case <-s.stopped:
			s.setState(Stopped)
			return

		case e := <-s.c:
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			case *SnapshotRecoveryRequest:
				e.returnValue = s.processSnapshotRecoveryRequest(req)
			}
			// Callback to event.
			e.c <- err
		}
	}
}

//--------------------------------------
// Commands
//--------------------------------------

// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *server) Do(command Command) (interface{}, error) {
	return s.send(command)
}

// Processes a command.
func (s *server) processCommand(command Command, e *ev) {
	s.debugln("server.command.process")

	// Create an entry for the command in the log.
	entry, err := s.log.createEntry(s.currentTerm, command, e)
	if err != nil {
		s.debugln("server.command.log.entry.error:", err)
		e.c <- err
		return
	}

	if err := s.log.appendEntry(entry); err != nil {
		s.debugln("server.command.log.error:", err)
		e.c <- err
		return
	}

	s.syncedPeer[s.Name()] = true
	if len(s.peers) == 0 {
		commitIndex := s.log.currentIndex()
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
	}
}

//--------------------------------------
// Append Entries
//--------------------------------------

// Appends zero or more log entries from the leader to this server.
func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*AppendEntriesResponse)
	return resp
}

// Processes the "append entries" request.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
	s.traceln("server.ae.process")

	if req.Term < s.currentTerm {
		s.debugln("server.ae.error: stale term")
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
	}

	if req.Term == s.currentTerm {
		_assert(s.State() != Leader, "leader.elected.at.same.term.%d\n", s.currentTerm)

		// Step down to follower when the server is a candidate.
		if s.state == Candidate {
			s.setState(Follower)
		}

		// Discover the new leader when candidate;
		// save the leader name when follower.
		s.leader = req.LeaderName
	} else {
		// Update term and leader.
		s.updateCurrentTerm(req.Term, req.LeaderName)
	}

	// Reject if the log doesn't contain a matching previous entry.
	if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
		s.debugln("server.ae.truncate.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Append entries to the log.
	if err := s.log.appendEntries(req.Entries); err != nil {
		s.debugln("server.ae.append.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Commit up to the commit index.
	if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
		s.debugln("server.ae.commit.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// The server has appended and committed all the log entries from the leader.
	return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}

// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
	// If we find a higher term then change to a follower and exit.
	if resp.Term() > s.Term() {
		s.updateCurrentTerm(resp.Term(), "")
		return
	}

	// Ignore the response if it's not successful.
	if !resp.Success() {
		return
	}

	// If the peer successfully appended a log entry from the leader's term,
	// add it to the synced list.
	if resp.append {
		s.syncedPeer[resp.peer] = true
	}

	// Make sure we have a quorum before committing.
	if len(s.syncedPeer) < s.QuorumSize() {
		return
	}

	// Determine the committed index that a majority has.
	var indices []uint64
	indices = append(indices, s.log.currentIndex())
	for _, peer := range s.peers {
		indices = append(indices, peer.getPrevLogIndex())
	}
	sort.Sort(sort.Reverse(uint64Slice(indices)))

	// We can commit up to the index which the majority of the members have appended.
	commitIndex := indices[s.QuorumSize()-1]
	committedIndex := s.log.commitIndex

	if commitIndex > committedIndex {
		// The leader needs to do an fsync before committing log entries.
		s.log.sync()
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
	}
}

// processVoteResponse processes a vote response:
// 1. If the vote is granted for the current term of the candidate, return true.
// 2. If the vote is denied because the responder has a larger term, update
//    this server's term, which will also cause the candidate to step down,
//    and return false.
// 3. If the vote is for a smaller term, ignore it and return false.
func (s *server) processVoteResponse(resp *RequestVoteResponse) bool {
	if resp.VoteGranted && resp.Term == s.currentTerm {
		return true
	}

	if resp.Term > s.currentTerm {
		s.debugln("server.candidate.vote.failed")
		s.updateCurrentTerm(resp.Term, "")
	} else {
		s.debugln("server.candidate.vote: denied")
	}
	return false
}

//--------------------------------------
// Request Vote
//--------------------------------------

// Requests a vote from a server. A vote can be obtained if the request's term
// is equal to the server's current term and the server has not voted yet. A
// vote can also be obtained if the term is greater than the server's current term.
func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*RequestVoteResponse)
	return resp
}

// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
	// If the request is coming from an old term then reject it.
	if req.Term < s.Term() {
		s.debugln("server.rv.deny.vote: cause stale term")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the term of the requesting peer is larger than this node's, update the term.
	// If the term is equal and we've already voted for a different candidate then
	// don't vote for this candidate.
	if req.Term > s.Term() {
		s.updateCurrentTerm(req.Term, "")
	} else if s.votedFor != "" && s.votedFor != req.CandidateName {
		s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
			" already vote for ", s.votedFor)
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the candidate's log is not at least as up-to-date as our last log then don't vote.
	lastIndex, lastTerm := s.log.lastInfo()
	if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
		s.debugln("server.deny.vote: cause out of date log: ", req.CandidateName,
			"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
			"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If we made it this far then cast a vote and reset our election timeout.
	s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
	s.votedFor = req.CandidateName

	return newRequestVoteResponse(s.currentTerm, true), true
}

//--------------------------------------
// Membership
//--------------------------------------

// Adds a peer to the server.
func (s *server) AddPeer(name string, connectionString string) error {
	s.debugln("server.peer.add: ", name, len(s.peers))

	// Do not allow peers to be added twice.
	if s.peers[name] != nil {
		return nil
	}

	// Skip the peer if it has the same name as the server.
	if s.name != name {
		peer := newPeer(s, name, connectionString, s.heartbeatInterval)

		if s.State() == Leader {
			peer.startHeartbeat()
		}

		s.peers[peer.Name] = peer

		s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}

// Removes a peer from the server.
func (s *server) RemovePeer(name string) error {
	s.debugln("server.peer.remove: ", name, len(s.peers))

	// Skip the peer if it has the same name as the server.
	if name != s.Name() {
		// Return an error if the peer doesn't exist.
		peer := s.peers[name]
		if peer == nil {
			return fmt.Errorf("raft: Peer not found: %s", name)
		}

		// Stop the peer and remove it.
		if s.State() == Leader {
			// We create a goroutine here to avoid a potential deadlock.
			// We are holding the log write lock when we reach this line of code.
			// Peer.stopHeartbeat can block without a goroutine, if the
			// target goroutine (which we want to stop) is calling
			// log.getEntriesAfter and waiting for the log read lock.
			// So we might be holding the log lock while waiting for the log
			// lock, which leads to a deadlock.
			// TODO(xiangli) refactor log lock
			s.routineGroup.Add(1)
			go func() {
				defer s.routineGroup.Done()
				peer.stopHeartbeat(true)
			}()
		}

		delete(s.peers, name)

		s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}

//--------------------------------------
// Log compaction
//--------------------------------------
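
// Takes a snapshot of the state machine, saves it to disk, and compacts the
// log, keeping the most recent NumberOfLogEntriesAfterSnapshot entries so
// slightly lagging followers can catch up without a full snapshot.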
func (s *server) TakeSnapshot() error {
	if s.stateMachine == nil {
		return errors.New("Snapshot: Cannot create snapshot. Missing state machine.")
	}

	// Shortcut without lock.
	// Exit if the server is currently creating a snapshot.
	if s.pendingSnapshot != nil {
		return errors.New("Snapshot: Last snapshot is not finished.")
	}

	// TODO: acquire the lock and no more committed is allowed
	// This will be done after finishing refactoring heartbeat
	s.debugln("take.snapshot")

	lastIndex, lastTerm := s.log.commitInfo()

	// Exit if nothing has been committed since the last snapshot.
	if lastIndex == s.log.startIndex {
		return nil
	}

	path := s.SnapshotPath(lastIndex, lastTerm)
	// Create the pending snapshot; peers and state are attached below
	// before it is saved to disk.
	s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}

	state, err := s.stateMachine.Save()
	if err != nil {
		return err
	}

	// Clone the list of peers.
	peers := make([]*Peer, 0, len(s.peers)+1)
	for _, peer := range s.peers {
		peers = append(peers, peer.clone())
	}
	peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})

	// Attach the peers and state to the pending snapshot and save it to disk.
	s.pendingSnapshot.Peers = peers
	s.pendingSnapshot.State = state
	s.saveSnapshot()

	// We keep some log entries after the snapshot.
	// We do not want to send the whole snapshot to machines that are
	// only slightly behind.
	if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
		compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
		compactTerm := s.log.getEntry(compactIndex).Term()
		s.log.compact(compactIndex, compactTerm)
	}

	return nil
}

// Saves the pending snapshot to disk and promotes it to the current snapshot.
func (s *server) saveSnapshot() error {
	if s.pendingSnapshot == nil {
		return errors.New("pendingSnapshot.is.nil")
	}

	// Write snapshot to disk.
	if err := s.pendingSnapshot.save(); err != nil {
		return err
	}

	// Swap the current and last snapshots.
	tmp := s.snapshot
	s.snapshot = s.pendingSnapshot

	// Delete the previous snapshot if there is any change.
	if tmp != nil && !(tmp.LastIndex == s.snapshot.LastIndex && tmp.LastTerm == s.snapshot.LastTerm) {
		tmp.remove()
	}
	s.pendingSnapshot = nil

	return nil
}

// Retrieves the snapshot path for the server.
func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}
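
// Forwards a snapshot request to the event loop and waits for the response.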
func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotResponse)
	return resp
}
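
// Processes a "snapshot" request: the server enters the Snapshotting state
// unless its log already contains the snapshot's last entry.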
func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
	// If the follower's log contains an entry at the snapshot's last index with a term
	// that matches the snapshot's last term, then the follower already has all the
	// information found in the snapshot and can reply false.
	entry := s.log.getEntry(req.LastIndex)
	if entry != nil && entry.Term() == req.LastTerm {
		return newSnapshotResponse(false)
	}

	// Update state.
	s.setState(Snapshotting)

	return newSnapshotResponse(true)
}
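
// Forwards a snapshot recovery request to the event loop and waits for the response.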
func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotRecoveryResponse)
	return resp
}
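
// Processes a "snapshot recovery" request: restores the state machine and
// cluster configuration from the leader's snapshot, then compacts the log.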
func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	// Recover the state sent from the request.
	if err := s.stateMachine.Recovery(req.State); err != nil {
		panic("cannot recover from previous state")
	}

	// Recover the cluster configuration.
	s.peers = make(map[string]*Peer)
	for _, peer := range req.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update log state.
	s.currentTerm = req.LastTerm
	s.log.updateCommitIndex(req.LastIndex)

	// Create local snapshot.
	s.pendingSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
	s.saveSnapshot()

	// Clear the previous log entries.
	s.log.compact(req.LastIndex, req.LastTerm)

	return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}

// Load a snapshot at restart.
func (s *server) LoadSnapshot() error {
	// Open the snapshot/ directory.
	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
	if err != nil {
		s.debugln("cannot.open.snapshot: ", err)
		return err
	}

	// Retrieve a list of all snapshots.
	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		dir.Close()
		panic(err)
	}
	dir.Close()

	if len(filenames) == 0 {
		s.debugln("no.snapshot.to.load")
		return nil
	}

	// Grab the latest snapshot.
	sort.Strings(filenames)
	snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

	// Read snapshot data.
	file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
	if err != nil {
		return err
	}
	defer file.Close()

	// Check checksum.
	var checksum uint32
	n, err := fmt.Fscanf(file, "%08x\n", &checksum)
	if err != nil {
		return err
	} else if n != 1 {
		return errors.New("checksum.err: bad.snapshot.file")
	}

	// Load the remaining snapshot contents.
	b, err := ioutil.ReadAll(file)
	if err != nil {
		return err
	}

	// Generate checksum.
	byteChecksum := crc32.ChecksumIEEE(b)
	if checksum != byteChecksum {
		s.debugln(checksum, " ", byteChecksum)
		return errors.New("bad snapshot file")
	}

	// Decode snapshot.
	if err = json.Unmarshal(b, &s.snapshot); err != nil {
		s.debugln("unmarshal.snapshot.error: ", err)
		return err
	}

	// Recover snapshot into the state machine.
	if err = s.stateMachine.Recovery(s.snapshot.State); err != nil {
		s.debugln("recovery.snapshot.error: ", err)
		return err
	}

	// Recover cluster configuration.
	for _, peer := range s.snapshot.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update log state.
	s.log.startTerm = s.snapshot.LastTerm
	s.log.startIndex = s.snapshot.LastIndex
	s.log.updateCommitIndex(s.snapshot.LastIndex)

	return err
}

//--------------------------------------
// Config File
//--------------------------------------

// Flushes the commit index to disk, so that when the raft server restarts,
// it will commit up to the flushed commitIndex.
func (s *server) FlushCommitIndex() {
	s.debugln("server.conf.update")
	// Write the configuration to file.
	s.writeConf()
}
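
// Writes the current configuration (commit index and peers) to disk,
// writing a temporary file and renaming it for an atomic replace.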
func (s *server) writeConf() {
	peers := make([]*Peer, len(s.peers))

	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}

	r := &Config{
		CommitIndex: s.log.commitIndex,
		Peers:       peers,
	}

	b, _ := json.Marshal(r)

	confPath := path.Join(s.path, "conf")
	tmpConfPath := path.Join(s.path, "conf.tmp")

	err := writeFileSynced(tmpConfPath, b, 0600)
	if err != nil {
		panic(err)
	}

	os.Rename(tmpConfPath, confPath)
}

// Read the configuration for the server.
func (s *server) readConf() error {
	confPath := path.Join(s.path, "conf")
	s.debugln("readConf.open ", confPath)

	// Open the conf file; a missing file is not an error for a fresh server.
	b, err := ioutil.ReadFile(confPath)
	if err != nil {
		return nil
	}

	conf := &Config{}
	if err = json.Unmarshal(b, conf); err != nil {
		return err
	}

	s.log.updateCommitIndex(conf.CommitIndex)

	return nil
}

//--------------------------------------
// Debugging
//--------------------------------------

func (s *server) debugln(v ...interface{}) {
	if logLevel > Debug {
		debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
	}
}

func (s *server) traceln(v ...interface{}) {
	if logLevel > Trace {
		tracef("[%s] %s", s.name, fmt.Sprintln(v...))
	}
}