package raft

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io/ioutil"
	"os"
	"path"
	"sort"
	"sync"
	"time"
)
//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------

const (
	Stopped      = "stopped"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)

const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)

const (
	DefaultHeartbeatTimeout = 50 * time.Millisecond
	DefaultElectionTimeout  = 150 * time.Millisecond
)

// ElectionTimeoutThresholdPercent specifies the threshold at which the server
// will dispatch warning events that the heartbeat RTT is too close to the
// election timeout.
const ElectionTimeoutThresholdPercent = 0.8
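
// Illustrative example (not part of the original comments): with the default
// 150ms election timeout, followerLoop below dispatches an
// ElectionTimeoutThresholdEventType warning when an AppendEntries request
// arrives more than 150ms * 0.8 = 120ms after the previous one.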

var stopValue interface{}

//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------

var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")

//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// A server is involved in the consensus protocol and can act as a follower,
// candidate or leader.
type Server interface {
	Name() string
	Context() interface{}
	StateMachine() StateMachine
	Leader() string
	State() string
	Path() string
	LogPath() string
	SnapshotPath(lastIndex uint64, lastTerm uint64) string
	Term() uint64
	CommitIndex() uint64
	VotedFor() string
	MemberCount() int
	QuorumSize() int
	IsLogEmpty() bool
	LogEntries() []*LogEntry
	LastCommandName() string
	GetState() string
	ElectionTimeout() time.Duration
	SetElectionTimeout(duration time.Duration)
	HeartbeatTimeout() time.Duration
	SetHeartbeatTimeout(duration time.Duration)
	Transporter() Transporter
	SetTransporter(t Transporter)
	AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
	RequestVote(req *RequestVoteRequest) *RequestVoteResponse
	RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
	SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
	AddPeer(name string, connectionString string) error
	RemovePeer(name string) error
	Peers() map[string]*Peer
	Start() error
	Stop()
	Running() bool
	Do(command Command) (interface{}, error)
	TakeSnapshot() error
	LoadSnapshot() error
	AddEventListener(string, EventListener)
}

type server struct {
	*eventDispatcher

	name                    string
	path                    string
	state                   string
	transporter             Transporter
	context                 interface{}
	currentTerm             uint64
	votedFor                string
	log                     *Log
	leader                  string
	peers                   map[string]*Peer
	mutex                   sync.RWMutex
	syncedPeer              map[string]bool
	c                       chan *ev
	electionTimeout         time.Duration
	heartbeatTimeout        time.Duration
	currentSnapshot         *Snapshot
	lastSnapshot            *Snapshot
	stateMachine            StateMachine
	maxLogEntriesPerRequest uint64
	connectionString        string
}

// An internal event to be processed by the server's event loop.
type ev struct {
	target      interface{}
	returnValue interface{}
	c           chan error
}

//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// Creates a new server with a log at the given path. transporter must
// not be nil. stateMachine can be nil if snapshotting and log
// compaction are to be disabled. context can be anything (including nil)
// and is not used by the raft package except returned by
// Server.Context(). connectionString can be anything.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
	if name == "" {
		return nil, errors.New("raft.Server: Name cannot be blank")
	}
	if transporter == nil {
		panic("raft: Transporter required")
	}

	s := &server{
		name:                    name,
		path:                    path,
		transporter:             transporter,
		stateMachine:            stateMachine,
		context:                 ctx,
		state:                   Stopped,
		peers:                   make(map[string]*Peer),
		log:                     newLog(),
		c:                       make(chan *ev, 256),
		electionTimeout:         DefaultElectionTimeout,
		heartbeatTimeout:        DefaultHeartbeatTimeout,
		maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
		connectionString:        connectionString,
	}
	s.eventDispatcher = newEventDispatcher(s)

	// Setup apply function.
	s.log.ApplyFunc = func(c Command) (interface{}, error) {
		switch c := c.(type) {
		case CommandApply:
			return c.Apply(&context{
				server:       s,
				currentTerm:  s.currentTerm,
				currentIndex: s.log.internalCurrentIndex(),
				commitIndex:  s.log.commitIndex,
			})
		case deprecatedCommandApply:
			return c.Apply(s)
		default:
			return nil, fmt.Errorf("Command does not implement Apply()")
		}
	}

	return s, nil
}
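
// startExampleServer is an illustrative sketch of how an application might
// construct and start a server; the helper itself is not part of the original
// code. The transporter, state machine, data directory and connection string
// are assumed to be supplied by the caller.
func startExampleServer(name, dataDir, connectionString string, transporter Transporter, sm StateMachine) (Server, error) {
	s, err := NewServer(name, dataDir, transporter, sm, nil, connectionString)
	if err != nil {
		return nil, err
	}
	if err := s.Start(); err != nil {
		return nil, err
	}
	return s, nil
}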

//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

//--------------------------------------
// General
//--------------------------------------

// Retrieves the name of the server.
func (s *server) Name() string {
	return s.name
}

// Retrieves the storage path for the server.
func (s *server) Path() string {
	return s.path
}

// The name of the current leader.
func (s *server) Leader() string {
	return s.leader
}

// Retrieves a copy of the peer data.
func (s *server) Peers() map[string]*Peer {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	peers := make(map[string]*Peer)
	for name, peer := range s.peers {
		peers[name] = peer.clone()
	}
	return peers
}

// Retrieves the object that transports requests.
func (s *server) Transporter() Transporter {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.transporter
}

func (s *server) SetTransporter(t Transporter) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.transporter = t
}

// Retrieves the context passed into the constructor.
func (s *server) Context() interface{} {
	return s.context
}

// Retrieves the state machine passed into the constructor.
func (s *server) StateMachine() StateMachine {
	return s.stateMachine
}

// Retrieves the log path for the server.
func (s *server) LogPath() string {
	return path.Join(s.path, "log")
}

// Retrieves the current state of the server.
func (s *server) State() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state
}

// Sets the state of the server.
func (s *server) setState(state string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Temporarily store previous values.
	prevState := s.state
	prevLeader := s.leader

	// Update state and leader.
	s.state = state
	if state == Leader {
		s.leader = s.Name()
	}

	// Dispatch state and leader change events.
	if prevState != state {
		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
	}
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}

// Retrieves the current term of the server.
func (s *server) Term() uint64 {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.currentTerm
}

// Retrieves the current commit index of the server.
func (s *server) CommitIndex() uint64 {
	s.log.mutex.RLock()
	defer s.log.mutex.RUnlock()
	return s.log.commitIndex
}

// Retrieves the name of the candidate this server voted for in this term.
func (s *server) VotedFor() string {
	return s.votedFor
}

// Retrieves whether the server's log has no entries.
func (s *server) IsLogEmpty() bool {
	return s.log.isEmpty()
}

// A list of all the log entries. This should only be used for debugging purposes.
func (s *server) LogEntries() []*LogEntry {
	return s.log.entries
}

// A reference to the command name of the last entry.
func (s *server) LastCommandName() string {
	return s.log.lastCommandName()
}

// Get the state of the server for debugging.
func (s *server) GetState() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommittedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
}

// Check if the server is promotable.
func (s *server) promotable() bool {
	return s.log.currentIndex() > 0
}

//--------------------------------------
// Membership
//--------------------------------------

// Retrieves the number of member servers in the consensus.
func (s *server) MemberCount() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.peers) + 1
}

// Retrieves the number of servers required to make a quorum.
func (s *server) QuorumSize() int {
	return (s.MemberCount() / 2) + 1
}
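
// Quorum example (illustrative, not in the original comments): a five-member
// cluster (four peers plus this server) needs three servers for a quorum, and
// a four-member cluster also needs three.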

//--------------------------------------
// Election timeout
//--------------------------------------

// Retrieves the election timeout.
func (s *server) ElectionTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.electionTimeout
}

// Sets the election timeout.
func (s *server) SetElectionTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.electionTimeout = duration
}

//--------------------------------------
// Heartbeat timeout
//--------------------------------------

// Retrieves the heartbeat timeout.
func (s *server) HeartbeatTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.heartbeatTimeout
}

// Sets the heartbeat timeout.
func (s *server) SetHeartbeatTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.heartbeatTimeout = duration
	for _, peer := range s.peers {
		peer.setHeartbeatTimeout(duration)
	}
}
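
// configureExampleTimeouts is an illustrative sketch; the helper itself is
// not part of the original code. Election timeouts are normally several
// multiples of the heartbeat timeout so a healthy leader is never preempted.
// The concrete durations are assumptions, not values taken from this package.
func configureExampleTimeouts(s Server) {
	s.SetHeartbeatTimeout(50 * time.Millisecond)
	s.SetElectionTimeout(500 * time.Millisecond)
}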

//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Initialization
//--------------------------------------

// Register the built-in commands.
func init() {
	RegisterCommand(&NOPCommand{})
	RegisterCommand(&DefaultJoinCommand{})
	RegisterCommand(&DefaultLeaveCommand{})
}

// Start the server as a follower.
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit the entry.
func (s *server) Start() error {
	// Exit if the server is already running.
	if s.State() != Stopped {
		return errors.New("raft.Server: Server already running")
	}

	// Create the snapshot directory if it does not exist.
	os.Mkdir(path.Join(s.path, "snapshot"), 0700)

	if err := s.readConf(); err != nil {
		s.debugln("raft: Conf file error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Initialize the log and load it up.
	if err := s.log.open(s.LogPath()); err != nil {
		s.debugln("raft: Log error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Update the term to the last term in the log.
	_, s.currentTerm = s.log.lastInfo()

	s.setState(Follower)

	// If no log entries exist then
	// 1. wait for AEs from another node
	// 2. wait for a self-join command
	// to set itself promotable.
	if !s.promotable() {
		s.debugln("start as a new raft server")

		// If log entries exist then allow promotion to candidate
		// if no AEs received.
	} else {
		s.debugln("start from previous saved state")
	}

	debugln(s.GetState())

	go s.loop()
	return nil
}

// Shuts down the server.
func (s *server) Stop() {
	s.send(&stopValue)
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.log.close()
}

// Checks if the server is currently running.
func (s *server) Running() bool {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state != Stopped
}

//--------------------------------------
// Term
//--------------------------------------

// Sets the current term for the server. This is only used when a newer term
// is discovered from an external source.
func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Store previous values temporarily.
	prevState := s.state
	prevTerm := s.currentTerm
	prevLeader := s.leader

	if term > s.currentTerm {
		// Update the term and clear the vote.
		s.state = Follower
		s.currentTerm = term
		s.leader = leaderName
		s.votedFor = ""
	} else if term == s.currentTerm && s.state != Leader && append {
		// Discover the new leader when a candidate;
		// save the leader name when a follower.
		s.state = Follower
		s.leader = leaderName
	}

	// Dispatch change events.
	if prevState != s.state {
		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
	}
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
	if prevTerm != s.currentTerm {
		s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
	}
}

//--------------------------------------
// Event Loop
//--------------------------------------

//               ________
//            --|Snapshot|                 timeout
//            |  --------                  ______
// recover    |       ^                   |      |
// snapshot / |       |snapshot           |      |
// higher     |       |                   v      |    recv majority votes
// term       |    --------    timeout    -----------                        -----------
//            |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
//                 --------               -----------                        -----------
//                     ^          higher term/ |                        higher term |
//                     |            new leader |                                    |
//                     |_______________________|___________________________________|

// The main event loop for the server.
func (s *server) loop() {
	defer s.debugln("server.loop.end")

	for {
		state := s.State()
		s.debugln("server.loop.run ", state)
		switch state {
		case Follower:
			s.followerLoop()
		case Candidate:
			s.candidateLoop()
		case Leader:
			s.leaderLoop()
		case Snapshotting:
			s.snapshotLoop()
		case Stopped:
			return
		}
	}
}

// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
	event := s.sendAsync(value)
	err := <-event.c
	return event.returnValue, err
}

func (s *server) sendAsync(value interface{}) *ev {
	event := &ev{target: value, c: make(chan error, 1)}
	s.c <- event
	return event
}

// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if the election timeout elapses without either:
//   1. Receiving a valid AppendEntries RPC, or
//   2. Granting a vote to a candidate.
func (s *server) followerLoop() {
	s.setState(Follower)
	since := time.Now()
	electionTimeout := s.ElectionTimeout()
	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

	for {
		var err error
		update := false
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case JoinCommand:
					// If no log entries exist and a self-join command is issued
					// then immediately become leader and commit the entry.
					if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
						s.debugln("selfjoin and promote to leader")
						s.setState(Leader)
						s.processCommand(req, e)
					} else {
						err = NotLeaderError
					}
				case *AppendEntriesRequest:
					// If heartbeats get too close to the election timeout then send an event.
					elapsedTime := time.Now().Sub(since)
					if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
						s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
					}
					e.returnValue, update = s.processAppendEntriesRequest(req)
				case *RequestVoteRequest:
					e.returnValue, update = s.processRequestVoteRequest(req)
				case *SnapshotRequest:
					e.returnValue = s.processSnapshotRequest(req)
				default:
					err = NotLeaderError
				}
			}

			// Callback to event.
			e.c <- err

		case <-timeoutChan:
			// Only allow a synced follower to promote to candidate.
			if s.promotable() {
				s.setState(Candidate)
			} else {
				update = true
			}
		}

		// Reset the election timer whenever a valid AppendEntries RPC was
		// processed or a vote was granted.
		if update {
			since = time.Now()
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		}

		// Exit loop on state change.
		if s.State() != Follower {
			break
		}
	}
}

// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
	lastLogIndex, lastLogTerm := s.log.lastInfo()

	// Clear leader value.
	prevLeader := s.leader
	s.leader = ""
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}

	for {
		// Increment current term, vote for self.
		s.currentTerm++
		s.votedFor = s.name

		// Send RequestVote RPCs to all other servers.
		respChan := make(chan *RequestVoteResponse, len(s.peers))
		for _, peer := range s.peers {
			go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
		}

		// Wait for either:
		//   * Votes received from a majority of servers: become leader
		//   * AppendEntries RPC received from a new leader: step down
		//   * Election timeout elapses without election resolution: increment term, start new election
		//   * Discover higher term: step down (§5.1)
		votesGranted := 1
		timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		timeout := false

		for {
			// If we received enough votes then stop waiting for more votes.
			s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
			if votesGranted >= s.QuorumSize() {
				s.setState(Leader)
				break
			}

			// Collect votes from peers.
			select {
			case resp := <-respChan:
				if resp.VoteGranted {
					s.debugln("server.candidate.vote.granted: ", votesGranted)
					votesGranted++
				} else if resp.Term > s.currentTerm {
					s.debugln("server.candidate.vote.failed")
					s.setCurrentTerm(resp.Term, "", false)
				} else {
					s.debugln("server.candidate.vote: denied")
				}

			case e := <-s.c:
				var err error
				if e.target == &stopValue {
					s.setState(Stopped)
				} else {
					switch req := e.target.(type) {
					case Command:
						err = NotLeaderError
					case *AppendEntriesRequest:
						e.returnValue, _ = s.processAppendEntriesRequest(req)
					case *RequestVoteRequest:
						e.returnValue, _ = s.processRequestVoteRequest(req)
					}
				}
				// Callback to event.
				e.c <- err

			case <-timeoutChan:
				timeout = true
			}

			// Processing either an AppendEntries request or a RequestVote
			// request can convert the server back to a follower.
			// Also break out when a timeout happens.
			if s.State() != Candidate || timeout {
				break
			}
		}

		// Break when we are no longer a candidate.
		if s.State() != Candidate {
			break
		}

		// Continue with a new election when a timeout happened.
	}
}

// The event loop that is run when the server is in a Leader state.
func (s *server) leaderLoop() {
	s.setState(Leader)
	s.syncedPeer = make(map[string]bool)
	logIndex, _ := s.log.lastInfo()

	// Update the peers' prevLogIndex to the leader's lastLogIndex and start heartbeats.
	s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
	for _, peer := range s.peers {
		peer.setPrevLogIndex(logIndex)
		peer.startHeartbeat()
	}

	go s.Do(NOPCommand{})

	// Begin to collect responses from followers.
	for {
		var err error
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case Command:
					s.processCommand(req, e)
					continue
				case *AppendEntriesRequest:
					e.returnValue, _ = s.processAppendEntriesRequest(req)
				case *AppendEntriesResponse:
					s.processAppendEntriesResponse(req)
				case *RequestVoteRequest:
					e.returnValue, _ = s.processRequestVoteRequest(req)
				}
			}

			// Callback to event.
			e.c <- err
		}

		// Exit loop on state change.
		if s.State() != Leader {
			break
		}
	}

	// Stop all peers.
	for _, peer := range s.peers {
		peer.stopHeartbeat(false)
	}
	s.syncedPeer = nil
}

func (s *server) snapshotLoop() {
	s.setState(Snapshotting)

	for {
		var err error
		e := <-s.c

		if e.target == &stopValue {
			s.setState(Stopped)
		} else {
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			case *SnapshotRecoveryRequest:
				e.returnValue = s.processSnapshotRecoveryRequest(req)
			}
		}

		// Callback to event.
		e.c <- err

		// Exit loop on state change.
		if s.State() != Snapshotting {
			break
		}
	}
}

//--------------------------------------
// Commands
//--------------------------------------

// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *server) Do(command Command) (interface{}, error) {
	return s.send(command)
}
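
// applyExampleCommand is an illustrative sketch; the helper itself is not
// part of the original code. Application code submits commands through Do on
// the leader, which blocks until the entry is committed or an error such as
// NotLeaderError occurs. The forwarding strategy mentioned in the comment is
// an assumption about typical usage.
func applyExampleCommand(s Server, c Command) (interface{}, error) {
	result, err := s.Do(c)
	if err == NotLeaderError {
		// An application would typically forward the command to s.Leader().
		return nil, err
	}
	return result, err
}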

// Processes a command.
func (s *server) processCommand(command Command, e *ev) {
	s.debugln("server.command.process")

	// Create an entry for the command in the log.
	entry, err := s.log.createEntry(s.currentTerm, command, e)
	if err != nil {
		s.debugln("server.command.log.entry.error:", err)
		e.c <- err
		return
	}

	if err := s.log.appendEntry(entry); err != nil {
		s.debugln("server.command.log.error:", err)
		e.c <- err
		return
	}

	// Issue an append entries response for the server.
	resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
	resp.append = true
	resp.peer = s.Name()

	// This must be async. sendAsync is not guaranteed to be asynchronous:
	// when commands are submitted faster than the server can process them,
	// the buffered channel fills up and sendAsync blocks, which would
	// deadlock here. Use a goroutine to avoid the deadlock.
	go s.sendAsync(resp)
}

//--------------------------------------
// Append Entries
//--------------------------------------

// Appends zero or more log entries from the leader to this server.
func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*AppendEntriesResponse)
	return resp
}

// Processes the "append entries" request.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
	s.traceln("server.ae.process")

	if req.Term < s.currentTerm {
		s.debugln("server.ae.error: stale term")
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
	}

	// Update term and leader.
	s.setCurrentTerm(req.Term, req.LeaderName, true)

	// Reject if the log doesn't contain a matching previous entry.
	if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
		s.debugln("server.ae.truncate.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Append entries to the log.
	if err := s.log.appendEntries(req.Entries); err != nil {
		s.debugln("server.ae.append.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Commit up to the commit index.
	if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
		s.debugln("server.ae.commit.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Once the server has appended and committed all the log entries from
	// the leader, return a successful response.
	return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}

// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
	// If we find a higher term then change to a follower and exit.
	if resp.Term > s.Term() {
		s.setCurrentTerm(resp.Term, "", false)
		return
	}

	// Ignore the response if it was not successful.
	if !resp.Success {
		return
	}

	// If the peer successfully appended a log entry from this leader's term,
	// add it to the synced list.
	if resp.append == true {
		s.syncedPeer[resp.peer] = true
	}

	// Make sure we have a quorum before committing.
	if len(s.syncedPeer) < s.QuorumSize() {
		return
	}

	// Determine the committed index that a majority has.
	var indices []uint64
	indices = append(indices, s.log.currentIndex())
	for _, peer := range s.peers {
		indices = append(indices, peer.getPrevLogIndex())
	}
	sort.Sort(sort.Reverse(uint64Slice(indices)))

	// We can commit up to the index which the majority of the members have appended.
	commitIndex := indices[s.QuorumSize()-1]
	committedIndex := s.log.commitIndex

	if commitIndex > committedIndex {
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
	}
}
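
// exampleCommitIndexFromMatches is an illustrative sketch of the quorum rule
// used above; the helper itself is not part of the original code. With
// indices {8, 7, 7, 5, 4} and a quorum size of 3, the third-highest value (7)
// is the largest index that a majority of members has appended, so the log
// can safely be committed up to 7.
func exampleCommitIndexFromMatches(indices []uint64, quorumSize int) uint64 {
	sort.Sort(sort.Reverse(uint64Slice(indices)))
	return indices[quorumSize-1]
}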

//--------------------------------------
// Request Vote
//--------------------------------------

// Requests a vote from a server. A vote can be obtained if the request's term
// is equal to the server's current term and the server has not voted yet. A
// vote can also be obtained if the term is greater than the server's current
// term.
func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*RequestVoteResponse)
	return resp
}

// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
	// If the request is coming from an old term then reject it.
	if req.Term < s.Term() {
		s.debugln("server.rv.error: stale term")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	s.setCurrentTerm(req.Term, "", false)

	// If we've already voted for a different candidate then don't vote for this candidate.
	if s.votedFor != "" && s.votedFor != req.CandidateName {
		s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
			" already voted for ", s.votedFor)
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the candidate's log is not at least as up-to-date as our last log then don't vote.
	lastIndex, lastTerm := s.log.lastInfo()
	if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
		s.debugln("server.rv.error: out of date log: ", req.CandidateName,
			"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
			"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If we made it this far then cast a vote and reset our election timeout.
	s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
	s.votedFor = req.CandidateName
	return newRequestVoteResponse(s.currentTerm, true), true
}

//--------------------------------------
// Membership
//--------------------------------------

// Adds a peer to the server.
func (s *server) AddPeer(name string, connectionString string) error {
	s.debugln("server.peer.add: ", name, len(s.peers))

	// Do not allow peers to be added twice.
	if s.peers[name] != nil {
		return nil
	}

	// Skip the peer if it has the same name as the server.
	if s.name != name {
		peer := newPeer(s, name, connectionString, s.heartbeatTimeout)

		if s.State() == Leader {
			peer.startHeartbeat()
		}

		s.peers[peer.Name] = peer
		s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}

// Removes a peer from the server.
func (s *server) RemovePeer(name string) error {
	s.debugln("server.peer.remove: ", name, len(s.peers))

	// Skip the peer if it has the same name as the server.
	if name != s.Name() {
		// Return an error if the peer doesn't exist.
		peer := s.peers[name]
		if peer == nil {
			return fmt.Errorf("raft: Peer not found: %s", name)
		}

		// Stop the peer and remove it.
		if s.State() == Leader {
			peer.stopHeartbeat(true)
		}
		delete(s.peers, name)

		s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}
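
// joinExampleCluster is an illustrative sketch; the helper itself is not part
// of the original code. Membership changes normally go through the replicated
// log rather than a direct AddPeer call, so a joining node asks the current
// leader to commit a join command. It assumes the DefaultJoinCommand
// registered in init carries the node's name and connection string.
func joinExampleCluster(leader Server, name string, connectionString string) error {
	_, err := leader.Do(&DefaultJoinCommand{Name: name, ConnectionString: connectionString})
	return err
}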

//--------------------------------------
// Log compaction
//--------------------------------------

func (s *server) TakeSnapshot() error {
	// TODO: put a snapshot mutex
	s.debugln("take Snapshot")

	if s.currentSnapshot != nil {
		return errors.New("handling snapshot")
	}

	lastIndex, lastTerm := s.log.commitInfo()
	if lastIndex == 0 {
		return errors.New("No logs")
	}

	path := s.SnapshotPath(lastIndex, lastTerm)

	var state []byte
	var err error
	if s.stateMachine != nil {
		state, err = s.stateMachine.Save()
		if err != nil {
			return err
		}
	} else {
		state = []byte{0}
	}

	peers := make([]*Peer, len(s.peers)+1)

	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}
	peers[i] = &Peer{
		Name:             s.Name(),
		ConnectionString: s.connectionString,
	}

	s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
	s.saveSnapshot()

	// We keep some log entries after the snapshot so that we do not have to
	// send the whole snapshot to followers that are only slightly behind.
	if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
		compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
		compactTerm := s.log.getEntry(compactIndex).Term
		s.log.compact(compactIndex, compactTerm)
	}

	return nil
}
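
// snapshotExampleLoop is an illustrative sketch; the helper itself is not
// part of the original code. Applications typically trigger TakeSnapshot
// themselves, for example on a timer or once the log grows past a size
// threshold. The periodic-timer approach and interval are assumptions about
// usage, not behavior of this package.
func snapshotExampleLoop(s Server, interval time.Duration) {
	for range time.Tick(interval) {
		if err := s.TakeSnapshot(); err != nil {
			debugln("snapshot error: ", err)
		}
	}
}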

// Saves the current snapshot to disk and retires the previous one.
func (s *server) saveSnapshot() error {
	if s.currentSnapshot == nil {
		return errors.New("no snapshot to save")
	}

	err := s.currentSnapshot.save()
	if err != nil {
		return err
	}

	tmp := s.lastSnapshot
	s.lastSnapshot = s.currentSnapshot

	// Delete the previous snapshot if there is any change.
	if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
		tmp.remove()
	}
	s.currentSnapshot = nil

	return nil
}

// Retrieves the path to the snapshot file for the given index and term.
func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}

func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotResponse)
	return resp
}

func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
	// If the follower's log contains an entry at the snapshot's last index
	// with a term that matches the snapshot's last term, then the follower
	// already has all the information found in the snapshot and can reply
	// false.
	entry := s.log.getEntry(req.LastIndex)
	if entry != nil && entry.Term == req.LastTerm {
		return newSnapshotResponse(false)
	}

	s.setState(Snapshotting)
	return newSnapshotResponse(true)
}

func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotRecoveryResponse)
	return resp
}

func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	s.stateMachine.Recovery(req.State)

	// Clear the peer map and recover the cluster configuration.
	s.peers = make(map[string]*Peer)
	for _, peer := range req.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update the term and commit index.
	s.currentTerm = req.LastTerm
	s.log.updateCommitIndex(req.LastIndex)

	snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
	s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
	s.saveSnapshot()

	// Clear the previous log entries.
	s.log.compact(req.LastIndex, req.LastTerm)

	return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}

// Load a snapshot at restart.
func (s *server) LoadSnapshot() error {
	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
	if err != nil {
		return err
	}

	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		dir.Close()
		panic(err)
	}
	dir.Close()

	if len(filenames) == 0 {
		return errors.New("no snapshot")
	}

	// Not sure how many snapshots we should keep.
	sort.Strings(filenames)
	snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

	// Should not fail.
	file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// TODO: check checksum first.
	var snapshotBytes []byte
	var checksum uint32

	n, err := fmt.Fscanf(file, "%08x\n", &checksum)
	if err != nil {
		return err
	}
	if n != 1 {
		return errors.New("Bad snapshot file")
	}

	snapshotBytes, _ = ioutil.ReadAll(file)
	s.debugln(string(snapshotBytes))

	// Generate checksum.
	byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
	if uint32(checksum) != byteChecksum {
		s.debugln(checksum, " ", byteChecksum)
		return errors.New("bad snapshot file")
	}

	err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
	if err != nil {
		s.debugln("unmarshal error: ", err)
		return err
	}

	err = s.stateMachine.Recovery(s.lastSnapshot.State)
	if err != nil {
		s.debugln("recovery error: ", err)
		return err
	}

	for _, peer := range s.lastSnapshot.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	s.log.startTerm = s.lastSnapshot.LastTerm
	s.log.startIndex = s.lastSnapshot.LastIndex
	s.log.updateCommitIndex(s.lastSnapshot.LastIndex)

	return err
}
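
// recoverExampleServer is an illustrative sketch; the helper itself is not
// part of the original code. At restart an application would typically try to
// load the latest snapshot before starting the event loop and treat a missing
// snapshot as non-fatal. The exact recovery policy is an assumption about
// usage.
func recoverExampleServer(s Server) error {
	if err := s.LoadSnapshot(); err != nil {
		debugln("no snapshot loaded: ", err)
	}
	return s.Start()
}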

//--------------------------------------
// Config File
//--------------------------------------

func (s *server) writeConf() {
	peers := make([]*Peer, len(s.peers))

	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}

	r := &Config{
		CommitIndex: s.log.commitIndex,
		Peers:       peers,
	}

	b, _ := json.Marshal(r)

	confPath := path.Join(s.path, "conf")
	tmpConfPath := path.Join(s.path, "conf.tmp")

	err := ioutil.WriteFile(tmpConfPath, b, 0600)
	if err != nil {
		panic(err)
	}

	os.Rename(tmpConfPath, confPath)
}

// Read the configuration for the server.
func (s *server) readConf() error {
	confPath := path.Join(s.path, "conf")
	s.debugln("readConf.open ", confPath)

	// Open the conf file; a missing file is not an error.
	b, err := ioutil.ReadFile(confPath)
	if err != nil {
		return nil
	}

	conf := &Config{}
	if err = json.Unmarshal(b, conf); err != nil {
		return err
	}

	s.log.updateCommitIndex(conf.CommitIndex)

	return nil
}

//--------------------------------------
// Debugging
//--------------------------------------

func (s *server) debugln(v ...interface{}) {
	debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
}

func (s *server) traceln(v ...interface{}) {
	tracef("[%s] %s", s.name, fmt.Sprintln(v...))
}