server.go
package raft

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io/ioutil"
	"math"
	"os"
	"path"
	"sort"
	"sync"
	"time"
)

//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------

const (
	Stopped      = "stopped"
	Initialized  = "initialized"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)

const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)

const (
	// DefaultHeartbeatInterval is the interval that the leader will send
	// AppendEntriesRequests to followers to maintain leadership.
	DefaultHeartbeatInterval = 50 * time.Millisecond

	DefaultElectionTimeout = 150 * time.Millisecond
)

// ElectionTimeoutThresholdPercent specifies the threshold at which the server
// will dispatch warning events that the heartbeat RTT is too close to the
// election timeout.
const ElectionTimeoutThresholdPercent = 0.8
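
// For example, with the DefaultElectionTimeout of 150ms, a heartbeat round
// trip slower than 0.8 * 150ms = 120ms will cause the follower loop below to
// dispatch an ElectionTimeoutThresholdEventType warning event.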
//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------

var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")
var StopError = errors.New("raft: Has been stopped")

//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server interface {
	Name() string
	Context() interface{}
	StateMachine() StateMachine
	Leader() string
	State() string
	Path() string
	LogPath() string
	SnapshotPath(lastIndex uint64, lastTerm uint64) string
	Term() uint64
	CommitIndex() uint64
	VotedFor() string
	MemberCount() int
	QuorumSize() int
	IsLogEmpty() bool
	LogEntries() []*LogEntry
	LastCommandName() string
	GetState() string
	ElectionTimeout() time.Duration
	SetElectionTimeout(duration time.Duration)
	HeartbeatInterval() time.Duration
	SetHeartbeatInterval(duration time.Duration)
	Transporter() Transporter
	SetTransporter(t Transporter)
	AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
	RequestVote(req *RequestVoteRequest) *RequestVoteResponse
	RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
	SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
	AddPeer(name string, connectionString string) error
	RemovePeer(name string) error
	Peers() map[string]*Peer
	Init() error
	Start() error
	Stop()
	Running() bool
	Do(command Command) (interface{}, error)
	TakeSnapshot() error
	LoadSnapshot() error
	AddEventListener(string, EventListener)
	FlushCommitIndex()
}
type server struct {
	*eventDispatcher

	name        string
	path        string
	state       string
	transporter Transporter
	context     interface{}
	currentTerm uint64
	votedFor    string

	log    *Log
	leader string
	peers  map[string]*Peer
	mutex  sync.RWMutex

	syncedPeer map[string]bool

	stopped           chan bool
	c                 chan *ev
	electionTimeout   time.Duration
	heartbeatInterval time.Duration

	snapshot *Snapshot

	// pendingSnapshot is an in-progress snapshot. Once it has been saved to
	// disk, it is promoted to snapshot and pendingSnapshot is reset to nil.
	pendingSnapshot *Snapshot

	stateMachine            StateMachine
	maxLogEntriesPerRequest uint64

	connectionString string

	routineGroup sync.WaitGroup
}

// An internal event to be processed by the server's event loop.
type ev struct {
	target      interface{}
	returnValue interface{}
	c           chan error
}
//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// NewServer creates a new server with a log at the given path. transporter
// must not be nil. stateMachine can be nil if snapshotting and log compaction
// are to be disabled. context can be anything (including nil) and is not used
// by the raft package except to be returned by Server.Context().
// connectionString can be anything.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
	if name == "" {
		return nil, errors.New("raft.Server: Name cannot be blank")
	}
	if transporter == nil {
		panic("raft: Transporter required")
	}

	s := &server{
		name:                    name,
		path:                    path,
		transporter:             transporter,
		stateMachine:            stateMachine,
		context:                 ctx,
		state:                   Stopped,
		peers:                   make(map[string]*Peer),
		log:                     newLog(),
		c:                       make(chan *ev, 256),
		electionTimeout:         DefaultElectionTimeout,
		heartbeatInterval:       DefaultHeartbeatInterval,
		maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
		connectionString:        connectionString,
	}
	s.eventDispatcher = newEventDispatcher(s)

	// Setup apply function.
	s.log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
		// Dispatch commit event.
		s.DispatchEvent(newEvent(CommitEventType, e, nil))

		// Apply command to the state machine.
		switch c := c.(type) {
		case CommandApply:
			return c.Apply(&context{
				server:       s,
				currentTerm:  s.currentTerm,
				currentIndex: s.log.internalCurrentIndex(),
				commitIndex:  s.log.commitIndex,
			})
		case deprecatedCommandApply:
			return c.Apply(s)
		default:
			return nil, fmt.Errorf("Command does not implement Apply()")
		}
	}

	return s, nil
}
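
// Usage sketch (illustrative, not part of the package): constructing and
// starting a server. The transporter here is a hypothetical stand-in for
// whatever implementation the caller provides; the state machine may be nil
// if snapshotting and log compaction are not needed.
//
//	var t Transporter = newMyTransporter() // hypothetical implementation
//	s, err := NewServer("node1", "/var/raft/node1", t, nil, nil, "http://localhost:4001")
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := s.Start(); err != nil {
//		log.Fatal(err)
//	}
//	defer s.Stop()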
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

//--------------------------------------
// General
//--------------------------------------

// Retrieves the name of the server.
func (s *server) Name() string {
	return s.name
}

// Retrieves the storage path for the server.
func (s *server) Path() string {
	return s.path
}

// The name of the current leader.
func (s *server) Leader() string {
	return s.leader
}

// Retrieves a copy of the peer data.
func (s *server) Peers() map[string]*Peer {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	peers := make(map[string]*Peer)
	for name, peer := range s.peers {
		peers[name] = peer.clone()
	}
	return peers
}

// Retrieves the object that transports requests.
func (s *server) Transporter() Transporter {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.transporter
}

func (s *server) SetTransporter(t Transporter) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.transporter = t
}

// Retrieves the context passed into the constructor.
func (s *server) Context() interface{} {
	return s.context
}

// Retrieves the state machine passed into the constructor.
func (s *server) StateMachine() StateMachine {
	return s.stateMachine
}

// Retrieves the log path for the server.
func (s *server) LogPath() string {
	return path.Join(s.path, "log")
}

// Retrieves the current state of the server.
func (s *server) State() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state
}

// Sets the state of the server.
func (s *server) setState(state string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Temporarily store previous values.
	prevState := s.state
	prevLeader := s.leader

	// Update state and leader.
	s.state = state
	if state == Leader {
		s.leader = s.Name()
		s.syncedPeer = make(map[string]bool)
	}

	// Dispatch state and leader change events.
	s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))

	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}

// Retrieves the current term of the server.
func (s *server) Term() uint64 {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.currentTerm
}

// Retrieves the current commit index of the server.
func (s *server) CommitIndex() uint64 {
	s.log.mutex.RLock()
	defer s.log.mutex.RUnlock()
	return s.log.commitIndex
}

// Retrieves the name of the candidate this server voted for in this term.
func (s *server) VotedFor() string {
	return s.votedFor
}

// Reports whether the server's log has no entries.
func (s *server) IsLogEmpty() bool {
	return s.log.isEmpty()
}

// A list of all the log entries. This should only be used for debugging purposes.
func (s *server) LogEntries() []*LogEntry {
	s.log.mutex.RLock()
	defer s.log.mutex.RUnlock()
	return s.log.entries
}

// A reference to the command name of the last entry.
func (s *server) LastCommandName() string {
	return s.log.lastCommandName()
}

// Get the state of the server for debugging.
func (s *server) GetState() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommittedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
}

// Check if the server is promotable.
func (s *server) promotable() bool {
	return s.log.currentIndex() > 0
}

//--------------------------------------
// Membership
//--------------------------------------

// Retrieves the number of member servers in the consensus.
func (s *server) MemberCount() int {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return len(s.peers) + 1
}

// Retrieves the number of servers required to make a quorum.
func (s *server) QuorumSize() int {
	return (s.MemberCount() / 2) + 1
}
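
// For example, a cluster with MemberCount() == 5 needs QuorumSize() == 3
// votes, and a cluster of 4 also needs 3; the integer division always yields
// a strict majority.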
//--------------------------------------
// Election timeout
//--------------------------------------

// Retrieves the election timeout.
func (s *server) ElectionTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.electionTimeout
}

// Sets the election timeout.
func (s *server) SetElectionTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.electionTimeout = duration
}

//--------------------------------------
// Heartbeat interval
//--------------------------------------

// Retrieves the heartbeat interval.
func (s *server) HeartbeatInterval() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.heartbeatInterval
}

// Sets the heartbeat interval.
func (s *server) SetHeartbeatInterval(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.heartbeatInterval = duration
	for _, peer := range s.peers {
		peer.setHeartbeatInterval(duration)
	}
}

//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Initialization
//--------------------------------------

// Register the built-in commands.
func init() {
	RegisterCommand(&NOPCommand{})
	RegisterCommand(&DefaultJoinCommand{})
	RegisterCommand(&DefaultLeaveCommand{})
}
// Start the raft server.
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit the entry.
func (s *server) Start() error {
	// Exit if the server is already running.
	if s.Running() {
		return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
	}

	if err := s.Init(); err != nil {
		return err
	}

	// stopped needs to be allocated each time the server starts
	// because it is closed in Stop().
	s.stopped = make(chan bool)
	s.setState(Follower)

	// If no log entries exist then
	// 1. wait for AEs from another node
	// 2. wait for a self-join command
	// to set itself promotable.
	if !s.promotable() {
		s.debugln("start as a new raft server")

		// If log entries exist then allow promotion to candidate
		// if no AEs are received.
	} else {
		s.debugln("start from previous saved state")
	}

	debugln(s.GetState())

	s.routineGroup.Add(1)
	go func() {
		defer s.routineGroup.Done()
		s.loop()
	}()

	return nil
}

// Init initializes the raft server.
// If there is no previous log file under the given path, Init() will create an empty log file.
// Otherwise, Init() will load in the log entries from the log file.
func (s *server) Init() error {
	if s.Running() {
		return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
	}

	// Server has been initialized, or the server was stopped after
	// initialization. If the log has been initialized, we know that the
	// server was stopped after running.
	if s.state == Initialized || s.log.initialized {
		s.state = Initialized
		return nil
	}

	// Create the snapshot directory if it does not exist.
	err := os.Mkdir(path.Join(s.path, "snapshot"), 0700)
	if err != nil && !os.IsExist(err) {
		s.debugln("raft: Snapshot dir error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	if err := s.readConf(); err != nil {
		s.debugln("raft: Conf file error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Initialize the log and load it up.
	if err := s.log.open(s.LogPath()); err != nil {
		s.debugln("raft: Log error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Update the term to the last term in the log.
	_, s.currentTerm = s.log.lastInfo()

	s.state = Initialized
	return nil
}

// Shuts down the server.
func (s *server) Stop() {
	if s.State() == Stopped {
		return
	}

	close(s.stopped)

	// Make sure all goroutines have stopped before we close the log.
	s.routineGroup.Wait()

	s.log.close()
	s.setState(Stopped)
}

// Checks if the server is currently running.
func (s *server) Running() bool {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return (s.state != Stopped && s.state != Initialized)
}
//--------------------------------------
// Term
//--------------------------------------

// Updates the current term for the server. This is only used when a larger
// external term is found.
func (s *server) updateCurrentTerm(term uint64, leaderName string) {
	_assert(term > s.currentTerm,
		"updateCurrentTerm: update is called when term is not larger than currentTerm")

	// Store previous values temporarily.
	prevTerm := s.currentTerm
	prevLeader := s.leader

	// Set currentTerm = T, convert to follower (§5.1).
	// Stop heartbeats before stepping down.
	if s.state == Leader {
		for _, peer := range s.peers {
			peer.stopHeartbeat(false)
		}
	}
	// Update the term and clear votedFor.
	if s.state != Follower {
		s.setState(Follower)
	}

	s.mutex.Lock()
	s.currentTerm = term
	s.leader = leaderName
	s.votedFor = ""
	s.mutex.Unlock()

	// Dispatch change events.
	s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))

	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}
//--------------------------------------
// Event Loop
//--------------------------------------

//                          ________
//                       --|Snapshot|                 timeout
//                       |  --------                  ______
//            recover    |       ^                   |      |
//            snapshot / |       |snapshot           |      |
//            higher     |       |                   v      |  recv majority votes
//            term       |    --------    timeout    -----------                  --------
//                       |-> |Follower| ----------> | Candidate |--------------> | Leader |
//                            --------               -----------                  --------
//                                ^          higher term/ |                higher term |
//                                |            new leader |                            |
//                                |_______________________|____________________________|

// The main event loop for the server.
func (s *server) loop() {
	defer s.debugln("server.loop.end")

	state := s.State()

	for state != Stopped {
		s.debugln("server.loop.run ", state)
		switch state {
		case Follower:
			s.followerLoop()
		case Candidate:
			s.candidateLoop()
		case Leader:
			s.leaderLoop()
		case Snapshotting:
			s.snapshotLoop()
		}
		state = s.State()
	}
}
// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
	if !s.Running() {
		return nil, StopError
	}

	event := &ev{target: value, c: make(chan error, 1)}
	select {
	case s.c <- event:
	case <-s.stopped:
		return nil, StopError
	}
	select {
	case <-s.stopped:
		return nil, StopError
	case err := <-event.c:
		return event.returnValue, err
	}
}

func (s *server) sendAsync(value interface{}) {
	if !s.Running() {
		return
	}

	event := &ev{target: value, c: make(chan error, 1)}
	// Try a non-blocking send first; in most cases this should not block,
	// and it avoids creating unnecessary goroutines.
	select {
	case s.c <- event:
		return
	default:
	}

	s.routineGroup.Add(1)
	go func() {
		defer s.routineGroup.Done()
		select {
		case s.c <- event:
		case <-s.stopped:
		}
	}()
}
// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if the election timeout elapses without either:
//  1. Receiving a valid AppendEntries RPC, or
//  2. Granting a vote to a candidate.
func (s *server) followerLoop() {
	since := time.Now()
	electionTimeout := s.ElectionTimeout()
	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

	for s.State() == Follower {
		var err error
		update := false
		select {
		case <-s.stopped:
			s.setState(Stopped)
			return

		case e := <-s.c:
			switch req := e.target.(type) {
			case JoinCommand:
				// If no log entries exist and a self-join command is issued
				// then immediately become leader and commit the entry.
				if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
					s.debugln("selfjoin and promote to leader")
					s.setState(Leader)
					s.processCommand(req, e)
				} else {
					err = NotLeaderError
				}
			case *AppendEntriesRequest:
				// If heartbeats get too close to the election timeout then send an event.
				elapsedTime := time.Since(since)
				if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
					s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
				}
				e.returnValue, update = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, update = s.processRequestVoteRequest(req)
			case *SnapshotRequest:
				e.returnValue = s.processSnapshotRequest(req)
			default:
				err = NotLeaderError
			}
			// Callback to event.
			e.c <- err

		case <-timeoutChan:
			// Only allow a synced follower to promote to candidate.
			if s.promotable() {
				s.setState(Candidate)
			} else {
				update = true
			}
		}

		// Reset the election timeout whenever we receive a valid
		// AppendEntries RPC or grant a vote to a candidate.
		if update {
			since = time.Now()
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		}
	}
}
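
// Note: afterBetween (a helper defined elsewhere in this package) returns a
// channel that fires after a duration chosen at random from the given range,
// so a follower waits between one and two election timeouts before standing
// for election. This randomization reduces the chance of split votes (§5.2).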
// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
	// Clear the leader value.
	prevLeader := s.leader
	s.leader = ""
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}

	lastLogIndex, lastLogTerm := s.log.lastInfo()
	doVote := true
	votesGranted := 0
	var timeoutChan <-chan time.Time
	var respChan chan *RequestVoteResponse

	for s.State() == Candidate {
		if doVote {
			// Increment current term, vote for self.
			s.currentTerm++
			s.votedFor = s.name

			// Send RequestVote RPCs to all other servers.
			respChan = make(chan *RequestVoteResponse, len(s.peers))
			for _, peer := range s.peers {
				s.routineGroup.Add(1)
				go func(peer *Peer) {
					defer s.routineGroup.Done()
					peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
				}(peer)
			}

			// Wait for either:
			//   * Votes received from a majority of servers: become leader
			//   * AppendEntries RPC received from new leader: step down
			//   * Election timeout elapses without election resolution: increment term, start new election
			//   * Discover higher term: step down (§5.1)
			votesGranted = 1
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
			doVote = false
		}

		// If we received enough votes then stop waiting for more votes
		// and return from the candidate loop.
		if votesGranted == s.QuorumSize() {
			s.debugln("server.candidate.recv.enough.votes")
			s.setState(Leader)
			return
		}

		// Collect votes from peers.
		select {
		case <-s.stopped:
			s.setState(Stopped)
			return

		case resp := <-respChan:
			if success := s.processVoteResponse(resp); success {
				s.debugln("server.candidate.vote.granted: ", votesGranted)
				votesGranted++
			}

		case e := <-s.c:
			var err error
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			}
			// Callback to event.
			e.c <- err

		case <-timeoutChan:
			doVote = true
		}
	}
}
// The event loop that is run when the server is in a Leader state.
func (s *server) leaderLoop() {
	logIndex, _ := s.log.lastInfo()

	// Update the peers' prevLogIndex to the leader's lastLogIndex and start heartbeats.
	s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
	for _, peer := range s.peers {
		peer.setPrevLogIndex(logIndex)
		peer.startHeartbeat()
	}

	// Commit a NOP after the server becomes leader. From the Raft paper:
	// "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
	// each server; repeat during idle periods to prevent election timeouts
	// (§5.2)". The heartbeats started above do the "idle" period work.
	s.routineGroup.Add(1)
	go func() {
		defer s.routineGroup.Done()
		s.Do(NOPCommand{})
	}()

	// Begin to collect responses from followers.
	for s.State() == Leader {
		var err error
		select {
		case <-s.stopped:
			// Stop all peers before stopping.
			for _, peer := range s.peers {
				peer.stopHeartbeat(false)
			}
			s.setState(Stopped)
			return

		case e := <-s.c:
			switch req := e.target.(type) {
			case Command:
				s.processCommand(req, e)
				continue
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *AppendEntriesResponse:
				s.processAppendEntriesResponse(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			}

			// Callback to event.
			e.c <- err
		}
	}

	s.syncedPeer = nil
}
func (s *server) snapshotLoop() {
	for s.State() == Snapshotting {
		var err error
		select {
		case <-s.stopped:
			s.setState(Stopped)
			return

		case e := <-s.c:
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			case *SnapshotRecoveryRequest:
				e.returnValue = s.processSnapshotRecoveryRequest(req)
			}
			// Callback to event.
			e.c <- err
		}
	}
}
//--------------------------------------
// Commands
//--------------------------------------

// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *server) Do(command Command) (interface{}, error) {
	return s.send(command)
}
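
// Usage sketch (illustrative): submitting a command from application code.
// WriteCommand is a hypothetical user-defined type that implements the
// Command interface and has been registered with RegisterCommand.
//
//	result, err := s.Do(&WriteCommand{Key: "foo", Value: "bar"})
//	if err == NotLeaderError {
//		// Redirect the client to s.Leader().
//	}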
// Processes a command.
func (s *server) processCommand(command Command, e *ev) {
	s.debugln("server.command.process")

	// Create an entry for the command in the log.
	entry, err := s.log.createEntry(s.currentTerm, command, e)
	if err != nil {
		s.debugln("server.command.log.entry.error:", err)
		e.c <- err
		return
	}

	if err := s.log.appendEntry(entry); err != nil {
		s.debugln("server.command.log.error:", err)
		e.c <- err
		return
	}

	s.syncedPeer[s.Name()] = true
	if len(s.peers) == 0 {
		commitIndex := s.log.currentIndex()
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
	}
}
//--------------------------------------
// Append Entries
//--------------------------------------

// Appends zero or more log entries from the leader to this server.
func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*AppendEntriesResponse)
	return resp
}

// Processes the "append entries" request.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
	s.traceln("server.ae.process")

	if req.Term < s.currentTerm {
		s.debugln("server.ae.error: stale term")
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
	}

	if req.Term == s.currentTerm {
		_assert(s.State() != Leader, "leader.elected.at.same.term.%d\n", s.currentTerm)

		// Step down to follower when it is a candidate.
		if s.state == Candidate {
			// Change state to follower.
			s.setState(Follower)
		}

		// Discover the new leader when candidate;
		// save the leader name when follower.
		s.leader = req.LeaderName
	} else {
		// Update term and leader.
		s.updateCurrentTerm(req.Term, req.LeaderName)
	}

	// Reject if the log doesn't contain a matching previous entry.
	if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
		s.debugln("server.ae.truncate.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Append entries to the log.
	if err := s.log.appendEntries(req.Entries); err != nil {
		s.debugln("server.ae.append.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Commit up to the commit index.
	if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
		s.debugln("server.ae.commit.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// The server has appended and committed all the log entries from the
	// leader, so respond with success.
	return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}
// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
	// If we find a higher term then change to a follower and exit.
	if resp.Term() > s.Term() {
		s.updateCurrentTerm(resp.Term(), "")
		return
	}

	// Ignore the response if it was not successful.
	if !resp.Success() {
		return
	}

	// If the peer successfully appended a log entry from the leader's term,
	// add it to the synced list.
	if resp.append {
		s.syncedPeer[resp.peer] = true
	}

	// Make sure we have a quorum before committing.
	if len(s.syncedPeer) < s.QuorumSize() {
		return
	}

	// Determine the committed index that a majority has.
	var indices []uint64
	indices = append(indices, s.log.currentIndex())
	for _, peer := range s.peers {
		indices = append(indices, peer.getPrevLogIndex())
	}
	sort.Sort(sort.Reverse(uint64Slice(indices)))

	// We can commit up to the index which a majority of the members have appended.
	commitIndex := indices[s.QuorumSize()-1]
	committedIndex := s.log.commitIndex

	if commitIndex > committedIndex {
		// The leader needs to do an fsync before committing log entries.
		s.log.sync()
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
	}
}
// processVoteResponse processes a vote response:
//  1. If the vote is granted for the current term of the candidate, return true.
//  2. If the vote is denied because the responder has a larger term, update
//     this server's term, which also causes the candidate to step down, and
//     return false.
//  3. If the response is for a smaller term, ignore it and return false.
func (s *server) processVoteResponse(resp *RequestVoteResponse) bool {
	if resp.VoteGranted && resp.Term == s.currentTerm {
		return true
	}

	if resp.Term == math.MaxUint64 {
		s.debugln("got a removal notification, stopping")
		s.DispatchEvent(newEvent(RemovedEventType, nil, nil))
	}

	if resp.Term > s.currentTerm {
		s.debugln("server.candidate.vote.failed")
		s.updateCurrentTerm(resp.Term, "")
	} else {
		s.debugln("server.candidate.vote: denied")
	}
	return false
}
//--------------------------------------
// Request Vote
//--------------------------------------

// Requests a vote from a server. A vote can be obtained if the vote's term is
// at the server's current term and the server has not yet voted. A vote can
// also be obtained if the term is greater than the server's current term.
func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*RequestVoteResponse)
	return resp
}

// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
	// Deny the vote request if the candidate is not in the current cluster.
	if _, ok := s.peers[req.CandidateName]; !ok {
		s.debugln("server.rv.deny.vote: unknown peer ", req.CandidateName)
		return newRequestVoteResponse(math.MaxUint64, false), false
	}

	// If the request is coming from an old term then reject it.
	if req.Term < s.Term() {
		s.debugln("server.rv.deny.vote: cause stale term")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the term of the requesting peer is larger than this node's, update the term.
	// If the term is equal and we've already voted for a different candidate then
	// don't vote for this candidate.
	if req.Term > s.Term() {
		s.updateCurrentTerm(req.Term, "")
	} else if s.votedFor != "" && s.votedFor != req.CandidateName {
		s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
			" already vote for ", s.votedFor)
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the candidate's log is not at least as up-to-date as our last log then don't vote.
	lastIndex, lastTerm := s.log.lastInfo()
	if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
		s.debugln("server.deny.vote: cause out of date log: ", req.CandidateName,
			"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
			"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If we made it this far then cast a vote and reset our election timeout.
	s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
	s.votedFor = req.CandidateName

	return newRequestVoteResponse(s.currentTerm, true), true
}
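
// For example, with a local lastInfo() of (index=10, term=3): a candidate
// sending LastLogIndex=9, LastLogTerm=3 is denied (shorter log), as is one
// sending LastLogIndex=10, LastLogTerm=2 (older last term); a candidate
// sending LastLogIndex=10, LastLogTerm=3 passes the up-to-date check above.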
//--------------------------------------
// Membership
//--------------------------------------

// Adds a peer to the server.
func (s *server) AddPeer(name string, connectionString string) error {
	s.debugln("server.peer.add: ", name, len(s.peers))

	// Do not allow peers to be added twice.
	if s.peers[name] != nil {
		return nil
	}

	// Skip the Peer if it has the same name as the Server.
	if s.name != name {
		peer := newPeer(s, name, connectionString, s.heartbeatInterval)

		if s.State() == Leader {
			peer.startHeartbeat()
		}

		s.peers[peer.Name] = peer

		s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}

// Removes a peer from the server.
func (s *server) RemovePeer(name string) error {
	s.debugln("server.peer.remove: ", name, len(s.peers))

	// Skip the Peer if it has the same name as the Server.
	if name != s.Name() {
		// Return an error if the peer doesn't exist.
		peer := s.peers[name]
		if peer == nil {
			return fmt.Errorf("raft: Peer not found: %s", name)
		}

		// Stop the peer and remove it.
		if s.State() == Leader {
			// We create a goroutine here to avoid a potential deadlock.
			// We are holding the log write lock when we reach this line of
			// code. Without the goroutine, Peer.stopHeartbeat can block if
			// the target goroutine (which we want to stop) is calling
			// log.getEntriesAfter and waiting for the log read lock,
			// so we would be holding the log lock while waiting for the
			// log lock, which leads to a deadlock.
			// TODO(xiangli) refactor log lock
			s.routineGroup.Add(1)
			go func() {
				defer s.routineGroup.Done()
				peer.stopHeartbeat(true)
			}()
		}

		delete(s.peers, name)

		s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}
//--------------------------------------
// Log compaction
//--------------------------------------

func (s *server) TakeSnapshot() error {
	if s.stateMachine == nil {
		return errors.New("Snapshot: Cannot create snapshot. Missing state machine.")
	}

	// Shortcut without lock.
	// Exit if the server is currently creating a snapshot.
	if s.pendingSnapshot != nil {
		return errors.New("Snapshot: Last snapshot is not finished.")
	}

	// TODO: acquire the lock and allow no more commits.
	// This will be done after finishing the heartbeat refactoring.
	s.debugln("take.snapshot")

	lastIndex, lastTerm := s.log.commitInfo()

	// Check whether any log entries have been committed since the last snapshot.
	if lastIndex == s.log.startIndex {
		return nil
	}

	path := s.SnapshotPath(lastIndex, lastTerm)
	// Create the pending snapshot.
	s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}

	state, err := s.stateMachine.Save()
	if err != nil {
		return err
	}

	// Clone the list of peers.
	peers := make([]*Peer, 0, len(s.peers)+1)
	for _, peer := range s.peers {
		peers = append(peers, peer.clone())
	}
	peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})

	// Attach the state and peers to the pending snapshot and save it to disk.
	s.pendingSnapshot.Peers = peers
	s.pendingSnapshot.State = state
	s.saveSnapshot()

	// We keep some log entries after the snapshot so that we do not have to
	// send the whole snapshot to slightly slow machines.
	if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
		compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
		compactTerm := s.log.getEntry(compactIndex).Term()
		s.log.compact(compactIndex, compactTerm)
	}

	return nil
}
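
// Usage sketch (illustrative): an application would typically trigger log
// compaction periodically or once the log grows past some application-chosen
// threshold (the 20000 here is hypothetical).
//
//	if len(s.LogEntries()) > 20000 {
//		if err := s.TakeSnapshot(); err != nil {
//			log.Println("snapshot failed:", err)
//		}
//	}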
// Saves the pending snapshot to disk and promotes it to the current snapshot.
func (s *server) saveSnapshot() error {
	if s.pendingSnapshot == nil {
		return errors.New("pendingSnapshot.is.nil")
	}

	// Write snapshot to disk.
	if err := s.pendingSnapshot.save(); err != nil {
		return err
	}

	// Swap the current and last snapshots.
	tmp := s.snapshot
	s.snapshot = s.pendingSnapshot

	// Delete the previous snapshot if there is any change.
	if tmp != nil && !(tmp.LastIndex == s.snapshot.LastIndex && tmp.LastTerm == s.snapshot.LastTerm) {
		tmp.remove()
	}
	s.pendingSnapshot = nil

	return nil
}

// Retrieves the snapshot path for the server.
func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}
func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotResponse)
	return resp
}

func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
	// If the follower's log contains an entry at the snapshot's last index with a term
	// that matches the snapshot's last term, then the follower already has all the
	// information found in the snapshot and can reply false.
	entry := s.log.getEntry(req.LastIndex)
	if entry != nil && entry.Term() == req.LastTerm {
		return newSnapshotResponse(false)
	}

	// Update state.
	s.setState(Snapshotting)

	return newSnapshotResponse(true)
}

func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotRecoveryResponse)
	return resp
}

func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	// Recover state sent from the request.
	if err := s.stateMachine.Recovery(req.State); err != nil {
		panic("cannot recover from previous state")
	}

	// Recover the cluster configuration.
	s.peers = make(map[string]*Peer)
	for _, peer := range req.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update log state.
	s.currentTerm = req.LastTerm
	s.log.updateCommitIndex(req.LastIndex)

	// Create local snapshot.
	s.pendingSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
	s.saveSnapshot()

	// Clear the previous log entries.
	s.log.compact(req.LastIndex, req.LastTerm)

	return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}
// Load a snapshot at restart.
func (s *server) LoadSnapshot() error {
	// Open the snapshot/ directory.
	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
	if err != nil {
		s.debugln("cannot.open.snapshot: ", err)
		return err
	}

	// Retrieve a list of all snapshots.
	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		dir.Close()
		panic(err)
	}
	dir.Close()

	if len(filenames) == 0 {
		s.debugln("no.snapshot.to.load")
		return nil
	}

	// Grab the latest snapshot.
	sort.Strings(filenames)
	snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

	// Read snapshot data.
	file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
	if err != nil {
		return err
	}
	defer file.Close()

	// Check checksum.
	var checksum uint32
	n, err := fmt.Fscanf(file, "%08x\n", &checksum)
	if err != nil {
		return err
	} else if n != 1 {
		return errors.New("checksum.err: bad.snapshot.file")
	}

	// Load remaining snapshot contents.
	b, err := ioutil.ReadAll(file)
	if err != nil {
		return err
	}

	// Generate checksum.
	byteChecksum := crc32.ChecksumIEEE(b)
	if checksum != byteChecksum {
		s.debugln(checksum, " ", byteChecksum)
		return errors.New("bad snapshot file")
	}

	// Decode snapshot.
	if err = json.Unmarshal(b, &s.snapshot); err != nil {
		s.debugln("unmarshal.snapshot.error: ", err)
		return err
	}

	// Recover snapshot into state machine.
	if err = s.stateMachine.Recovery(s.snapshot.State); err != nil {
		s.debugln("recovery.snapshot.error: ", err)
		return err
	}

	// Recover cluster configuration.
	for _, peer := range s.snapshot.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update log state.
	s.log.startTerm = s.snapshot.LastTerm
	s.log.startIndex = s.snapshot.LastIndex
	s.log.updateCommitIndex(s.snapshot.LastIndex)

	return err
}
//--------------------------------------
// Config File
//--------------------------------------

// Flushes the commit index to disk, so that when the raft server restarts it
// will commit up to the flushed commitIndex.
func (s *server) FlushCommitIndex() {
	s.debugln("server.conf.update")
	// Write the configuration to file.
	s.writeConf()
}

func (s *server) writeConf() {
	peers := make([]*Peer, len(s.peers))

	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}

	r := &Config{
		CommitIndex: s.log.commitIndex,
		Peers:       peers,
	}

	b, _ := json.Marshal(r)

	confPath := path.Join(s.path, "conf")
	tmpConfPath := path.Join(s.path, "conf.tmp")

	err := writeFileSynced(tmpConfPath, b, 0600)
	if err != nil {
		panic(err)
	}

	os.Rename(tmpConfPath, confPath)
}

// Read the configuration for the server.
func (s *server) readConf() error {
	confPath := path.Join(s.path, "conf")
	s.debugln("readConf.open ", confPath)

	// Open the conf file.
	b, err := ioutil.ReadFile(confPath)
	if err != nil {
		return nil
	}

	conf := &Config{}
	if err = json.Unmarshal(b, conf); err != nil {
		return err
	}

	s.log.updateCommitIndex(conf.CommitIndex)

	return nil
}
//--------------------------------------
// Debugging
//--------------------------------------

func (s *server) debugln(v ...interface{}) {
	if logLevel > Debug {
		debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
	}
}

func (s *server) traceln(v ...interface{}) {
	if logLevel > Trace {
		tracef("[%s] %s", s.name, fmt.Sprintln(v...))
	}
}