server.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359
  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "hash/crc32"
  7. "io/ioutil"
  8. "os"
  9. "path"
  10. "sort"
  11. "sync"
  12. "time"
  13. )
  14. //------------------------------------------------------------------------------
  15. //
  16. // Constants
  17. //
  18. //------------------------------------------------------------------------------
  19. const (
  20. Stopped = "stopped"
  21. Follower = "follower"
  22. Candidate = "candidate"
  23. Leader = "leader"
  24. Snapshotting = "snapshotting"
  25. )
  26. const (
  27. MaxLogEntriesPerRequest = 2000
  28. NumberOfLogEntriesAfterSnapshot = 200
  29. )
  30. const (
  31. // DefaultHeartbeatInterval is the interval that the leader will send
  32. // AppendEntriesRequests to followers to maintain leadership.
  33. DefaultHeartbeatInterval = 50 * time.Millisecond
  34. DefaultElectionTimeout = 150 * time.Millisecond
  35. )
  36. // ElectionTimeoutThresholdPercent specifies the threshold at which the server
  37. // will dispatch warning events that the heartbeat RTT is too close to the
  38. // election timeout.
  39. const ElectionTimeoutThresholdPercent = 0.8
  40. var stopValue interface{}
  41. //------------------------------------------------------------------------------
  42. //
  43. // Errors
  44. //
  45. //------------------------------------------------------------------------------
  46. var NotLeaderError = errors.New("raft.Server: Not current leader")
  47. var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
  48. var CommandTimeoutError = errors.New("raft: Command timeout")
  49. //------------------------------------------------------------------------------
  50. //
  51. // Typedefs
  52. //
  53. //------------------------------------------------------------------------------
  54. // A server is involved in the consensus protocol and can act as a follower,
  55. // candidate or a leader.
  56. type Server interface {
  57. Name() string
  58. Context() interface{}
  59. StateMachine() StateMachine
  60. Leader() string
  61. State() string
  62. Path() string
  63. LogPath() string
  64. SnapshotPath(lastIndex uint64, lastTerm uint64) string
  65. Term() uint64
  66. CommitIndex() uint64
  67. VotedFor() string
  68. MemberCount() int
  69. QuorumSize() int
  70. IsLogEmpty() bool
  71. LogEntries() []*LogEntry
  72. LastCommandName() string
  73. GetState() string
  74. ElectionTimeout() time.Duration
  75. SetElectionTimeout(duration time.Duration)
  76. HeartbeatInterval() time.Duration
  77. SetHeartbeatInterval(duration time.Duration)
  78. Transporter() Transporter
  79. SetTransporter(t Transporter)
  80. AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
  81. RequestVote(req *RequestVoteRequest) *RequestVoteResponse
  82. RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
  83. SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
  84. AddPeer(name string, connectiongString string) error
  85. RemovePeer(name string) error
  86. Peers() map[string]*Peer
  87. Start() error
  88. Stop()
  89. Running() bool
  90. Do(command Command) (interface{}, error)
  91. TakeSnapshot() error
  92. LoadSnapshot() error
  93. AddEventListener(string, EventListener)
  94. }
