server.go
package raft

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io/ioutil"
	"os"
	"path"
	"sort"
	"sync"
	"time"
)
//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------

const (
	Stopped      = "stopped"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)

const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)

const (
	DefaultHeartbeatTimeout = 50 * time.Millisecond
	DefaultElectionTimeout  = 150 * time.Millisecond
)

var stopValue interface{}

//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------

var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// A Server is involved in the consensus protocol and can act as a follower,
// candidate, or leader.
type Server interface {
	Name() string
	Context() interface{}
	StateMachine() StateMachine
	Leader() string
	State() string
	Path() string
	LogPath() string
	SnapshotPath(lastIndex uint64, lastTerm uint64) string
	Term() uint64
	CommitIndex() uint64
	VotedFor() string
	MemberCount() int
	QuorumSize() int
	IsLogEmpty() bool
	LogEntries() []*LogEntry
	LastCommandName() string
	GetState() string
	ElectionTimeout() time.Duration
	SetElectionTimeout(duration time.Duration)
	HeartbeatTimeout() time.Duration
	SetHeartbeatTimeout(duration time.Duration)
	Transporter() Transporter
	SetTransporter(t Transporter)
	AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
	RequestVote(req *RequestVoteRequest) *RequestVoteResponse
	RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
	SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
	AddPeer(name string, connectionString string) error
	RemovePeer(name string) error
	Peers() map[string]*Peer
	Start() error
	Stop()
	Running() bool
	Do(command Command) (interface{}, error)
	TakeSnapshot() error
	LoadSnapshot() error
	AddEventListener(string, EventListener)
}
type server struct {
	*eventDispatcher

	name        string
	path        string
	state       string
	transporter Transporter
	context     interface{}
	currentTerm uint64
	votedFor    string
	log         *Log
	leader      string
	peers       map[string]*Peer
	mutex       sync.RWMutex
	syncedPeer  map[string]bool

	c                       chan *ev
	electionTimeout         time.Duration
	heartbeatTimeout        time.Duration
	currentSnapshot         *Snapshot
	lastSnapshot            *Snapshot
	stateMachine            StateMachine
	maxLogEntriesPerRequest uint64
	connectionString        string
}

// An internal event to be processed by the server's event loop.
type ev struct {
	target      interface{}
	returnValue interface{}
	c           chan error
}
//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// NewServer creates a new server with a log at the given path. transporter must
// not be nil. stateMachine can be nil if snapshotting and log compaction are to
// be disabled. context can be anything (including nil) and is not used by the
// raft package except returned by Server.Context(). connectionString can be
// anything.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
	if name == "" {
		return nil, errors.New("raft.Server: Name cannot be blank")
	}
	if transporter == nil {
		panic("raft: Transporter required")
	}

	s := &server{
		name:                    name,
		path:                    path,
		transporter:             transporter,
		stateMachine:            stateMachine,
		context:                 ctx,
		state:                   Stopped,
		peers:                   make(map[string]*Peer),
		log:                     newLog(),
		c:                       make(chan *ev, 256),
		electionTimeout:         DefaultElectionTimeout,
		heartbeatTimeout:        DefaultHeartbeatTimeout,
		maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
		connectionString:        connectionString,
	}
	s.eventDispatcher = newEventDispatcher(s)

	// Setup apply function.
	s.log.ApplyFunc = func(c Command) (interface{}, error) {
		switch c := c.(type) {
		case CommandApply:
			return c.Apply(&context{
				server:       s,
				currentTerm:  s.currentTerm,
				currentIndex: s.log.internalCurrentIndex(),
				commitIndex:  s.log.commitIndex,
			})
		case deprecatedCommandApply:
			return c.Apply(s)
		default:
			return nil, fmt.Errorf("Command does not implement Apply()")
		}
	}

	return s, nil
}
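// Example usage (an illustrative sketch, not part of the original file; the
// transporter value and names below are placeholders supplied by the caller):
//
//	s, err := NewServer("node1", "/tmp/raft/node1", transporter, nil, nil, "http://localhost:4001")
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := s.Start(); err != nil {
//		log.Fatal(err)
//	}
//	defer s.Stop()
//	// Once the server is leader, commands are replicated through Do.
//	result, err := s.Do(NOPCommand{})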
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

//--------------------------------------
// General
//--------------------------------------

// Retrieves the name of the server.
func (s *server) Name() string {
	return s.name
}

// Retrieves the storage path for the server.
func (s *server) Path() string {
	return s.path
}

// The name of the current leader.
func (s *server) Leader() string {
	return s.leader
}

// Retrieves a copy of the peer data.
func (s *server) Peers() map[string]*Peer {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	peers := make(map[string]*Peer)
	for name, peer := range s.peers {
		peers[name] = peer.clone()
	}
	return peers
}

// Retrieves the object that transports requests.
func (s *server) Transporter() Transporter {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.transporter
}

func (s *server) SetTransporter(t Transporter) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.transporter = t
}

// Retrieves the context passed into the constructor.
func (s *server) Context() interface{} {
	return s.context
}

// Retrieves the state machine passed into the constructor.
func (s *server) StateMachine() StateMachine {
	return s.stateMachine
}

// Retrieves the log path for the server.
func (s *server) LogPath() string {
	return path.Join(s.path, "log")
}

// Retrieves the current state of the server.
func (s *server) State() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state
}
// Sets the state of the server.
func (s *server) setState(state string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Temporarily store previous values.
	prevState := s.state
	prevLeader := s.leader

	// Update state and leader.
	s.state = state
	if state == Leader {
		s.leader = s.Name()
	}

	// Dispatch state and leader change events.
	if prevState != state {
		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
	}
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}
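// Callers can observe these transitions through AddEventListener. An
// illustrative sketch (assuming the package's EventListener type is func(Event)
// and that Event exposes Value()/PrevValue() accessors):
//
//	s.AddEventListener(LeaderChangeEventType, func(e Event) {
//		fmt.Println("leader changed from", e.PrevValue(), "to", e.Value())
//	})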
// Retrieves the current term of the server.
func (s *server) Term() uint64 {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.currentTerm
}

// Retrieves the current commit index of the server.
func (s *server) CommitIndex() uint64 {
	s.log.mutex.RLock()
	defer s.log.mutex.RUnlock()
	return s.log.commitIndex
}

// Retrieves the name of the candidate this server voted for in this term.
func (s *server) VotedFor() string {
	return s.votedFor
}

// Retrieves whether the server's log has no entries.
func (s *server) IsLogEmpty() bool {
	return s.log.isEmpty()
}

// A list of all the log entries. This should only be used for debugging purposes.
func (s *server) LogEntries() []*LogEntry {
	return s.log.entries
}

// A reference to the command name of the last entry.
func (s *server) LastCommandName() string {
	return s.log.lastCommandName()
}

// Get the state of the server for debugging.
func (s *server) GetState() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommittedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
}

// Check if the server is promotable, i.e. it has at least one log entry.
func (s *server) promotable() bool {
	return s.log.currentIndex() > 0
}

//--------------------------------------
// Membership
//--------------------------------------

// Retrieves the number of member servers in the consensus.
func (s *server) MemberCount() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.peers) + 1
}

// Retrieves the number of servers required to make a quorum. For example, a
// five-member cluster (four peers plus the local server) has a quorum size of three.
func (s *server) QuorumSize() int {
	return (s.MemberCount() / 2) + 1
}
//--------------------------------------
// Election timeout
//--------------------------------------

// Retrieves the election timeout.
func (s *server) ElectionTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.electionTimeout
}

// Sets the election timeout.
func (s *server) SetElectionTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.electionTimeout = duration
}

//--------------------------------------
// Heartbeat timeout
//--------------------------------------

// Retrieves the heartbeat timeout.
func (s *server) HeartbeatTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.heartbeatTimeout
}

// Sets the heartbeat timeout.
func (s *server) SetHeartbeatTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.heartbeatTimeout = duration
	for _, peer := range s.peers {
		peer.setHeartbeatTimeout(duration)
	}
}
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Initialization
//--------------------------------------

// Register the commands used internally by raft.
func init() {
	RegisterCommand(&NOPCommand{})
	RegisterCommand(&DefaultJoinCommand{})
	RegisterCommand(&DefaultLeaveCommand{})
}

// Starts the server as a follower.
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit the entry.
func (s *server) Start() error {
	// Exit if the server is already running.
	if s.State() != Stopped {
		return errors.New("raft.Server: Server already running")
	}

	// Create the snapshot directory if it does not exist.
	os.Mkdir(path.Join(s.path, "snapshot"), 0700)

	if err := s.readConf(); err != nil {
		s.debugln("raft: Conf file error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Initialize the log and load it up.
	if err := s.log.open(s.LogPath()); err != nil {
		s.debugln("raft: Log error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Update the term to the last term in the log.
	_, s.currentTerm = s.log.lastInfo()

	s.setState(Follower)

	// If no log entries exist then
	// 1. wait for AEs from another node
	// 2. wait for a self-join command
	// to make itself promotable.
	if !s.promotable() {
		s.debugln("start as a new raft server")

		// If log entries exist then allow promotion to candidate
		// if no AEs received.
	} else {
		s.debugln("start from previous saved state")
	}

	debugln(s.GetState())

	go s.loop()
	return nil
}
// Shuts down the server.
func (s *server) Stop() {
	s.send(&stopValue)
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.log.close()
}

// Checks if the server is currently running.
func (s *server) Running() bool {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state != Stopped
}
//--------------------------------------
// Term
//--------------------------------------

// Sets the current term for the server. This is only used when a newer term
// is discovered from another server.
func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Store previous values temporarily.
	prevState := s.state
	prevTerm := s.currentTerm
	prevLeader := s.leader

	if term > s.currentTerm {
		// Update the term and clear the vote.
		s.state = Follower
		s.currentTerm = term
		s.leader = leaderName
		s.votedFor = ""
	} else if term == s.currentTerm && s.state != Leader && append {
		// Discover the new leader when a candidate;
		// save the leader name when a follower.
		s.state = Follower
		s.leader = leaderName
	}

	// Dispatch change events.
	if prevState != s.state {
		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
	}
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
	if prevTerm != s.currentTerm {
		s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
	}
}
//--------------------------------------
// Event Loop
//--------------------------------------

//               ________
//            --|Snapshot|                 timeout
//            |  --------                  ______
// recover    |       ^                   |      |
// snapshot / |       |snapshot           |      |
// higher     |       |                   v      |     recv majority votes
// term       |    --------    timeout    -----------                        -----------
//            |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
//                 --------               -----------                        -----------
//                     ^          higher term/ |                         higher term |
//                     |            new leader |                                     |
//                     |_______________________|____________________________________|

// The main event loop for the server.
func (s *server) loop() {
	defer s.debugln("server.loop.end")

	for {
		state := s.State()
		s.debugln("server.loop.run ", state)

		switch state {
		case Follower:
			s.followerLoop()
		case Candidate:
			s.candidateLoop()
		case Leader:
			s.leaderLoop()
		case Snapshotting:
			s.snapshotLoop()
		case Stopped:
			return
		}
	}
}
// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
	event := s.sendAsync(value)
	err := <-event.c
	return event.returnValue, err
}
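// Queues an event for the event loop without waiting for it to be processed.
// Note that this can still block if the buffered event channel is full (see
// the comment in processCommand below).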
func (s *server) sendAsync(value interface{}) *ev {
	event := &ev{target: value, c: make(chan error, 1)}
	s.c <- event
	return event
}
// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if election timeout elapses without either:
//   1. Receiving valid AppendEntries RPC, or
//   2. Granting vote to candidate
func (s *server) followerLoop() {
	s.setState(Follower)
	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

	for {
		var err error
		update := false
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case JoinCommand:
					// If no log entries exist and a self-join command is issued
					// then immediately become leader and commit the entry.
					if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
						s.debugln("selfjoin and promote to leader")
						s.setState(Leader)
						s.processCommand(req, e)
					} else {
						err = NotLeaderError
					}
				case *AppendEntriesRequest:
					e.returnValue, update = s.processAppendEntriesRequest(req)
				case *RequestVoteRequest:
					e.returnValue, update = s.processRequestVoteRequest(req)
				case *SnapshotRequest:
					e.returnValue = s.processSnapshotRequest(req)
				default:
					err = NotLeaderError
				}
			}

			// Callback to event.
			e.c <- err

		case <-timeoutChan:
			// Only allow a follower with a non-empty log to be promoted to candidate.
			if s.promotable() {
				s.setState(Candidate)
			} else {
				update = true
			}
		}

		// Reset the election timeout whenever a valid RPC was processed or a
		// non-promotable follower timed out.
		if update {
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		}

		// Exit loop on state change.
		if s.State() != Follower {
			break
		}
	}
}
// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
	lastLogIndex, lastLogTerm := s.log.lastInfo()

	// Clear the leader value.
	prevLeader := s.leader
	s.leader = ""
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}

	for {
		// Increment current term, vote for self.
		s.currentTerm++
		s.votedFor = s.name

		// Send RequestVote RPCs to all other servers.
		respChan := make(chan *RequestVoteResponse, len(s.peers))
		for _, peer := range s.peers {
			go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
		}

		// Wait for either:
		//   * Votes received from majority of servers: become leader
		//   * AppendEntries RPC received from new leader: step down.
		//   * Election timeout elapses without election resolution: increment term, start new election
		//   * Discover higher term: step down (§5.1)
		votesGranted := 1
		timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		timeout := false

		for {
			// If we received enough votes then stop waiting for more votes.
			s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
			if votesGranted >= s.QuorumSize() {
				s.setState(Leader)
				break
			}

			// Collect votes from peers.
			select {
			case resp := <-respChan:
				if resp.VoteGranted {
					s.debugln("server.candidate.vote.granted: ", votesGranted)
					votesGranted++
				} else if resp.Term > s.currentTerm {
					s.debugln("server.candidate.vote.failed")
					s.setCurrentTerm(resp.Term, "", false)
				} else {
					s.debugln("server.candidate.vote: denied")
				}

			case e := <-s.c:
				var err error
				if e.target == &stopValue {
					s.setState(Stopped)
				} else {
					switch req := e.target.(type) {
					case Command:
						err = NotLeaderError
					case *AppendEntriesRequest:
						e.returnValue, _ = s.processAppendEntriesRequest(req)
					case *RequestVoteRequest:
						e.returnValue, _ = s.processRequestVoteRequest(req)
					}
				}
				// Callback to event.
				e.c <- err

			case <-timeoutChan:
				timeout = true
			}

			// Processing either an AppendEntries request or a RequestVote request
			// can demote the server to follower; also break when a timeout happens.
			if s.State() != Candidate || timeout {
				break
			}
		}

		// Break when we are no longer a candidate.
		if s.State() != Candidate {
			break
		}

		// Otherwise the election timed out; continue with a new election.
	}
}
// The event loop that is run when the server is in a Leader state.
func (s *server) leaderLoop() {
	s.setState(Leader)
	s.syncedPeer = make(map[string]bool)
	logIndex, _ := s.log.lastInfo()

	// Update the peers' prevLogIndex to the leader's lastLogIndex and start heartbeats.
	s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
	for _, peer := range s.peers {
		peer.setPrevLogIndex(logIndex)
		peer.startHeartbeat()
	}

	// Commit a NOP command in the new term so entries from previous terms can be committed.
	go s.Do(NOPCommand{})

	// Begin to collect responses from followers.
	for {
		var err error
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case Command:
					s.processCommand(req, e)
					continue
				case *AppendEntriesRequest:
					e.returnValue, _ = s.processAppendEntriesRequest(req)
				case *AppendEntriesResponse:
					s.processAppendEntriesResponse(req)
				case *RequestVoteRequest:
					e.returnValue, _ = s.processRequestVoteRequest(req)
				}
			}

			// Callback to event.
			e.c <- err
		}

		// Exit loop on state change.
		if s.State() != Leader {
			break
		}
	}

	// Stop all peers.
	for _, peer := range s.peers {
		peer.stopHeartbeat(false)
	}
	s.syncedPeer = nil
}
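// The event loop that is run when the server is in a Snapshotting state.
// Commands are rejected; only append entries, request vote, and snapshot
// recovery requests are processed until the state changes.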
func (s *server) snapshotLoop() {
	s.setState(Snapshotting)

	for {
		var err error
		e := <-s.c

		if e.target == &stopValue {
			s.setState(Stopped)
		} else {
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			case *SnapshotRecoveryRequest:
				e.returnValue = s.processSnapshotRecoveryRequest(req)
			}
		}

		// Callback to event.
		e.c <- err

		// Exit loop on state change.
		if s.State() != Snapshotting {
			break
		}
	}
}
//--------------------------------------
// Commands
//--------------------------------------

// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *server) Do(command Command) (interface{}, error) {
	return s.send(command)
}

// Processes a command.
func (s *server) processCommand(command Command, e *ev) {
	s.debugln("server.command.process")

	// Create an entry for the command in the log.
	entry, err := s.log.createEntry(s.currentTerm, command)
	if err != nil {
		s.debugln("server.command.log.entry.error:", err)
		e.c <- err
		return
	}

	if err := s.log.appendEntry(entry); err != nil {
		s.debugln("server.command.log.error:", err)
		e.c <- err
		return
	}

	// Issue a callback for the entry once it's committed.
	go func() {
		// Wait for the entry to be committed.
		select {
		case <-entry.commit:
			var err error
			s.debugln("server.command.commit")
			e.returnValue, err = s.log.getEntryResult(entry, true)
			e.c <- err
		case <-time.After(time.Second):
			s.debugln("server.command.timeout")
			e.c <- CommandTimeoutError
		}
	}()

	// Issue an append entries response for the server itself.
	resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
	resp.append = true
	resp.peer = s.Name()

	// This must be asynchronous: sendAsync is not always truly asynchronous.
	// When commands arrive faster than the server can process them, the
	// buffered channel fills up and sendAsync blocks, which would deadlock
	// here. Sending from a goroutine avoids that deadlock.
	go s.sendAsync(resp)
}
//--------------------------------------
// Append Entries
//--------------------------------------

// Appends zero or more log entries from the leader to this server.
func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*AppendEntriesResponse)
	return resp
}

// Processes the "append entries" request.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
	s.traceln("server.ae.process")

	if req.Term < s.currentTerm {
		s.debugln("server.ae.error: stale term")
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
	}

	// Update term and leader.
	s.setCurrentTerm(req.Term, req.LeaderName, true)

	// Reject if log doesn't contain a matching previous entry.
	if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
		s.debugln("server.ae.truncate.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Append entries to the log.
	if err := s.log.appendEntries(req.Entries); err != nil {
		s.debugln("server.ae.append.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Commit up to the commit index.
	if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
		s.debugln("server.ae.commit.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// The server has appended and committed all the log entries from the leader.
	return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}
// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
	// If we find a higher term then change to a follower and exit.
	if resp.Term > s.Term() {
		s.setCurrentTerm(resp.Term, "", false)
		return
	}

	// Ignore the response if it's not successful.
	if !resp.Success {
		return
	}

	// If the peer successfully appended a log entry from the leader's term,
	// add it to the synced list.
	if resp.append {
		s.syncedPeer[resp.peer] = true
	}

	// Make sure we have a quorum before committing.
	if len(s.syncedPeer) < s.QuorumSize() {
		return
	}

	// Determine the committed index that a majority has.
	var indices []uint64
	indices = append(indices, s.log.currentIndex())
	for _, peer := range s.peers {
		indices = append(indices, peer.getPrevLogIndex())
	}
	sort.Sort(sort.Reverse(uint64Slice(indices)))

	// We can commit up to the index which the majority of the members have appended.
	// For example, with indices [10 9 7 4 3] and a quorum size of 3, the third
	// highest index (7) has been appended by a majority and can be committed.
	commitIndex := indices[s.QuorumSize()-1]
	committedIndex := s.log.commitIndex

	if commitIndex > committedIndex {
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)

		for i := committedIndex; i < commitIndex; i++ {
			if entry := s.log.getEntry(i + 1); entry != nil {
				// If the leader is new and the entry came from the old leader,
				// the commit channel will be nil and no goroutine is waiting
				// on it; sending to it would make the new leader get stuck.
				if entry.commit != nil {
					select {
					case entry.commit <- true:
					default:
						panic("server unable to send signal to commit channel")
					}
					entry.commit = nil
				}
			}
		}
	}
}
//--------------------------------------
// Request Vote
//--------------------------------------

// Requests a vote from a server. A vote can be obtained if the vote's term is
// at the server's current term and the server has not yet voted. A vote can
// also be obtained if the term is greater than the server's current term.
func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*RequestVoteResponse)
	return resp
}

// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
	// If the request is coming from an old term then reject it.
	if req.Term < s.Term() {
		s.debugln("server.rv.error: stale term")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	s.setCurrentTerm(req.Term, "", false)

	// If we've already voted for a different candidate then don't vote for this candidate.
	if s.votedFor != "" && s.votedFor != req.CandidateName {
		s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
			" already voted for ", s.votedFor)
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the candidate's log is not at least as up-to-date as our last log then don't vote.
	lastIndex, lastTerm := s.log.lastInfo()
	if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
		s.debugln("server.rv.error: out of date log: ", req.CandidateName,
			"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
			"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If we made it this far then cast a vote and reset our election timeout.
	s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
	s.votedFor = req.CandidateName
	return newRequestVoteResponse(s.currentTerm, true), true
}
//--------------------------------------
// Membership
//--------------------------------------

// Adds a peer to the server.
func (s *server) AddPeer(name string, connectionString string) error {
	s.debugln("server.peer.add: ", name, len(s.peers))

	// Do not allow peers to be added twice.
	if s.peers[name] != nil {
		return nil
	}

	// Skip the peer if it has the same name as the server.
	if s.name != name {
		peer := newPeer(s, name, connectionString, s.heartbeatTimeout)
		if s.State() == Leader {
			peer.startHeartbeat()
		}
		s.peers[peer.Name] = peer
		s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()
	return nil
}

// Removes a peer from the server.
func (s *server) RemovePeer(name string) error {
	s.debugln("server.peer.remove: ", name, len(s.peers))

	// Skip the peer if it has the same name as the server.
	if name != s.Name() {
		// Return an error if the peer doesn't exist.
		peer := s.peers[name]
		if peer == nil {
			return fmt.Errorf("raft: Peer not found: %s", name)
		}

		// Stop the peer and remove it.
		if s.State() == Leader {
			peer.stopHeartbeat(true)
		}
		delete(s.peers, name)
		s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()
	return nil
}
//--------------------------------------
// Log compaction
//--------------------------------------
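// Creates a snapshot of the state machine and the current cluster membership,
// saves it to disk, and compacts the log, keeping the most recent
// NumberOfLogEntriesAfterSnapshot entries for followers that are only
// slightly behind.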
func (s *server) TakeSnapshot() error {
	// TODO: put a snapshot mutex
	s.debugln("take Snapshot")
	if s.currentSnapshot != nil {
		return errors.New("handling snapshot")
	}

	lastIndex, lastTerm := s.log.commitInfo()
	if lastIndex == 0 {
		return errors.New("No logs")
	}

	path := s.SnapshotPath(lastIndex, lastTerm)

	var state []byte
	var err error
	if s.stateMachine != nil {
		state, err = s.stateMachine.Save()
		if err != nil {
			return err
		}
	} else {
		state = []byte{0}
	}

	// Clone the current peers and include the local server itself.
	peers := make([]*Peer, len(s.peers)+1)
	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}
	peers[i] = &Peer{
		Name:             s.Name(),
		ConnectionString: s.connectionString,
	}

	s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
	s.saveSnapshot()

	// We keep some log entries after the snapshot so that we do not have to
	// send the whole snapshot to followers that are only slightly behind.
	if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
		compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
		compactTerm := s.log.getEntry(compactIndex).Term
		s.log.compact(compactIndex, compactTerm)
	}

	return nil
}
// Saves the current snapshot to disk and retires the previous snapshot.
func (s *server) saveSnapshot() error {
	if s.currentSnapshot == nil {
		return errors.New("no snapshot to save")
	}

	err := s.currentSnapshot.save()
	if err != nil {
		return err
	}

	tmp := s.lastSnapshot
	s.lastSnapshot = s.currentSnapshot

	// Delete the previous snapshot if there is any change.
	if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
		tmp.remove()
	}
	s.currentSnapshot = nil
	return nil
}

// Retrieves the path to the snapshot file for the given last index and term.
func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}
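// Dispatches an incoming snapshot request to the event loop and returns the response.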
func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotResponse)
	return resp
}

// Processes a "snapshot" request.
func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
	// If the follower's log contains an entry at the snapshot's last index with
	// a term that matches the snapshot's last term, then the follower already
	// has all the information found in the snapshot and can reply false.
	entry := s.log.getEntry(req.LastIndex)
	if entry != nil && entry.Term == req.LastTerm {
		return newSnapshotResponse(false)
	}

	// Otherwise switch to the snapshotting state and wait for the recovery data.
	s.setState(Snapshotting)
	return newSnapshotResponse(true)
}
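// Dispatches an incoming snapshot recovery request to the event loop and
// returns the response.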
func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotRecoveryResponse)
	return resp
}
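// Processes a "snapshot recovery" request: restores the state machine from
// the snapshot state, rebuilds the cluster configuration, saves the snapshot
// locally, and compacts the log up to the snapshot's last index.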
func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	s.stateMachine.Recovery(req.State)

	// Clear the peer map and recover the cluster configuration.
	s.peers = make(map[string]*Peer)
	for _, peer := range req.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update the term and commit index.
	s.currentTerm = req.LastTerm
	s.log.updateCommitIndex(req.LastIndex)

	// Create and save a local copy of the snapshot.
	snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
	s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
	s.saveSnapshot()

	// Clear the previous log entries.
	s.log.compact(req.LastIndex, req.LastTerm)

	return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}
// Loads the most recent snapshot at restart.
func (s *server) LoadSnapshot() error {
	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
	if err != nil {
		return err
	}

	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		dir.Close()
		panic(err)
	}
	dir.Close()

	if len(filenames) == 0 {
		return errors.New("no snapshot")
	}

	// Not sure how many snapshots we should keep; load the most recent one.
	sort.Strings(filenames)
	snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

	// Should not fail.
	file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// TODO: check checksum first
	var snapshotBytes []byte
	var checksum uint32

	n, err := fmt.Fscanf(file, "%08x\n", &checksum)
	if err != nil {
		return err
	}
	if n != 1 {
		return errors.New("Bad snapshot file")
	}

	snapshotBytes, _ = ioutil.ReadAll(file)
	s.debugln(string(snapshotBytes))

	// Generate the checksum and compare it with the stored one.
	byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
	if checksum != byteChecksum {
		s.debugln(checksum, " ", byteChecksum)
		return errors.New("bad snapshot file")
	}

	err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
	if err != nil {
		s.debugln("unmarshal error: ", err)
		return err
	}

	err = s.stateMachine.Recovery(s.lastSnapshot.State)
	if err != nil {
		s.debugln("recovery error: ", err)
		return err
	}

	for _, peer := range s.lastSnapshot.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	s.log.startTerm = s.lastSnapshot.LastTerm
	s.log.startIndex = s.lastSnapshot.LastIndex
	s.log.updateCommitIndex(s.lastSnapshot.LastIndex)

	return err
}
//--------------------------------------
// Config File
//--------------------------------------
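// Writes the current cluster configuration (commit index and peers) to the
// conf file, replacing the previous file via a write-to-temp-and-rename.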
func (s *server) writeConf() {
	peers := make([]*Peer, len(s.peers))
	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}

	r := &Config{
		CommitIndex: s.log.commitIndex,
		Peers:       peers,
	}

	b, _ := json.Marshal(r)

	confPath := path.Join(s.path, "conf")
	tmpConfPath := path.Join(s.path, "conf.tmp")

	err := ioutil.WriteFile(tmpConfPath, b, 0600)
	if err != nil {
		panic(err)
	}

	os.Rename(tmpConfPath, confPath)
}
// Reads the configuration for the server.
func (s *server) readConf() error {
	confPath := path.Join(s.path, "conf")
	s.debugln("readConf.open ", confPath)

	// Open the conf file. A missing or unreadable file is not an error; the
	// server simply starts with a fresh configuration.
	b, err := ioutil.ReadFile(confPath)
	if err != nil {
		return nil
	}

	conf := &Config{}
	if err = json.Unmarshal(b, conf); err != nil {
		return err
	}

	s.log.updateCommitIndex(conf.CommitIndex)

	return nil
}

//--------------------------------------
// Debugging
//--------------------------------------

func (s *server) debugln(v ...interface{}) {
	debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
}

func (s *server) traceln(v ...interface{}) {
	tracef("[%s] %s", s.name, fmt.Sprintln(v...))
}