package raft

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io/ioutil"
	"os"
	"path"
	"sort"
	"sync"
	"time"
)

//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------

const (
	Stopped      = "stopped"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)

const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)

const (
	DefaultHeartbeatTimeout = 50 * time.Millisecond
	DefaultElectionTimeout  = 150 * time.Millisecond
)

// ElectionTimeoutThresholdPercent specifies the threshold at which the server
// will dispatch warning events that the heartbeat RTT is too close to the
// election timeout.
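// For example, with the default 150ms election timeout, the threshold event
// fires once more than 120ms (0.8 * 150ms) elapse between heartbeats.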
const ElectionTimeoutThresholdPercent = 0.8

var stopValue interface{}

//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------

var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server interface {
	Name() string
	Context() interface{}
	StateMachine() StateMachine
	Leader() string
	State() string
	Path() string
	LogPath() string
	SnapshotPath(lastIndex uint64, lastTerm uint64) string
	Term() uint64
	CommitIndex() uint64
	VotedFor() string
	MemberCount() int
	QuorumSize() int
	IsLogEmpty() bool
	LogEntries() []*LogEntry
	LastCommandName() string
	GetState() string
	ElectionTimeout() time.Duration
	SetElectionTimeout(duration time.Duration)
	HeartbeatTimeout() time.Duration
	SetHeartbeatTimeout(duration time.Duration)
	Transporter() Transporter
	SetTransporter(t Transporter)
	AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
	RequestVote(req *RequestVoteRequest) *RequestVoteResponse
	RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
	SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
	AddPeer(name string, connectionString string) error
	RemovePeer(name string) error
	Peers() map[string]*Peer
	Start() error
	Stop()
	Running() bool
	Do(command Command) (interface{}, error)
	TakeSnapshot() error
	LoadSnapshot() error
	AddEventListener(string, EventListener)
}
type server struct {
	*eventDispatcher

	name        string
	path        string
	state       string
	transporter Transporter
	context     interface{}
	currentTerm uint64
	votedFor    string
	log         *Log
	leader      string
	peers       map[string]*Peer
	mutex       sync.RWMutex
	syncedPeer  map[string]bool

	stopped          chan bool
	c                chan *ev
	electionTimeout  time.Duration
	heartbeatTimeout time.Duration

	currentSnapshot         *Snapshot
	lastSnapshot            *Snapshot
	stateMachine            StateMachine
	maxLogEntriesPerRequest uint64

	connectionString string
}

// An internal event to be processed by the server's event loop.
type ev struct {
	target      interface{}
	returnValue interface{}
	c           chan error
}
//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// Creates a new server with a log at the given path. transporter must
// not be nil. stateMachine can be nil if snapshotting and log
// compaction is to be disabled. context can be anything (including nil)
// and is not used by the raft package except returned by
// Server.Context(). connectionString can be anything.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
	if name == "" {
		return nil, errors.New("raft.Server: Name cannot be blank")
	}
	if transporter == nil {
		panic("raft: Transporter required")
	}

	s := &server{
		name:                    name,
		path:                    path,
		transporter:             transporter,
		stateMachine:            stateMachine,
		context:                 ctx,
		state:                   Stopped,
		peers:                   make(map[string]*Peer),
		log:                     newLog(),
		stopped:                 make(chan bool),
		c:                       make(chan *ev, 256),
		electionTimeout:         DefaultElectionTimeout,
		heartbeatTimeout:        DefaultHeartbeatTimeout,
		maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
		connectionString:        connectionString,
	}
	s.eventDispatcher = newEventDispatcher(s)

	// Setup apply function.
	s.log.ApplyFunc = func(c Command) (interface{}, error) {
		switch c := c.(type) {
		case CommandApply:
			return c.Apply(&context{
				server:       s,
				currentTerm:  s.currentTerm,
				currentIndex: s.log.internalCurrentIndex(),
				commitIndex:  s.log.commitIndex,
			})
		case deprecatedCommandApply:
			return c.Apply(s)
		default:
			return nil, fmt.Errorf("Command does not implement Apply()")
		}
	}

	return s, nil
}
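
// A minimal usage sketch (not part of the original file): construct a server
// and start it. The transporter and connection string below are placeholders;
// the caller must supply a real Transporter implementation (for example an
// HTTP-based one) and, optionally, a StateMachine for snapshot support.
//
//	transporter := newMyTransporter() // hypothetical Transporter implementation
//	s, err := NewServer("node1", "/tmp/node1", transporter, nil, nil, "http://localhost:4001")
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := s.Start(); err != nil {
//		log.Fatal(err)
//	}
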
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

//--------------------------------------
// General
//--------------------------------------

// Retrieves the name of the server.
func (s *server) Name() string {
	return s.name
}

// Retrieves the storage path for the server.
func (s *server) Path() string {
	return s.path
}

// The name of the current leader.
func (s *server) Leader() string {
	return s.leader
}

// Retrieves a copy of the peer data.
func (s *server) Peers() map[string]*Peer {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	peers := make(map[string]*Peer)
	for name, peer := range s.peers {
		peers[name] = peer.clone()
	}
	return peers
}

// Retrieves the object that transports requests.
func (s *server) Transporter() Transporter {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.transporter
}

func (s *server) SetTransporter(t Transporter) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.transporter = t
}

// Retrieves the context passed into the constructor.
func (s *server) Context() interface{} {
	return s.context
}

// Retrieves the state machine passed into the constructor.
func (s *server) StateMachine() StateMachine {
	return s.stateMachine
}

// Retrieves the log path for the server.
func (s *server) LogPath() string {
	return path.Join(s.path, "log")
}

// Retrieves the current state of the server.
func (s *server) State() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state
}

// Sets the state of the server.
func (s *server) setState(state string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Temporarily store previous values.
	prevState := s.state
	prevLeader := s.leader

	// Update state and leader.
	s.state = state
	if state == Leader {
		s.leader = s.Name()
		s.syncedPeer = make(map[string]bool)
	}

	// Dispatch state and leader change events.
	if prevState != state {
		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
	}
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}

// Retrieves the current term of the server.
func (s *server) Term() uint64 {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.currentTerm
}

// Retrieves the current commit index of the server.
func (s *server) CommitIndex() uint64 {
	s.log.mutex.RLock()
	defer s.log.mutex.RUnlock()
	return s.log.commitIndex
}

// Retrieves the name of the candidate this server voted for in this term.
func (s *server) VotedFor() string {
	return s.votedFor
}

// Retrieves whether the server's log has no entries.
func (s *server) IsLogEmpty() bool {
	return s.log.isEmpty()
}

// A list of all the log entries. This should only be used for debugging purposes.
func (s *server) LogEntries() []*LogEntry {
	return s.log.entries
}

// A reference to the command name of the last entry.
func (s *server) LastCommandName() string {
	return s.log.lastCommandName()
}
// Get the state of the server for debugging.
func (s *server) GetState() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
}

// Check if the server is promotable.
func (s *server) promotable() bool {
	return s.log.currentIndex() > 0
}
//--------------------------------------
// Membership
//--------------------------------------

// Retrieves the number of member servers in the consensus.
func (s *server) MemberCount() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.peers) + 1
}

// Retrieves the number of servers required to make a quorum.
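// For example, a 5-member cluster needs 3 servers for a quorum, and a
// 4-member cluster also needs 3.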
func (s *server) QuorumSize() int {
	return (s.MemberCount() / 2) + 1
}

//--------------------------------------
// Election timeout
//--------------------------------------

// Retrieves the election timeout.
func (s *server) ElectionTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.electionTimeout
}

// Sets the election timeout.
func (s *server) SetElectionTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.electionTimeout = duration
}

//--------------------------------------
// Heartbeat timeout
//--------------------------------------

// Retrieves the heartbeat timeout.
func (s *server) HeartbeatTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.heartbeatTimeout
}

// Sets the heartbeat timeout.
func (s *server) SetHeartbeatTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.heartbeatTimeout = duration
	for _, peer := range s.peers {
		peer.setHeartbeatTimeout(duration)
	}
}
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Initialization
//--------------------------------------

// Register the commands used internally by the raft package.
func init() {
	RegisterCommand(&NOPCommand{})
	RegisterCommand(&DefaultJoinCommand{})
	RegisterCommand(&DefaultLeaveCommand{})
}

// Start the server as a follower.
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit entry.
func (s *server) Start() error {
	// Exit if the server is already running.
	if s.State() != Stopped {
		return errors.New("raft.Server: Server already running")
	}

	// Create the snapshot directory if it does not exist.
	os.Mkdir(path.Join(s.path, "snapshot"), 0700)

	if err := s.readConf(); err != nil {
		s.debugln("raft: Conf file error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Initialize the log and load it up.
	if err := s.log.open(s.LogPath()); err != nil {
		s.debugln("raft: Log error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Update the term to the last term in the log.
	_, s.currentTerm = s.log.lastInfo()

	s.setState(Follower)

	// If no log entries exist then
	// 1. wait for AEs from another node
	// 2. wait for a self-join command
	// to set itself promotable
	if !s.promotable() {
		s.debugln("start as a new raft server")

		// If log entries exist then allow promotion to candidate
		// if no AEs received.
	} else {
		s.debugln("start from previous saved state")
	}

	debugln(s.GetState())

	go s.loop()

	return nil
}
// Shuts down the server.
func (s *server) Stop() {
	s.send(&stopValue)

	// Make sure the server has stopped before we close the log.
	<-s.stopped
	s.log.close()
}

// Checks if the server is currently running.
func (s *server) Running() bool {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state != Stopped
}

//--------------------------------------
// Term
//--------------------------------------

// Sets the current term for the server. This is only used when an external
// current term is found.
func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Store previous values temporarily.
	prevState := s.state
	prevTerm := s.currentTerm
	prevLeader := s.leader

	if term > s.currentTerm {
		// Update the term and clear the vote.
		s.state = Follower
		s.currentTerm = term
		s.leader = leaderName
		s.votedFor = ""
	} else if term == s.currentTerm && s.state != Leader && append {
		// Discover the new leader when candidate;
		// save the leader name when follower.
		s.state = Follower
		s.leader = leaderName
	}

	// Dispatch change events.
	if prevState != s.state {
		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
	}
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
	if prevTerm != s.currentTerm {
		s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
	}
}
//--------------------------------------
// Event Loop
//--------------------------------------

//                          ________
//                       --|Snapshot|                 timeout
//                       |  --------                  ______
//    recover            |       ^                   |      |
//    snapshot /         |       |snapshot           |      |
//    higher             |       |                   v      |  recv majority votes
//    term               |    --------    timeout    -----------                     -----------
//                       |-> |Follower| ----------> | Candidate |-----------------> |  Leader   |
//                            --------               -----------                     -----------
//                               ^          higher term/ |                      higher term |
//                               |            new leader |                                  |
//                               |_______________________|____________________________________ |
//
// The main event loop for the server.
func (s *server) loop() {
	defer s.debugln("server.loop.end")

	for {
		state := s.State()
		s.debugln("server.loop.run ", state)

		switch state {
		case Follower:
			s.followerLoop()
		case Candidate:
			s.candidateLoop()
		case Leader:
			s.leaderLoop()
		case Snapshotting:
			s.snapshotLoop()
		case Stopped:
			s.stopped <- true
			return
		}
	}
}
// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
	event := &ev{target: value, c: make(chan error, 1)}
	s.c <- event
	err := <-event.c
	return event.returnValue, err
}

func (s *server) sendAsync(value interface{}) {
	event := &ev{target: value, c: make(chan error, 1)}

	// Try a non-blocking send first; in most cases this should not block,
	// which avoids creating unnecessary goroutines.
	select {
	case s.c <- event:
		return
	default:
	}

	go func() {
		s.c <- event
	}()
}
// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if election timeout elapses without either:
//   1. Receiving valid AppendEntries RPC, or
//   2. Granting vote to candidate
func (s *server) followerLoop() {
	s.setState(Follower)
	since := time.Now()
	electionTimeout := s.ElectionTimeout()
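	// afterBetween returns a channel that fires after a random duration in
	// [electionTimeout, 2*electionTimeout); randomizing the timeout keeps all
	// followers from becoming candidates at the same time.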
	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

	for {
		var err error
		update := false
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case JoinCommand:
					// If no log entries exist and a self-join command is issued
					// then immediately become leader and commit entry.
					if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
						s.debugln("selfjoin and promote to leader")
						s.setState(Leader)
						s.processCommand(req, e)
					} else {
						err = NotLeaderError
					}
				case *AppendEntriesRequest:
					// If heartbeats get too close to the election timeout then send an event.
					elapsedTime := time.Now().Sub(since)
					if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
						s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
					}
					e.returnValue, update = s.processAppendEntriesRequest(req)
				case *RequestVoteRequest:
					e.returnValue, update = s.processRequestVoteRequest(req)
				case *SnapshotRequest:
					e.returnValue = s.processSnapshotRequest(req)
				default:
					err = NotLeaderError
				}
			}

			// Callback to event.
			e.c <- err

		case <-timeoutChan:
			// Only allow a synced follower to promote to candidate.
			if s.promotable() {
				s.setState(Candidate)
			} else {
				update = true
			}
		}

		// Reset the election timeout when a valid AppendEntries RPC was
		// processed, a vote was granted, or the server is not yet promotable.
		if update {
			since = time.Now()
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		}

		// Exit loop on state change.
		if s.State() != Follower {
			break
		}
	}
}
// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
	lastLogIndex, lastLogTerm := s.log.lastInfo()

	// Clear leader value.
	prevLeader := s.leader
	s.leader = ""
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}

	for {
		// Increment current term, vote for self.
		s.currentTerm++
		s.votedFor = s.name

		// Send RequestVote RPCs to all other servers.
		respChan := make(chan *RequestVoteResponse, len(s.peers))
		for _, peer := range s.peers {
			go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
		}

		// Wait for either:
		//   * Votes received from majority of servers: become leader
		//   * AppendEntries RPC received from new leader: step down.
		//   * Election timeout elapses without election resolution: increment term, start new election
		//   * Discover higher term: step down (§5.1)
		votesGranted := 1
		timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		timeout := false

		for {
			// If we received enough votes then stop waiting for more votes.
			s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
			if votesGranted >= s.QuorumSize() {
				s.setState(Leader)
				break
			}

			// Collect votes from peers.
			select {
			case resp := <-respChan:
				if resp.VoteGranted {
					s.debugln("server.candidate.vote.granted: ", votesGranted)
					votesGranted++
				} else if resp.Term > s.currentTerm {
					s.debugln("server.candidate.vote.failed")
					s.setCurrentTerm(resp.Term, "", false)
				} else {
					s.debugln("server.candidate.vote: denied")
				}

			case e := <-s.c:
				var err error
				if e.target == &stopValue {
					s.setState(Stopped)
				} else {
					switch req := e.target.(type) {
					case Command:
						err = NotLeaderError
					case *AppendEntriesRequest:
						e.returnValue, _ = s.processAppendEntriesRequest(req)
					case *RequestVoteRequest:
						e.returnValue, _ = s.processRequestVoteRequest(req)
					}
				}
				// Callback to event.
				e.c <- err

			case <-timeoutChan:
				timeout = true
			}

			// Processing either an AppendEntries request or a RequestVote request
			// can turn the server back into a follower; also break when a timeout
			// happens.
			if s.State() != Candidate || timeout {
				break
			}
		}

		// Break when we are no longer a candidate.
		if s.State() != Candidate {
			break
		}

		// Continue to the next election when a timeout happened.
	}
}
// The event loop that is run when the server is in a Leader state.
func (s *server) leaderLoop() {
	s.setState(Leader)
	logIndex, _ := s.log.lastInfo()

	// Update the peers' prevLogIndex to the leader's lastLogIndex and start heartbeats.
	s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
	for _, peer := range s.peers {
		peer.setPrevLogIndex(logIndex)
		peer.startHeartbeat()
	}

	// Commit a no-op command as the first entry of the new term.
	go s.Do(NOPCommand{})

	// Begin to collect responses from followers.
	for {
		var err error
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case Command:
					s.processCommand(req, e)
					continue
				case *AppendEntriesRequest:
					e.returnValue, _ = s.processAppendEntriesRequest(req)
				case *AppendEntriesResponse:
					s.processAppendEntriesResponse(req)
				case *RequestVoteRequest:
					e.returnValue, _ = s.processRequestVoteRequest(req)
				}
			}

			// Callback to event.
			e.c <- err
		}

		// Exit loop on state change.
		if s.State() != Leader {
			break
		}
	}

	// Stop all peers.
	for _, peer := range s.peers {
		peer.stopHeartbeat(false)
	}

	s.syncedPeer = nil
}
func (s *server) snapshotLoop() {
	s.setState(Snapshotting)

	for {
		var err error
		e := <-s.c

		if e.target == &stopValue {
			s.setState(Stopped)
		} else {
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			case *SnapshotRecoveryRequest:
				e.returnValue = s.processSnapshotRecoveryRequest(req)
			}
		}

		// Callback to event.
		e.c <- err

		// Exit loop on state change.
		if s.State() != Snapshotting {
			break
		}
	}
}
//--------------------------------------
// Commands
//--------------------------------------

// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
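// A hedged usage sketch (not from the original source): on the leader, a
// caller submits an application-defined command that implements the Command
// interface; the writeCommand type below is a hypothetical placeholder.
//
//	result, err := s.Do(&writeCommand{Key: "foo", Value: "bar"})
//	if err == NotLeaderError {
//		// redirect the request to s.Leader()
//	}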
func (s *server) Do(command Command) (interface{}, error) {
	return s.send(command)
}

// Processes a command.
func (s *server) processCommand(command Command, e *ev) {
	s.debugln("server.command.process")

	// Create an entry for the command in the log.
	entry, err := s.log.createEntry(s.currentTerm, command, e)
	if err != nil {
		s.debugln("server.command.log.entry.error:", err)
		e.c <- err
		return
	}

	if err := s.log.appendEntry(entry); err != nil {
		s.debugln("server.command.log.error:", err)
		e.c <- err
		return
	}

	s.syncedPeer[s.Name()] = true
	if len(s.peers) == 0 {
		commitIndex := s.log.currentIndex()
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
	}
}
//--------------------------------------
// Append Entries
//--------------------------------------

// Appends zero or more log entries from the leader to this server.
func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*AppendEntriesResponse)
	return resp
}
// Processes the "append entries" request.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
	s.traceln("server.ae.process")

	if req.Term < s.currentTerm {
		s.debugln("server.ae.error: stale term")
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
	}

	// Update term and leader.
	s.setCurrentTerm(req.Term, req.LeaderName, true)

	// Reject if log doesn't contain a matching previous entry.
	if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
		s.debugln("server.ae.truncate.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Append entries to the log.
	if err := s.log.appendEntries(req.Entries); err != nil {
		s.debugln("server.ae.append.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Commit up to the commit index.
	if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
		s.debugln("server.ae.commit.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// The server has appended and committed all the log entries from the leader; reply success.
	return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}
// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
	// If we find a higher term then change to a follower and exit.
	if resp.Term > s.Term() {
		s.setCurrentTerm(resp.Term, "", false)
		return
	}

	// Ignore the response if it was not successful.
	if !resp.Success {
		return
	}

	// If the peer successfully appended a log entry from the leader's term,
	// add it to the synced list.
	if resp.append {
		s.syncedPeer[resp.peer] = true
	}

	// Increment the commit count to make sure we have a quorum before committing.
	if len(s.syncedPeer) < s.QuorumSize() {
		return
	}

	// Determine the committed index that a majority has.
	var indices []uint64
	indices = append(indices, s.log.currentIndex())
	for _, peer := range s.peers {
		indices = append(indices, peer.getPrevLogIndex())
	}
	sort.Sort(sort.Reverse(uint64Slice(indices)))

	// We can commit up to the index which the majority of the members have appended.
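	// For example, with five members whose appended indices sort to
	// [14, 12, 11, 9, 3], indices[QuorumSize()-1] == indices[2] == 11, so every
	// entry up to index 11 is known to be on a majority of the servers.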
	commitIndex := indices[s.QuorumSize()-1]
	committedIndex := s.log.commitIndex

	if commitIndex > committedIndex {
		// The leader needs to do an fsync before committing log entries.
		s.log.sync()
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
	}
}
//--------------------------------------
// Request Vote
//--------------------------------------

// Requests a vote from a server. A vote can be granted if the request's term
// is equal to the server's current term and the server has not yet voted, or
// if the request's term is greater than the server's current term.
func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*RequestVoteResponse)
	return resp
}
// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
	// If the request is coming from an old term then reject it.
	if req.Term < s.Term() {
		s.debugln("server.rv.deny.vote: cause stale term")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	s.setCurrentTerm(req.Term, "", false)

	// If we've already voted for a different candidate then don't vote for this candidate.
	if s.votedFor != "" && s.votedFor != req.CandidateName {
		s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
			" already vote for ", s.votedFor)
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the candidate's log is not at least as up-to-date as our last log then don't vote.
	lastIndex, lastTerm := s.log.lastInfo()
	if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
		s.debugln("server.deny.vote: cause out of date log: ", req.CandidateName,
			"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
			"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If we made it this far then cast a vote and reset our election timeout.
	s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
	s.votedFor = req.CandidateName

	return newRequestVoteResponse(s.currentTerm, true), true
}
//--------------------------------------
// Membership
//--------------------------------------

// Adds a peer to the server.
func (s *server) AddPeer(name string, connectionString string) error {
	s.debugln("server.peer.add: ", name, len(s.peers))

	// Do not allow peers to be added twice.
	if s.peers[name] != nil {
		return nil
	}

	// Skip the peer if it has the same name as the server.
	if s.name != name {
		peer := newPeer(s, name, connectionString, s.heartbeatTimeout)

		if s.State() == Leader {
			peer.startHeartbeat()
		}

		s.peers[peer.Name] = peer

		s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}

// Removes a peer from the server.
func (s *server) RemovePeer(name string) error {
	s.debugln("server.peer.remove: ", name, len(s.peers))

	// Skip the peer if it has the same name as the server.
	if name != s.Name() {
		// Return an error if the peer doesn't exist.
		peer := s.peers[name]
		if peer == nil {
			return fmt.Errorf("raft: Peer not found: %s", name)
		}

		// Stop the peer and remove it.
		if s.State() == Leader {
			peer.stopHeartbeat(true)
		}

		delete(s.peers, name)

		s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}
//--------------------------------------
// Log compaction
//--------------------------------------

func (s *server) TakeSnapshot() error {
	// TODO: put a snapshot mutex
	s.debugln("take Snapshot")

	if s.currentSnapshot != nil {
		return errors.New("handling snapshot")
	}

	lastIndex, lastTerm := s.log.commitInfo()
	if lastIndex == 0 {
		return errors.New("No logs")
	}

	path := s.SnapshotPath(lastIndex, lastTerm)

	var state []byte
	var err error
	if s.stateMachine != nil {
		state, err = s.stateMachine.Save()
		if err != nil {
			return err
		}
	} else {
		state = []byte{0}
	}

	peers := make([]*Peer, len(s.peers)+1)
	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}
	peers[i] = &Peer{
		Name:             s.Name(),
		ConnectionString: s.connectionString,
	}

	s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
	s.saveSnapshot()

	// Keep some log entries after the snapshot so we do not have to send the
	// whole snapshot to machines that are only slightly behind.
	if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
		compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
		compactTerm := s.log.getEntry(compactIndex).Term
		s.log.compact(compactIndex, compactTerm)
	}

	return nil
}
// Saves the current snapshot to disk and retires the previous one.
func (s *server) saveSnapshot() error {
	if s.currentSnapshot == nil {
		return errors.New("no snapshot to save")
	}

	err := s.currentSnapshot.save()
	if err != nil {
		return err
	}

	tmp := s.lastSnapshot
	s.lastSnapshot = s.currentSnapshot

	// Delete the previous snapshot if there is any change.
	if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
		tmp.remove()
	}
	s.currentSnapshot = nil

	return nil
}
// Retrieves the path of the snapshot file for the given last index and term.
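// For example, lastTerm=2 and lastIndex=7500 map to "<path>/snapshot/2_7500.ss".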
func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}

func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotResponse)
	return resp
}
func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
	// If the follower's log contains an entry at the snapshot's last index with
	// a term that matches the snapshot's last term, then the follower already
	// has all the information found in the snapshot and can reply false.
	entry := s.log.getEntry(req.LastIndex)
	if entry != nil && entry.Term == req.LastTerm {
		return newSnapshotResponse(false)
	}

	s.setState(Snapshotting)
	return newSnapshotResponse(true)
}
func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotRecoveryResponse)
	return resp
}

func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	// Restore the state machine from the snapshot.
	s.stateMachine.Recovery(req.State)

	// Recover the cluster configuration: clear the peer map and re-add the
	// peers from the snapshot.
	s.peers = make(map[string]*Peer)
	for _, peer := range req.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update the term and commit index.
	s.currentTerm = req.LastTerm
	s.log.updateCommitIndex(req.LastIndex)

	// Save a snapshot that matches the recovered state.
	snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
	s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
	s.saveSnapshot()

	// Clear the previous log entries.
	s.log.compact(req.LastIndex, req.LastTerm)

	return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}
// Load a snapshot at restart.
func (s *server) LoadSnapshot() error {
	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
	if err != nil {
		return err
	}

	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		dir.Close()
		panic(err)
	}
	dir.Close()

	if len(filenames) == 0 {
		return errors.New("no snapshot")
	}

	// Not sure how many snapshots we should keep; load the most recent one.
	sort.Strings(filenames)
	snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

	// Should not fail.
	file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// Read the checksum recorded at the top of the snapshot file.
	var snapshotBytes []byte
	var checksum uint32
	n, err := fmt.Fscanf(file, "%08x\n", &checksum)
	if err != nil {
		return err
	}
	if n != 1 {
		return errors.New("Bad snapshot file")
	}

	snapshotBytes, _ = ioutil.ReadAll(file)
	s.debugln(string(snapshotBytes))

	// Generate the checksum and verify it against the recorded one.
	byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
	if checksum != byteChecksum {
		s.debugln(checksum, " ", byteChecksum)
		return errors.New("bad snapshot file")
	}

	// Decode the snapshot and restore the state machine from it.
	err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
	if err != nil {
		s.debugln("unmarshal error: ", err)
		return err
	}

	err = s.stateMachine.Recovery(s.lastSnapshot.State)
	if err != nil {
		s.debugln("recovery error: ", err)
		return err
	}

	// Restore the cluster configuration from the snapshot.
	for _, peer := range s.lastSnapshot.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	s.log.startTerm = s.lastSnapshot.LastTerm
	s.log.startIndex = s.lastSnapshot.LastIndex
	s.log.updateCommitIndex(s.lastSnapshot.LastIndex)

	return err
}
//--------------------------------------
// Config File
//--------------------------------------

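// Writes the current cluster configuration (commit index and peers) to disk.
// The config is written to a temporary file and then renamed so the file is
// replaced atomically.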
func (s *server) writeConf() {
	peers := make([]*Peer, len(s.peers))

	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}

	r := &Config{
		CommitIndex: s.log.commitIndex,
		Peers:       peers,
	}

	b, _ := json.Marshal(r)

	confPath := path.Join(s.path, "conf")
	tmpConfPath := path.Join(s.path, "conf.tmp")

	err := writeFileSynced(tmpConfPath, b, 0600)
	if err != nil {
		panic(err)
	}

	os.Rename(tmpConfPath, confPath)
}
// Read the configuration for the server.
func (s *server) readConf() error {
	confPath := path.Join(s.path, "conf")
	s.debugln("readConf.open ", confPath)

	// Open the conf file; a missing file is not an error.
	b, err := ioutil.ReadFile(confPath)
	if err != nil {
		return nil
	}

	conf := &Config{}
	if err = json.Unmarshal(b, conf); err != nil {
		return err
	}

	s.log.updateCommitIndex(conf.CommitIndex)

	return nil
}
//--------------------------------------
// Debugging
//--------------------------------------

func (s *server) debugln(v ...interface{}) {
	if logLevel > Debug {
		debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
	}
}

func (s *server) traceln(v ...interface{}) {
	if logLevel > Trace {
		tracef("[%s] %s", s.name, fmt.Sprintln(v...))
	}
}