// server is the concrete Server implementation built by NewServer.
type server struct {
	// Embedded dispatcher supplying DispatchEvent; NewServer creates it with
	// the server itself as the event source.
	*eventDispatcher
	// name identifies this server; path is its storage directory (the log
	// file and snapshot directory live inside it).
	name string
	path string
	// state holds one of the state constants; State/setState access it under
	// mutex.
	state string
	// transporter is read/written under mutex by Transporter/SetTransporter.
	transporter Transporter
	// context is the opaque value given to NewServer, returned by Context().
	context interface{}
	// currentTerm is the latest term this server knows about.
	currentTerm uint64
	// votedFor is the candidate voted for in currentTerm; cleared to "" when
	// setCurrentTerm moves to a higher term.
	votedFor string
	log      *Log
	// leader is the name of the current leader; "" when unknown.
	leader string
	// peers holds the other cluster members, keyed by peer name.
	peers map[string]*Peer
	// mutex protects server fields; see the individual accessors for which
	// fields they read or write under it.
	mutex sync.RWMutex
	// syncedPeer is set to a fresh map when the server becomes Leader and to
	// nil when leaderLoop exits. It records which members (this server
	// included) have appended an entry in the leader's term.
	syncedPeer map[string]bool
	// stopped is signalled by loop when it exits; Stop waits on it.
	stopped chan bool
	// c is the event queue consumed by the state loops (NewServer buffers it
	// to 256 events).
	c                 chan *ev
	electionTimeout   time.Duration
	heartbeatInterval time.Duration
	// currentSnapshot and lastSnapshot belong to the snapshot code, which is
	// not visible in this chunk.
	currentSnapshot *Snapshot
	lastSnapshot    *Snapshot
	// stateMachine may be nil, which disables snapshotting and log compaction.
	stateMachine            StateMachine
	maxLogEntriesPerRequest uint64
	// connectionString is the arbitrary value given to NewServer.
	connectionString string
}
// ev is an internal event handed to the server's event loop through s.c.
type ev struct {
	// target is the request or command to process, or &stopValue to ask the
	// loop to stop.
	target interface{}
	// returnValue is filled in by the loop before it replies; send returns it.
	returnValue interface{}
	// c carries the processing error (nil on success). send creates it with a
	// buffer of one and reads exactly one value from it.
	c chan error
}
  125. //------------------------------------------------------------------------------
  126. //
  127. // Constructor
  128. //
  129. //------------------------------------------------------------------------------
  130. // Creates a new server with a log at the given path. transporter must
  131. // not be nil. stateMachine can be nil if snapshotting and log
  132. // compaction is to be disabled. context can be anything (including nil)
  133. // and is not used by the raft package except returned by
  134. // Server.Context(). connectionString can be anything.
  135. func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
  136. if name == "" {
  137. return nil, errors.New("raft.Server: Name cannot be blank")
  138. }
  139. if transporter == nil {
  140. panic("raft: Transporter required")
  141. }
  142. s := &server{
  143. name: name,
  144. path: path,
  145. transporter: transporter,
  146. stateMachine: stateMachine,
  147. context: ctx,
  148. state: Stopped,
  149. peers: make(map[string]*Peer),
  150. log: newLog(),
  151. stopped: make(chan bool),
  152. c: make(chan *ev, 256),
  153. electionTimeout: DefaultElectionTimeout,
  154. heartbeatInterval: DefaultHeartbeatInterval,
  155. maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
  156. connectionString: connectionString,
  157. }
  158. s.eventDispatcher = newEventDispatcher(s)
  159. // Setup apply function.
  160. s.log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
  161. // Dispatch commit event.
  162. s.DispatchEvent(newEvent(CommitEventType, e, nil))
  163. // Apply command to the state machine.
  164. switch c := c.(type) {
  165. case CommandApply:
  166. return c.Apply(&context{
  167. server: s,
  168. currentTerm: s.currentTerm,
  169. currentIndex: s.log.internalCurrentIndex(),
  170. commitIndex: s.log.commitIndex,
  171. })
  172. case deprecatedCommandApply:
  173. return c.Apply(s)
  174. default:
  175. return nil, fmt.Errorf("Command does not implement Apply()")
  176. }
  177. }
  178. return s, nil
  179. }
  180. //------------------------------------------------------------------------------
  181. //
  182. // Accessors
  183. //
  184. //------------------------------------------------------------------------------
  185. //--------------------------------------
  186. // General
  187. //--------------------------------------
  188. // Retrieves the name of the server.
  189. func (s *server) Name() string {
  190. return s.name
  191. }
  192. // Retrieves the storage path for the server.
  193. func (s *server) Path() string {
  194. return s.path
  195. }
  196. // The name of the current leader.
  197. func (s *server) Leader() string {
  198. return s.leader
  199. }
  200. // Retrieves a copy of the peer data.
  201. func (s *server) Peers() map[string]*Peer {
  202. s.mutex.Lock()
  203. defer s.mutex.Unlock()
  204. peers := make(map[string]*Peer)
  205. for name, peer := range s.peers {
  206. peers[name] = peer.clone()
  207. }
  208. return peers
  209. }
  210. // Retrieves the object that transports requests.
  211. func (s *server) Transporter() Transporter {
  212. s.mutex.RLock()
  213. defer s.mutex.RUnlock()
  214. return s.transporter
  215. }
  216. func (s *server) SetTransporter(t Transporter) {
  217. s.mutex.Lock()
  218. defer s.mutex.Unlock()
  219. s.transporter = t
  220. }
  221. // Retrieves the context passed into the constructor.
  222. func (s *server) Context() interface{} {
  223. return s.context
  224. }
  225. // Retrieves the state machine passed into the constructor.
  226. func (s *server) StateMachine() StateMachine {
  227. return s.stateMachine
  228. }
  229. // Retrieves the log path for the server.
  230. func (s *server) LogPath() string {
  231. return path.Join(s.path, "log")
  232. }
  233. // Retrieves the current state of the server.
  234. func (s *server) State() string {
  235. s.mutex.RLock()
  236. defer s.mutex.RUnlock()
  237. return s.state
  238. }
  239. // Sets the state of the server.
  240. func (s *server) setState(state string) {
  241. s.mutex.Lock()
  242. defer s.mutex.Unlock()
  243. // Temporarily store previous values.
  244. prevState := s.state
  245. prevLeader := s.leader
  246. // Update state and leader.
  247. s.state = state
  248. if state == Leader {
  249. s.leader = s.Name()
  250. s.syncedPeer = make(map[string]bool)
  251. }
  252. // Dispatch state and leader change events.
  253. if prevState != state {
  254. s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
  255. }
  256. if prevLeader != s.leader {
  257. s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
  258. }
  259. }
  260. // Retrieves the current term of the server.
  261. func (s *server) Term() uint64 {
  262. s.mutex.RLock()
  263. defer s.mutex.RUnlock()
  264. return s.currentTerm
  265. }
  266. // Retrieves the current commit index of the server.
  267. func (s *server) CommitIndex() uint64 {
  268. s.log.mutex.RLock()
  269. defer s.log.mutex.RUnlock()
  270. return s.log.commitIndex
  271. }
  272. // Retrieves the name of the candidate this server voted for in this term.
  273. func (s *server) VotedFor() string {
  274. return s.votedFor
  275. }
  276. // Retrieves whether the server's log has no entries.
  277. func (s *server) IsLogEmpty() bool {
  278. return s.log.isEmpty()
  279. }
  280. // A list of all the log entries. This should only be used for debugging purposes.
  281. func (s *server) LogEntries() []*LogEntry {
  282. return s.log.entries
  283. }
  284. // A reference to the command name of the last entry.
  285. func (s *server) LastCommandName() string {
  286. return s.log.lastCommandName()
  287. }
  288. // Get the state of the server for debugging
  289. func (s *server) GetState() string {
  290. s.mutex.RLock()
  291. defer s.mutex.RUnlock()
  292. return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
  293. }
  294. // Check if the server is promotable
  295. func (s *server) promotable() bool {
  296. return s.log.currentIndex() > 0
  297. }
  298. //--------------------------------------
  299. // Membership
  300. //--------------------------------------
  301. // Retrieves the number of member servers in the consensus.
  302. func (s *server) MemberCount() int {
  303. s.mutex.Lock()
  304. defer s.mutex.Unlock()
  305. return len(s.peers) + 1
  306. }
  307. // Retrieves the number of servers required to make a quorum.
  308. func (s *server) QuorumSize() int {
  309. return (s.MemberCount() / 2) + 1
  310. }
  311. //--------------------------------------
  312. // Election timeout
  313. //--------------------------------------
  314. // Retrieves the election timeout.
  315. func (s *server) ElectionTimeout() time.Duration {
  316. s.mutex.RLock()
  317. defer s.mutex.RUnlock()
  318. return s.electionTimeout
  319. }
  320. // Sets the election timeout.
  321. func (s *server) SetElectionTimeout(duration time.Duration) {
  322. s.mutex.Lock()
  323. defer s.mutex.Unlock()
  324. s.electionTimeout = duration
  325. }
  326. //--------------------------------------
  327. // Heartbeat timeout
  328. //--------------------------------------
  329. // Retrieves the heartbeat timeout.
  330. func (s *server) HeartbeatInterval() time.Duration {
  331. s.mutex.RLock()
  332. defer s.mutex.RUnlock()
  333. return s.heartbeatInterval
  334. }
  335. // Sets the heartbeat timeout.
  336. func (s *server) SetHeartbeatInterval(duration time.Duration) {
  337. s.mutex.Lock()
  338. defer s.mutex.Unlock()
  339. s.heartbeatInterval = duration
  340. for _, peer := range s.peers {
  341. peer.setHeartbeatInterval(duration)
  342. }
  343. }
  344. //------------------------------------------------------------------------------
  345. //
  346. // Methods
  347. //
  348. //------------------------------------------------------------------------------
  349. //--------------------------------------
  350. // Initialization
  351. //--------------------------------------
// init registers the built-in commands — NOPCommand (which leaderLoop issues
// after an election), DefaultJoinCommand and DefaultLeaveCommand — with
// RegisterCommand, presumably so log entries of these types can be decoded
// (RegisterCommand is defined elsewhere; confirm).
func init() {
	RegisterCommand(&NOPCommand{})
	RegisterCommand(&DefaultJoinCommand{})
	RegisterCommand(&DefaultLeaveCommand{})
}
  358. // Start as follow
  359. // If log entries exist then allow promotion to candidate if no AEs received.
  360. // If no log entries exist then wait for AEs from another node.
  361. // If no log entries exist and a self-join command is issued then
  362. // immediately become leader and commit entry.
  363. func (s *server) Start() error {
  364. // Exit if the server is already running.
  365. if s.State() != Stopped {
  366. return errors.New("raft.Server: Server already running")
  367. }
  368. // Create snapshot directory if not exist
  369. os.Mkdir(path.Join(s.path, "snapshot"), 0700)
  370. if err := s.readConf(); err != nil {
  371. s.debugln("raft: Conf file error: ", err)
  372. return fmt.Errorf("raft: Initialization error: %s", err)
  373. }
  374. // Initialize the log and load it up.
  375. if err := s.log.open(s.LogPath()); err != nil {
  376. s.debugln("raft: Log error: ", err)
  377. return fmt.Errorf("raft: Initialization error: %s", err)
  378. }
  379. // Update the term to the last term in the log.
  380. _, s.currentTerm = s.log.lastInfo()
  381. s.setState(Follower)
  382. // If no log entries exist then
  383. // 1. wait for AEs from another node
  384. // 2. wait for self-join command
  385. // to set itself promotable
  386. if !s.promotable() {
  387. s.debugln("start as a new raft server")
  388. // If log entries exist then allow promotion to candidate
  389. // if no AEs received.
  390. } else {
  391. s.debugln("start from previous saved state")
  392. }
  393. debugln(s.GetState())
  394. go s.loop()
  395. return nil
  396. }
  397. // Shuts down the server.
  398. func (s *server) Stop() {
  399. s.send(&stopValue)
  400. // make sure the server has stopped before we close the log
  401. <-s.stopped
  402. s.log.close()
  403. s.state = Stopped
  404. }
  405. // Checks if the server is currently running.
  406. func (s *server) Running() bool {
  407. s.mutex.RLock()
  408. defer s.mutex.RUnlock()
  409. return s.state != Stopped
  410. }
  411. //--------------------------------------
  412. // Term
  413. //--------------------------------------
// setCurrentTerm records a term (and possibly a leader) learned from another
// server. This is only used when an external current term is found.
//
//   - term > currentTerm: the server steps down to Follower, adopts term and
//     leaderName and clears votedFor. A Leader first stops every peer's
//     heartbeat.
//   - term == currentTerm, not Leader, append == true (the visible caller
//     passing true is processAppendEntriesRequest): the server becomes a
//     Follower and records leaderName.
//
// StateChange, LeaderChange and TermChange events are dispatched, with the
// mutex held, for whichever values changed.
//
// NOTE(review): the parameter name `append` shadows the builtin.
func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Store previous values temporarily.
	prevState := s.state
	prevTerm := s.currentTerm
	prevLeader := s.leader

	if term > s.currentTerm {
		// stop heartbeats before step-down
		if s.state == Leader {
			// The mutex is released while the heartbeats are stopped.
			// NOTE(review): s.peers is iterated without the mutex, and another
			// goroutine may change state/term before it is re-acquired; the
			// assignments below overwrite whatever happened meanwhile.
			s.mutex.Unlock()
			for _, peer := range s.peers {
				peer.stopHeartbeat(false)
			}
			s.mutex.Lock()
		}
		// update the term and clear vote for
		s.state = Follower
		s.currentTerm = term
		s.leader = leaderName
		s.votedFor = ""
	} else if term == s.currentTerm && s.state != Leader && append {
		// discover new leader when candidate
		// save leader name when follower
		s.state = Follower
		s.leader = leaderName
	}

	// Dispatch change events.
	if prevState != s.state {
		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
	}
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
	if prevTerm != s.currentTerm {
		s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
	}
}
  454. //--------------------------------------
  455. // Event Loop
  456. //--------------------------------------
//               ________
//            --|Snapshot|                 timeout
//            |  --------                  ______
// recover    |       ^                   |      |
// snapshot / |       |snapshot           |      |
// higher     |       |                   v      |     recv majority votes
// term       |    --------    timeout    -----------                        -----------
//            |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
//                 --------               -----------                        -----------
//                    ^          higher term/ |                       higher term |
//                    |            new leader |                                   |
//                    |_______________________|___________________________________|
  469. // The main event loop for the server
  470. func (s *server) loop() {
  471. defer s.debugln("server.loop.end")
  472. for {
  473. state := s.State()
  474. s.debugln("server.loop.run ", state)
  475. switch state {
  476. case Follower:
  477. s.followerLoop()
  478. case Candidate:
  479. s.candidateLoop()
  480. case Leader:
  481. s.leaderLoop()
  482. case Snapshotting:
  483. s.snapshotLoop()
  484. case Stopped:
  485. s.stopped <- true
  486. return
  487. }
  488. }
  489. }
  490. // Sends an event to the event loop to be processed. The function will wait
  491. // until the event is actually processed before returning.
  492. func (s *server) send(value interface{}) (interface{}, error) {
  493. event := &ev{target: value, c: make(chan error, 1)}
  494. s.c <- event
  495. err := <-event.c
  496. return event.returnValue, err
  497. }
  498. func (s *server) sendAsync(value interface{}) {
  499. event := &ev{target: value, c: make(chan error, 1)}
  500. // try a non-blocking send first
  501. // in most cases, this should not be blocking
  502. // avoid create unnecessary go routines
  503. select {
  504. case s.c <- event:
  505. return
  506. default:
  507. }
  508. go func() {
  509. s.c <- event
  510. }()
  511. }
// followerLoop runs the server while it is a Follower. It responds to RPCs
// from candidates and leaders, and converts to candidate if the election
// timeout (between ElectionTimeout() and twice that, via afterBetween) elapses
// without either:
//  1. receiving a valid AppendEntries RPC, or
//  2. granting a vote to a candidate.
//
// A follower that is not promotable (empty log) never becomes a candidate; it
// just restarts its timer. Commands are rejected with NotLeaderError, except
// a self-join on an empty log, which makes this server leader at once.
func (s *server) followerLoop() {
	s.setState(Follower)
	// since is the time the election timer was last reset; it feeds the
	// ElectionTimeoutThreshold warning below.
	since := time.Now()
	// NOTE(review): captured once, so a later SetElectionTimeout does not
	// affect the threshold check (the timer itself re-reads the setting).
	electionTimeout := s.ElectionTimeout()
	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

	for s.State() == Follower {
		var err error
		// update is set when the election timer should be restarted.
		update := false
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case JoinCommand:
					// If no log entries exist and a self-join command is issued
					// then immediately become leader and commit entry.
					if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
						s.debugln("selfjoin and promote to leader")
						s.setState(Leader)
						s.processCommand(req, e)
					} else {
						err = NotLeaderError
					}
				case *AppendEntriesRequest:
					// If heartbeats get too close to the election timeout then send an event.
					elapsedTime := time.Now().Sub(since)
					if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
						s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
					}
					e.returnValue, update = s.processAppendEntriesRequest(req)
				case *RequestVoteRequest:
					e.returnValue, update = s.processRequestVoteRequest(req)
				case *SnapshotRequest:
					// NOTE(review): a snapshot request does not reset the
					// election timer here.
					e.returnValue = s.processSnapshotRequest(req)
				default:
					err = NotLeaderError
				}
			}
			// Callback to event.
			// NOTE(review): on the self-join path processCommand was also given
			// e and replies on e.c itself on its error paths, so this can be a
			// second send on e.c (leaderLoop skips this callback for commands
			// via `continue`). Confirm the extra reply is intended.
			e.c <- err
		case <-timeoutChan:
			// only allow synced follower to promote to candidate
			if s.promotable() {
				s.setState(Candidate)
			} else {
				update = true
			}
		}

		// Restart the election timer after valid leader contact, a granted
		// vote, or a timeout while not promotable.
		if update {
			since = time.Now()
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		}
	}
}
  575. // The event loop that is run when the server is in a Candidate state.
  576. func (s *server) candidateLoop() {
  577. lastLogIndex, lastLogTerm := s.log.lastInfo()
  578. // Clear leader value.
  579. prevLeader := s.leader
  580. s.leader = ""
  581. if prevLeader != s.leader {
  582. s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
  583. }
  584. for s.State() == Candidate {
  585. // Increment current term, vote for self.
  586. s.currentTerm++
  587. s.votedFor = s.name
  588. // Send RequestVote RPCs to all other servers.
  589. respChan := make(chan *RequestVoteResponse, len(s.peers))
  590. for _, peer := range s.peers {
  591. go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
  592. }
  593. // Wait for either:
  594. // * Votes received from majority of servers: become leader
  595. // * AppendEntries RPC received from new leader: step down.
  596. // * Election timeout elapses without election resolution: increment term, start new election
  597. // * Discover higher term: step down (§5.1)
  598. votesGranted := 1
  599. timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  600. timeout := false
  601. for {
  602. // If we received enough votes then stop waiting for more votes.
  603. s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
  604. if votesGranted >= s.QuorumSize() {
  605. s.setState(Leader)
  606. break
  607. }
  608. // Collect votes from peers.
  609. select {
  610. case resp := <-respChan:
  611. if resp.VoteGranted {
  612. s.debugln("server.candidate.vote.granted: ", votesGranted)
  613. votesGranted++
  614. } else if resp.Term > s.currentTerm {
  615. s.debugln("server.candidate.vote.failed")
  616. s.setCurrentTerm(resp.Term, "", false)
  617. } else {
  618. s.debugln("server.candidate.vote: denied")
  619. }
  620. case e := <-s.c:
  621. var err error
  622. if e.target == &stopValue {
  623. s.setState(Stopped)
  624. } else {
  625. switch req := e.target.(type) {
  626. case Command:
  627. err = NotLeaderError
  628. case *AppendEntriesRequest:
  629. e.returnValue, _ = s.processAppendEntriesRequest(req)
  630. case *RequestVoteRequest:
  631. e.returnValue, _ = s.processRequestVoteRequest(req)
  632. }
  633. }
  634. // Callback to event.
  635. e.c <- err
  636. case <-timeoutChan:
  637. timeout = true
  638. }
  639. // both process AER and RVR can make the server to follower
  640. // also break when timeout happens
  641. if s.State() != Candidate || timeout {
  642. break
  643. }
  644. }
  645. // continue when timeout happened
  646. }
  647. }
// leaderLoop runs the server while it is the Leader. On entry it sets every
// peer's prevLogIndex to the leader's last log index, starts the peers'
// heartbeats and submits a NOP command. It then serves events until the state
// changes, and clears the synced-peer set on exit.
func (s *server) leaderLoop() {
	s.setState(Leader)
	logIndex, _ := s.log.lastInfo()

	// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
	s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
	for _, peer := range s.peers {
		peer.setPrevLogIndex(logIndex)
		peer.startHeartbeat()
	}

	// Commit a NOP after the server becomes leader. From the Raft paper:
	// "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
	// each server; repeat during idle periods to prevent election timeouts
	// (§5.2)". The heartbeats started above do the "idle" period work.
	// Do is called from a new goroutine because it blocks until this loop
	// has processed the command.
	go s.Do(NOPCommand{})

	// Begin to collect response from followers
	for s.State() == Leader {
		// NOTE(review): err is never assigned below, so every callback sends nil.
		var err error
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				// Stop all peers before stop
				for _, peer := range s.peers {
					peer.stopHeartbeat(false)
				}
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case Command:
					// processCommand replies on e.c itself on failure; on
					// success the reply presumably comes later via the log
					// entry that holds e. Skip the generic callback either way.
					s.processCommand(req, e)
					continue
				case *AppendEntriesRequest:
					e.returnValue, _ = s.processAppendEntriesRequest(req)
				case *AppendEntriesResponse:
					s.processAppendEntriesResponse(req)
				case *RequestVoteRequest:
					e.returnValue, _ = s.processRequestVoteRequest(req)
				}
			}

			// Callback to event.
			e.c <- err
		}
	}

	// The synced-peer set is only meaningful while leading.
	s.syncedPeer = nil
}
  693. func (s *server) snapshotLoop() {
  694. s.setState(Snapshotting)
  695. for s.State() == Snapshotting {
  696. var err error
  697. e := <-s.c
  698. if e.target == &stopValue {
  699. s.setState(Stopped)
  700. } else {
  701. switch req := e.target.(type) {
  702. case Command:
  703. err = NotLeaderError
  704. case *AppendEntriesRequest:
  705. e.returnValue, _ = s.processAppendEntriesRequest(req)
  706. case *RequestVoteRequest:
  707. e.returnValue, _ = s.processRequestVoteRequest(req)
  708. case *SnapshotRecoveryRequest:
  709. e.returnValue = s.processSnapshotRecoveryRequest(req)
  710. }
  711. }
  712. // Callback to event.
  713. e.c <- err
  714. }
  715. }
  716. //--------------------------------------
  717. // Commands
  718. //--------------------------------------
  719. // Attempts to execute a command and replicate it. The function will return
  720. // when the command has been successfully committed or an error has occurred.
  721. func (s *server) Do(command Command) (interface{}, error) {
  722. return s.send(command)
  723. }
  724. // Processes a command.
  725. func (s *server) processCommand(command Command, e *ev) {
  726. s.debugln("server.command.process")
  727. // Create an entry for the command in the log.
  728. entry, err := s.log.createEntry(s.currentTerm, command, e)
  729. if err != nil {
  730. s.debugln("server.command.log.entry.error:", err)
  731. e.c <- err
  732. return
  733. }
  734. if err := s.log.appendEntry(entry); err != nil {
  735. s.debugln("server.command.log.error:", err)
  736. e.c <- err
  737. return
  738. }
  739. s.syncedPeer[s.Name()] = true
  740. if len(s.peers) == 0 {
  741. commitIndex := s.log.currentIndex()
  742. s.log.setCommitIndex(commitIndex)
  743. s.debugln("commit index ", commitIndex)
  744. }
  745. }
  746. //--------------------------------------
  747. // Append Entries
  748. //--------------------------------------
  749. // Appends zero or more log entry from the leader to this server.
  750. func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
  751. ret, _ := s.send(req)
  752. resp, _ := ret.(*AppendEntriesResponse)
  753. return resp
  754. }
  755. // Processes the "append entries" request.
  756. func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
  757. s.traceln("server.ae.process")
  758. if req.Term < s.currentTerm {
  759. s.debugln("server.ae.error: stale term")
  760. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
  761. }
  762. // Update term and leader.
  763. s.setCurrentTerm(req.Term, req.LeaderName, true)
  764. // Reject if log doesn't contain a matching previous entry.
  765. if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
  766. s.debugln("server.ae.truncate.error: ", err)
  767. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  768. }
  769. // Append entries to the log.
  770. if err := s.log.appendEntries(req.Entries); err != nil {
  771. s.debugln("server.ae.append.error: ", err)
  772. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  773. }
  774. // Commit up to the commit index.
  775. if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
  776. s.debugln("server.ae.commit.error: ", err)
  777. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  778. }
  779. // once the server appended and committed all the log entries from the leader
  780. return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
  781. }
  782. // Processes the "append entries" response from the peer. This is only
  783. // processed when the server is a leader. Responses received during other
  784. // states are dropped.
  785. func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
  786. // If we find a higher term then change to a follower and exit.
  787. if resp.Term() > s.Term() {
  788. s.setCurrentTerm(resp.Term(), "", false)
  789. return
  790. }
  791. // panic response if it's not successful.
  792. if !resp.Success() {
  793. return
  794. }
  795. // if one peer successfully append a log from the leader term,
  796. // we add it to the synced list
  797. if resp.append == true {
  798. s.syncedPeer[resp.peer] = true
  799. }
  800. // Increment the commit count to make sure we have a quorum before committing.
  801. if len(s.syncedPeer) < s.QuorumSize() {
  802. return
  803. }
  804. // Determine the committed index that a majority has.
  805. var indices []uint64
  806. indices = append(indices, s.log.currentIndex())
  807. for _, peer := range s.peers {
  808. indices = append(indices, peer.getPrevLogIndex())
  809. }
  810. sort.Sort(sort.Reverse(uint64Slice(indices)))
  811. // We can commit up to the index which the majority of the members have appended.
  812. commitIndex := indices[s.QuorumSize()-1]
  813. committedIndex := s.log.commitIndex
  814. if commitIndex > committedIndex {
  815. // leader needs to do a fsync before committing log entries
  816. s.log.sync()
  817. s.log.setCommitIndex(commitIndex)
  818. s.debugln("commit index ", commitIndex)
  819. }
  820. }
  821. //--------------------------------------
  822. // Request Vote
  823. //--------------------------------------
  824. // Requests a vote from a server. A vote can be obtained if the vote's term is
  825. // at the server's current term and the server has not made a vote yet. A vote
  826. // can also be obtained if the term is greater than the server's current term.
  827. func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
  828. ret, _ := s.send(req)
  829. resp, _ := ret.(*RequestVoteResponse)
  830. return resp
  831. }
  832. // Processes a "request vote" request.
  833. func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
  834. // If the request is coming from an old term then reject it.
  835. if req.Term < s.Term() {
  836. s.debugln("server.rv.deny.vote: cause stale term")
  837. return newRequestVoteResponse(s.currentTerm, false), false
  838. }
  839. s.setCurrentTerm(req.Term, "", false)
  840. // If we've already voted for a different candidate then don't vote for this candidate.
  841. if s.votedFor != "" && s.votedFor != req.CandidateName {
  842. s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
  843. " already vote for ", s.votedFor)
  844. return newRequestVoteResponse(s.currentTerm, false), false
  845. }
  846. // If the candidate's log is not at least as up-to-date as our last log then don't vote.
  847. lastIndex, lastTerm := s.log.lastInfo()
  848. if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
  849. s.debugln("server.deny.vote: cause out of date log: ", req.CandidateName,
  850. "Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
  851. "Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
  852. return newRequestVoteResponse(s.currentTerm, false), false
  853. }
  854. // If we made it this far then cast a vote and reset our election time out.
  855. s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
  856. s.votedFor = req.CandidateName
  857. return newRequestVoteResponse(s.currentTerm, true), true
  858. }
  859. //--------------------------------------
  860. // Membership
  861. //--------------------------------------
  862. // Adds a peer to the server.
  863. func (s *server) AddPeer(name string, connectiongString string) error {
  864. s.debugln("server.peer.add: ", name, len(s.peers))
  865. // Do not allow peers to be added twice.
  866. if s.peers[name] != nil {
  867. return nil
  868. }
  869. // Skip the Peer if it has the same name as the Server
  870. if s.name != name {
  871. peer := newPeer(s, name, connectiongString, s.heartbeatInterval)
  872. if s.State() == Leader {
  873. peer.startHeartbeat()
  874. }
  875. s.peers[peer.Name] = peer
  876. s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
  877. }
  878. // Write the configuration to file.
  879. s.writeConf()
  880. return nil
  881. }
  882. // Removes a peer from the server.
  883. func (s *server) RemovePeer(name string) error {
  884. s.debugln("server.peer.remove: ", name, len(s.peers))
  885. // Skip the Peer if it has the same name as the Server
  886. if name != s.Name() {
  887. // Return error if peer doesn't exist.
  888. peer := s.peers[name]
  889. if peer == nil {
  890. return fmt.Errorf("raft: Peer not found: %s", name)
  891. }
  892. // Stop peer and remove it.
  893. if s.State() == Leader {
  894. peer.stopHeartbeat(true)
  895. }
  896. delete(s.peers, name)
  897. s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
  898. }
  899. // Write the configuration to file.
  900. s.writeConf()
  901. return nil
  902. }
  903. //--------------------------------------
  904. // Log compaction
  905. //--------------------------------------
  906. func (s *server) TakeSnapshot() error {
  907. // TODO: put a snapshot mutex
  908. s.debugln("take Snapshot")
  909. // Exit if the server is currently creating a snapshot.
  910. if s.currentSnapshot != nil {
  911. return errors.New("handling snapshot")
  912. }
  913. // Exit if there are no logs yet in the system.
  914. lastIndex, lastTerm := s.log.commitInfo()
  915. path := s.SnapshotPath(lastIndex, lastTerm)
  916. if lastIndex == 0 {
  917. return errors.New("No logs")
  918. }
  919. var state []byte
  920. var err error
  921. if s.stateMachine != nil {
  922. state, err = s.stateMachine.Save()
  923. if err != nil {
  924. return err
  925. }
  926. } else {
  927. state = []byte{0}
  928. }
  929. // Clone the list of peers.
  930. peers := make([]*Peer, 0, len(s.peers)+1)
  931. for _, peer := range s.peers {
  932. peers = append(peers, peer.clone())
  933. }
  934. peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})
  935. // Attach current snapshot and save it to disk.
  936. s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
  937. s.saveSnapshot()
  938. // We keep some log entries after the snapshot.
  939. // We do not want to send the whole snapshot to the slightly slow machines
  940. if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
  941. compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
  942. compactTerm := s.log.getEntry(compactIndex).Term()
  943. s.log.compact(compactIndex, compactTerm)
  944. }
  945. return nil
  946. }
  947. // Retrieves the log path for the server.
  948. func (s *server) saveSnapshot() error {
  949. if s.currentSnapshot == nil {
  950. return errors.New("no snapshot to save")
  951. }
  952. // Write snapshot to disk.
  953. if err := s.currentSnapshot.save(); err != nil {
  954. return err
  955. }
  956. // Swap the current and last snapshots.
  957. tmp := s.lastSnapshot
  958. s.lastSnapshot = s.currentSnapshot
  959. // Delete the previous snapshot if there is any change
  960. if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
  961. tmp.remove()
  962. }
  963. s.currentSnapshot = nil
  964. return nil
  965. }
  966. // Retrieves the log path for the server.
  967. func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
  968. return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
  969. }
  970. func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
  971. ret, _ := s.send(req)
  972. resp, _ := ret.(*SnapshotResponse)
  973. return resp
  974. }
  975. func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
  976. // If the follower’s log contains an entry at the snapshot’s last index with a term
  977. // that matches the snapshot’s last term, then the follower already has all the
  978. // information found in the snapshot and can reply false.
  979. entry := s.log.getEntry(req.LastIndex)
  980. if entry != nil && entry.Term() == req.LastTerm {
  981. return newSnapshotResponse(false)
  982. }
  983. // Update state.
  984. s.setState(Snapshotting)
  985. return newSnapshotResponse(true)
  986. }
  987. func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  988. ret, _ := s.send(req)
  989. resp, _ := ret.(*SnapshotRecoveryResponse)
  990. return resp
  991. }
  992. func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  993. // Recover state sent from request.
  994. if err := s.stateMachine.Recovery(req.State); err != nil {
  995. return newSnapshotRecoveryResponse(req.LastTerm, false, req.LastIndex)
  996. }
  997. // Recover the cluster configuration.
  998. s.peers = make(map[string]*Peer)
  999. for _, peer := range req.Peers {
  1000. s.AddPeer(peer.Name, peer.ConnectionString)
  1001. }
  1002. // Update log state.
  1003. s.currentTerm = req.LastTerm
  1004. s.log.updateCommitIndex(req.LastIndex)
  1005. // Create local snapshot.
  1006. s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
  1007. s.saveSnapshot()
  1008. // Clear the previous log entries.
  1009. s.log.compact(req.LastIndex, req.LastTerm)
  1010. return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
  1011. }
  1012. // Load a snapshot at restart
  1013. func (s *server) LoadSnapshot() error {
  1014. // Open snapshot/ directory.
  1015. dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
  1016. if err != nil {
  1017. return err
  1018. }
  1019. // Retrieve a list of all snapshots.
  1020. filenames, err := dir.Readdirnames(-1)
  1021. if err != nil {
  1022. dir.Close()
  1023. panic(err)
  1024. }
  1025. dir.Close()
  1026. if len(filenames) == 0 {
  1027. return errors.New("no snapshot")
  1028. }
  1029. // Grab the latest snapshot.
  1030. sort.Strings(filenames)
  1031. snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])
  1032. // Read snapshot data.
  1033. file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
  1034. if err != nil {
  1035. return err
  1036. }
  1037. defer file.Close()
  1038. // Check checksum.
  1039. var checksum uint32
  1040. n, err := fmt.Fscanf(file, "%08x\n", &checksum)
  1041. if err != nil {
  1042. return err
  1043. } else if n != 1 {
  1044. return errors.New("Bad snapshot file")
  1045. }
  1046. // Load remaining snapshot contents.
  1047. b, err := ioutil.ReadAll(file)
  1048. if err != nil {
  1049. return err
  1050. }
  1051. // Generate checksum.
  1052. byteChecksum := crc32.ChecksumIEEE(b)
  1053. if uint32(checksum) != byteChecksum {
  1054. s.debugln(checksum, " ", byteChecksum)
  1055. return errors.New("bad snapshot file")
  1056. }
  1057. // Decode snapshot.
  1058. if err = json.Unmarshal(b, &s.lastSnapshot); err != nil {
  1059. s.debugln("unmarshal error: ", err)
  1060. return err
  1061. }
  1062. // Recover snapshot into state machine.
  1063. if err = s.stateMachine.Recovery(s.lastSnapshot.State); err != nil {
  1064. s.debugln("recovery error: ", err)
  1065. return err
  1066. }
  1067. // Recover cluster configuration.
  1068. for _, peer := range s.lastSnapshot.Peers {
  1069. s.AddPeer(peer.Name, peer.ConnectionString)
  1070. }
  1071. // Update log state.
  1072. s.log.startTerm = s.lastSnapshot.LastTerm
  1073. s.log.startIndex = s.lastSnapshot.LastIndex
  1074. s.log.updateCommitIndex(s.lastSnapshot.LastIndex)
  1075. return err
  1076. }
  1077. //--------------------------------------
  1078. // Config File
  1079. //--------------------------------------
  1080. func (s *server) writeConf() {
  1081. peers := make([]*Peer, len(s.peers))
  1082. i := 0
  1083. for _, peer := range s.peers {
  1084. peers[i] = peer.clone()
  1085. i++
  1086. }
  1087. r := &Config{
  1088. CommitIndex: s.log.commitIndex,
  1089. Peers: peers,
  1090. }
  1091. b, _ := json.Marshal(r)
  1092. confPath := path.Join(s.path, "conf")
  1093. tmpConfPath := path.Join(s.path, "conf.tmp")
  1094. err := writeFileSynced(tmpConfPath, b, 0600)
  1095. if err != nil {
  1096. panic(err)
  1097. }
  1098. os.Rename(tmpConfPath, confPath)
  1099. }
  1100. // Read the configuration for the server.
  1101. func (s *server) readConf() error {
  1102. confPath := path.Join(s.path, "conf")
  1103. s.debugln("readConf.open ", confPath)
  1104. // open conf file
  1105. b, err := ioutil.ReadFile(confPath)
  1106. if err != nil {
  1107. return nil
  1108. }
  1109. conf := &Config{}
  1110. if err = json.Unmarshal(b, conf); err != nil {
  1111. return err
  1112. }
  1113. s.log.updateCommitIndex(conf.CommitIndex)
  1114. return nil
  1115. }
  1116. //--------------------------------------
  1117. // Debugging
  1118. //--------------------------------------
  1119. func (s *server) debugln(v ...interface{}) {
  1120. if logLevel > Debug {
  1121. debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
  1122. }
  1123. }
  1124. func (s *server) traceln(v ...interface{}) {
  1125. if logLevel > Trace {
  1126. tracef("[%s] %s", s.name, fmt.Sprintln(v...))
  1127. }
  1128. }