package raft

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io/ioutil"
	"os"
	"path"
	"sort"
	"sync"
	"time"
)

//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------

const (
	Stopped      = "stopped"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)

const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)

const (
	DefaultHeartbeatTimeout = 50 * time.Millisecond
	DefaultElectionTimeout  = 150 * time.Millisecond
)

var stopValue interface{}

//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------

var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")

//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server struct {
	name                    string
	path                    string
	state                   string
	transporter             Transporter
	context                 interface{}
	currentTerm             uint64
	votedFor                string
	log                     *Log
	leader                  string
	peers                   map[string]*Peer
	mutex                   sync.RWMutex
	syncedPeer              map[string]bool
	c                       chan *event
	electionTimeout         time.Duration
	heartbeatTimeout        time.Duration
	currentSnapshot         *Snapshot
	lastSnapshot            *Snapshot
	stateMachine            StateMachine
	maxLogEntriesPerRequest uint64
}

// An event to be processed by the server's event loop.
type event struct {
	target      interface{}
	returnValue interface{}
	c           chan error
}

//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// Creates a new server with a log at the given path.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}) (*Server, error) {
	if name == "" {
		return nil, errors.New("raft.Server: Name cannot be blank")
	}
	if transporter == nil {
		panic("raft: Transporter required")
	}

	s := &Server{
		name:                    name,
		path:                    path,
		transporter:             transporter,
		stateMachine:            stateMachine,
		context:                 context,
		state:                   Stopped,
		peers:                   make(map[string]*Peer),
		log:                     newLog(),
		c:                       make(chan *event, 256),
		electionTimeout:         DefaultElectionTimeout,
		heartbeatTimeout:        DefaultHeartbeatTimeout,
		maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
	}

	// Set up the apply function.
	s.log.ApplyFunc = func(c Command) (interface{}, error) {
		return c.Apply(s)
	}

	return s, nil
}
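
// A minimal usage sketch, assuming a Transporter implementation and a
// StateMachine value exist elsewhere (`myTransporter` and `fsm` below are
// hypothetical names, not part of this package):
//
//	server, err := NewServer("node1", "/tmp/node1", myTransporter, fsm, nil)
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := server.Start(); err != nil {
//		log.Fatal(err)
//	}
//	defer server.Stop()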

//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

//--------------------------------------
// General
//--------------------------------------

// Retrieves the name of the server.
func (s *Server) Name() string {
	return s.name
}

// Retrieves the storage path for the server.
func (s *Server) Path() string {
	return s.path
}

// Retrieves the name of the current leader.
func (s *Server) Leader() string {
	return s.leader
}

// Retrieves a copy of the peer data.
func (s *Server) Peers() map[string]*Peer {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	peers := make(map[string]*Peer)
	for name, peer := range s.peers {
		peers[name] = peer.clone()
	}
	return peers
}

// Retrieves the object that transports requests.
func (s *Server) Transporter() Transporter {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.transporter
}

// Sets the object that transports requests.
func (s *Server) SetTransporter(t Transporter) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.transporter = t
}

// Retrieves the context passed into the constructor.
func (s *Server) Context() interface{} {
	return s.context
}

// Retrieves the log path for the server.
func (s *Server) LogPath() string {
	return path.Join(s.path, "log")
}

// Retrieves the current state of the server.
func (s *Server) State() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state
}

// Sets the state of the server.
func (s *Server) setState(state string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.state = state
	if state == Leader {
		s.leader = s.Name()
	}
}

// Retrieves the current term of the server.
func (s *Server) Term() uint64 {
	return s.currentTerm
}

// Retrieves the current commit index of the server.
func (s *Server) CommitIndex() uint64 {
	return s.log.commitIndex
}

// Retrieves the name of the candidate this server voted for in this term.
func (s *Server) VotedFor() string {
	return s.votedFor
}

// Retrieves whether the server's log has no entries.
func (s *Server) IsLogEmpty() bool {
	return s.log.isEmpty()
}

// A list of all the log entries. This should only be used for debugging purposes.
func (s *Server) LogEntries() []*LogEntry {
	return s.log.entries
}

// A reference to the command name of the last entry.
func (s *Server) LastCommandName() string {
	return s.log.lastCommandName()
}

// Returns a summary of the server state for debugging.
func (s *Server) GetState() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommittedIndex: %v", s.name, s.state, s.currentTerm, s.log.commitIndex)
}

// Checks whether the server can be promoted to candidate, i.e. its log
// contains at least one entry.
func (s *Server) promotable() bool {
	return s.log.currentIndex() > 0
}

//--------------------------------------
// Membership
//--------------------------------------

// Retrieves the number of member servers in the consensus.
func (s *Server) MemberCount() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.peers) + 1
}

// Retrieves the number of servers required to make a quorum.
func (s *Server) QuorumSize() int {
	return (s.MemberCount() / 2) + 1
}
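
// For example, a cluster of 5 members (this server plus 4 peers) needs
// (5 / 2) + 1 = 3 servers to form a quorum; a 4-member cluster also needs 3.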

//--------------------------------------
// Election timeout
//--------------------------------------

// Retrieves the election timeout.
func (s *Server) ElectionTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.electionTimeout
}

// Sets the election timeout.
func (s *Server) SetElectionTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.electionTimeout = duration
}

//--------------------------------------
// Heartbeat timeout
//--------------------------------------

// Retrieves the heartbeat timeout.
func (s *Server) HeartbeatTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.heartbeatTimeout
}

// Sets the heartbeat timeout.
func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.heartbeatTimeout = duration
	for _, peer := range s.peers {
		peer.setHeartbeatTimeout(duration)
	}
}

//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Initialization
//--------------------------------------

// Register the built-in commands.
func init() {
	RegisterCommand(&NOPCommand{})
	RegisterCommand(&DefaultJoinCommand{})
	RegisterCommand(&DefaultLeaveCommand{})
}

// Starts the server as a follower.
// If log entries exist then allow promotion to candidate if no AEs are received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit the entry.
func (s *Server) Start() error {
	// Exit if the server is already running.
	if s.state != Stopped {
		return errors.New("raft.Server: Server already running")
	}

	// Create the snapshot directory if it does not already exist.
	os.Mkdir(path.Join(s.path, "snapshot"), 0700)

	if err := s.readConf(); err != nil {
		s.debugln("raft: Conf file error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Initialize the log and load it up.
	if err := s.log.open(s.LogPath()); err != nil {
		s.debugln("raft: Log error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Update the term to the last term in the log.
	_, s.currentTerm = s.log.lastInfo()

	s.setState(Follower)

	// If no log entries exist then the server waits either for AEs from
	// another node or for a self-join command before becoming promotable.
	if !s.promotable() {
		s.debugln("start as a new raft server")
	} else {
		// If log entries exist then allow promotion to candidate
		// if no AEs are received.
		s.debugln("start from previous saved state")
	}

	debugln(s.GetState())

	go s.loop()
	return nil
}

// Shuts down the server.
func (s *Server) Stop() {
	s.send(&stopValue)
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.log.close()
}

// Checks if the server is currently running.
func (s *Server) Running() bool {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state != Stopped
}

//--------------------------------------
// Term
//--------------------------------------

// Sets the current term for the server. This is only used when a newer
// external term is discovered.
func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Update the term and clear the vote.
	if term > s.currentTerm {
		s.state = Follower
		s.currentTerm = term
		s.leader = leaderName
		s.votedFor = ""
		return
	}

	// A candidate discovers the new leader this way; a follower records
	// the leader's name.
	if term == s.currentTerm && s.state != Leader && append {
		s.state = Follower
		s.leader = leaderName
	}
}

//--------------------------------------
// Event Loop
//--------------------------------------

//                          ________
//                       --|Snapshot|                 timeout
//                       |  --------                  ______
//            recover    |       ^                   |      |
//            snapshot / |       |snapshot           |      |
//            higher     |       |                   v      |     recv majority votes
//            term       |    --------    timeout    -----------                        -----------
//                       |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
//                            --------               -----------                        -----------
//                               ^          higher term/ |                         higher term |
//                               |            new leader |                                     |
//                               |_______________________|_____________________________________|

// The main event loop for the server.
func (s *Server) loop() {
	defer s.debugln("server.loop.end")

	for {
		state := s.State()
		s.debugln("server.loop.run ", state)
		switch state {
		case Follower:
			s.followerLoop()
		case Candidate:
			s.candidateLoop()
		case Leader:
			s.leaderLoop()
		case Snapshotting:
			s.snapshotLoop()
		case Stopped:
			return
		}
	}
}

// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *Server) send(value interface{}) (interface{}, error) {
	event := s.sendAsync(value)
	err := <-event.c
	return event.returnValue, err
}

func (s *Server) sendAsync(value interface{}) *event {
	event := &event{target: value, c: make(chan error, 1)}
	s.c <- event
	return event
}
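
// All public entry points (Do, AppendEntries, RequestVote, RequestSnapshot,
// SnapshotRecoveryRequest) funnel through send, so the event loop goroutine
// is the only one that processes requests against the server state.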

// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if the election timeout elapses without either:
//  1. Receiving a valid AppendEntries RPC, or
//  2. Granting a vote to a candidate.
func (s *Server) followerLoop() {
	s.setState(Follower)
	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

	for {
		var err error
		update := false
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case JoinCommand:
					// If no log entries exist and a self-join command is issued
					// then immediately become leader and commit the entry.
					if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
						s.debugln("selfjoin and promote to leader")
						s.setState(Leader)
						s.processCommand(req, e)
					} else {
						err = NotLeaderError
					}
				case *AppendEntriesRequest:
					e.returnValue, update = s.processAppendEntriesRequest(req)
				case *RequestVoteRequest:
					e.returnValue, update = s.processRequestVoteRequest(req)
				case *SnapshotRequest:
					e.returnValue = s.processSnapshotRequest(req)
				default:
					err = NotLeaderError
				}
			}
			// Callback to event.
			e.c <- err
		case <-timeoutChan:
			// Only promotable followers (with at least one log entry) may
			// become candidates; otherwise keep waiting.
			if s.promotable() {
				s.setState(Candidate)
			} else {
				update = true
			}
		}

		// Reset the election timeout if a valid RPC was processed.
		if update {
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		}

		// Exit loop on state change.
		if s.State() != Follower {
			break
		}
	}
}

// The event loop that is run when the server is in a Candidate state.
func (s *Server) candidateLoop() {
	lastLogIndex, lastLogTerm := s.log.lastInfo()
	s.leader = ""

	for {
		// Increment current term, vote for self.
		s.currentTerm++
		s.votedFor = s.name

		// Send RequestVote RPCs to all other servers.
		respChan := make(chan *RequestVoteResponse, len(s.peers))
		for _, peer := range s.peers {
			go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
		}

		// Wait for either:
		//   * Votes received from a majority of servers: become leader.
		//   * AppendEntries RPC received from a new leader: step down.
		//   * Election timeout elapses without resolution: increment term, start new election.
		//   * Discover higher term: step down (§5.1).
		votesGranted := 1
		timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		timeout := false

		for {
			// If we received enough votes then stop waiting for more votes.
			s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
			if votesGranted >= s.QuorumSize() {
				s.setState(Leader)
				break
			}

			// Collect votes from peers.
			select {
			case resp := <-respChan:
				if resp.VoteGranted {
					s.debugln("server.candidate.vote.granted: ", votesGranted)
					votesGranted++
				} else if resp.Term > s.currentTerm {
					s.debugln("server.candidate.vote.failed")
					s.setCurrentTerm(resp.Term, "", false)
				} else {
					s.debugln("server.candidate.vote: denied")
				}
			case e := <-s.c:
				var err error
				if e.target == &stopValue {
					s.setState(Stopped)
				} else {
					switch req := e.target.(type) {
					case Command:
						err = NotLeaderError
					case *AppendEntriesRequest:
						e.returnValue, _ = s.processAppendEntriesRequest(req)
					case *RequestVoteRequest:
						e.returnValue, _ = s.processRequestVoteRequest(req)
					}
				}
				// Callback to event.
				e.c <- err
			case <-timeoutChan:
				timeout = true
			}

			// Processing either an AppendEntriesRequest or a RequestVoteRequest
			// can demote the server to follower; also break when the election
			// times out.
			if s.State() != Candidate || timeout {
				break
			}
		}

		// Break when we are no longer a candidate.
		if s.State() != Candidate {
			break
		}

		// Otherwise the election timed out; continue to the next round.
	}
}

// The event loop that is run when the server is in a Leader state.
func (s *Server) leaderLoop() {
	s.setState(Leader)
	s.syncedPeer = make(map[string]bool)
	logIndex, _ := s.log.lastInfo()

	// Update each peer's prevLogIndex to the leader's lastLogIndex and start heartbeats.
	s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
	for _, peer := range s.peers {
		peer.setPrevLogIndex(logIndex)
		peer.startHeartbeat()
	}

	// Commit a NOP command for the new term.
	go s.Do(NOPCommand{})

	// Begin to collect responses from followers.
	for {
		var err error
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case Command:
					s.processCommand(req, e)
					continue
				case *AppendEntriesRequest:
					e.returnValue, _ = s.processAppendEntriesRequest(req)
				case *AppendEntriesResponse:
					s.processAppendEntriesResponse(req)
				case *RequestVoteRequest:
					e.returnValue, _ = s.processRequestVoteRequest(req)
				}
			}
			// Callback to event.
			e.c <- err
		}

		// Exit loop on state change.
		if s.State() != Leader {
			break
		}
	}

	// Stop all peers.
	for _, peer := range s.peers {
		peer.stopHeartbeat(false)
	}
	s.syncedPeer = nil
}

// The event loop that is run when the server is in a Snapshotting state.
func (s *Server) snapshotLoop() {
	s.setState(Snapshotting)

	for {
		var err error
		e := <-s.c

		if e.target == &stopValue {
			s.setState(Stopped)
		} else {
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			case *SnapshotRecoveryRequest:
				e.returnValue = s.processSnapshotRecoveryRequest(req)
			}
		}
		// Callback to event.
		e.c <- err

		// Exit loop on state change.
		if s.State() != Snapshotting {
			break
		}
	}
}

//--------------------------------------
// Commands
//--------------------------------------

// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *Server) Do(command Command) (interface{}, error) {
	return s.send(command)
}
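
// For example, on the leader a registered command can be applied and
// replicated like this (NOPCommand is registered in init above):
//
//	result, err := server.Do(&NOPCommand{})
//	if err == NotLeaderError {
//		// redirect the client to server.Leader()
//	}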

// Processes a command.
func (s *Server) processCommand(command Command, e *event) {
	s.debugln("server.command.process")

	// Create an entry for the command in the log.
	entry, err := s.log.createEntry(s.currentTerm, command)
	if err != nil {
		s.debugln("server.command.log.entry.error:", err)
		e.c <- err
		return
	}

	if err := s.log.appendEntry(entry); err != nil {
		s.debugln("server.command.log.error:", err)
		e.c <- err
		return
	}

	// Issue a callback for the entry once it's committed.
	go func() {
		// Wait for the entry to be committed.
		select {
		case <-entry.commit:
			var err error
			s.debugln("server.command.commit")
			e.returnValue, err = s.log.getEntryResult(entry, true)
			e.c <- err
		case <-time.After(time.Second):
			s.debugln("server.command.timeout")
			e.c <- CommandTimeoutError
		}
	}()

	// Issue an append entries response for the server itself.
	resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
	resp.append = true
	resp.peer = s.Name()

	// This must be asynchronous. sendAsync is not always truly async: when
	// commands arrive faster than the server processes them, the buffered
	// channel fills up and sendAsync blocks, which would deadlock here.
	// Sending from a goroutine avoids the deadlock.
	go s.sendAsync(resp)
}

//--------------------------------------
// Append Entries
//--------------------------------------

// Appends zero or more log entries from the leader to this server.
func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*AppendEntriesResponse)
	return resp
}

// Processes the "append entries" request.
func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
	s.traceln("server.ae.process")

	if req.Term < s.currentTerm {
		s.debugln("server.ae.error: stale term")
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
	}

	// Update term and leader.
	s.setCurrentTerm(req.Term, req.LeaderName, true)

	// Reject if log doesn't contain a matching previous entry.
	if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
		s.debugln("server.ae.truncate.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Append entries to the log.
	if err := s.log.appendEntries(req.Entries); err != nil {
		s.debugln("server.ae.append.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Commit up to the commit index.
	if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
		s.debugln("server.ae.commit.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Once the server has appended and committed all the log entries from
	// the leader, acknowledge success.
	return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}

// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
	// If we find a higher term then change to a follower and exit.
	if resp.Term > s.currentTerm {
		s.setCurrentTerm(resp.Term, "", false)
		return
	}

	// Ignore the response if it's not successful.
	if !resp.Success {
		return
	}

	// If a peer successfully appended a log entry from the leader's term,
	// add it to the synced list.
	if resp.append {
		s.syncedPeer[resp.peer] = true
	}

	// Make sure we have a quorum before committing.
	if len(s.syncedPeer) < s.QuorumSize() {
		return
	}

	// Determine the committed index that a majority has appended.
	var indices []uint64
	indices = append(indices, s.log.currentIndex())
	for _, peer := range s.peers {
		indices = append(indices, peer.getPrevLogIndex())
	}
	// Sort in descending order so that indices[s.QuorumSize()-1] is the
	// highest index replicated on at least a quorum of members (this
	// assumes uint64Slice, defined elsewhere, sorts ascending).
	sort.Sort(sort.Reverse(uint64Slice(indices)))
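
	// For example, with 5 members whose indices sort (descending) to
	// [5, 4, 4, 3, 2] and a quorum of 3, indices[2] = 4: at least three
	// members have appended entries up to index 4, so 4 is safe to commit.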

	// We can commit up to the index which the majority of the members have appended.
	commitIndex := indices[s.QuorumSize()-1]
	committedIndex := s.log.commitIndex

	if commitIndex > committedIndex {
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
		for i := committedIndex; i < commitIndex; i++ {
			if entry := s.log.getEntry(i + 1); entry != nil {
				// If the leader is a new one and the entry came from the old
				// leader, the commit channel will be nil and no goroutine is
				// waiting on it; sending to it would wedge the new leader.
				if entry.commit != nil {
					select {
					case entry.commit <- true:
					default:
						panic("server unable to send signal to commit channel")
					}
				}
			}
		}
	}
}

//--------------------------------------
// Request Vote
//--------------------------------------

// Requests a vote from a server. A vote is granted if the request's term is
// at least the server's current term, the server has not already voted for a
// different candidate in that term, and the candidate's log is up-to-date.
func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*RequestVoteResponse)
	return resp
}

// Processes a "request vote" request.
func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
	// If the request is coming from an old term then reject it.
	if req.Term < s.currentTerm {
		s.debugln("server.rv.error: stale term")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	s.setCurrentTerm(req.Term, "", false)

	// If we've already voted for a different candidate then don't vote for this candidate.
	if s.votedFor != "" && s.votedFor != req.CandidateName {
		s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
			" already voted for ", s.votedFor)
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the candidate's log is not at least as up-to-date as our last log then don't vote.
	lastIndex, lastTerm := s.log.lastInfo()
	if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
		s.debugln("server.rv.error: out of date log: ", req.CandidateName,
			" Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
			" Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If we made it this far then cast a vote and reset our election timeout.
	s.debugln("server.rv.vote: ", s.name, " votes for ", req.CandidateName, " at term ", req.Term)
	s.votedFor = req.CandidateName
	return newRequestVoteResponse(s.currentTerm, true), true
}
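
// For example, if this server's log ends at (index 10, term 2) and a
// candidate reports LastLogIndex 9, LastLogTerm 2, the vote is denied
// because 10 > 9; a candidate reporting (index 12, term 2) would be granted
// the vote, assuming no conflicting vote was cast this term.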

//--------------------------------------
// Membership
//--------------------------------------

// Adds a peer to the server.
func (s *Server) AddPeer(name string, connectionString string) error {
	s.debugln("server.peer.add: ", name, len(s.peers))
	defer s.writeConf()

	// Do not allow peers to be added twice.
	if s.peers[name] != nil {
		return nil
	}

	// Skip the peer if it has the same name as the server.
	if s.name == name {
		return nil
	}

	peer := newPeer(s, name, connectionString, s.heartbeatTimeout)
	if s.State() == Leader {
		peer.startHeartbeat()
	}
	s.peers[peer.Name] = peer

	s.debugln("server.peer.conf.write: ", name)
	return nil
}

// Removes a peer from the server.
func (s *Server) RemovePeer(name string) error {
	s.debugln("server.peer.remove: ", name, len(s.peers))
	defer s.writeConf()

	if name == s.Name() {
		// When the removed node restarts, it should be able to know that it
		// was removed before, so we need to update knownCommitIndex.
		return nil
	}

	// Return an error if the peer doesn't exist.
	peer := s.peers[name]
	if peer == nil {
		return fmt.Errorf("raft: Peer not found: %s", name)
	}

	// Stop the peer and remove it.
	if s.State() == Leader {
		peer.stopHeartbeat(true)
	}
	delete(s.peers, name)

	return nil
}

//--------------------------------------
// Log compaction
//--------------------------------------

// Creates a snapshot of the state machine and compacts the log.
func (s *Server) TakeSnapshot() error {
	// TODO: put a snapshot mutex
	s.debugln("take Snapshot")
	if s.currentSnapshot != nil {
		return errors.New("handling snapshot")
	}

	lastIndex, lastTerm := s.log.commitInfo()
	if lastIndex == 0 {
		return errors.New("No logs")
	}

	snapshotPath := s.SnapshotPath(lastIndex, lastTerm)

	var state []byte
	var err error
	if s.stateMachine != nil {
		state, err = s.stateMachine.Save()
		if err != nil {
			return err
		}
	} else {
		state = []byte{0}
	}

	var peers []*Peer
	for _, peer := range s.peers {
		peers = append(peers, peer.clone())
	}

	s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, snapshotPath}
	s.saveSnapshot()

	// We keep some log entries after the snapshot so that we do not have to
	// send the whole snapshot to followers that are only slightly behind.
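	// For example, with lastIndex = 1000 and startIndex = 0, the log is
	// compacted at index 1000 - 200 = 800, keeping entries 801..1000 for
	// followers that still need them.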
	if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
		compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
		compactTerm := s.log.getEntry(compactIndex).Term
		s.log.compact(compactIndex, compactTerm)
	}

	return nil
}

// Saves the current snapshot to disk and retires the previous one.
func (s *Server) saveSnapshot() error {
	if s.currentSnapshot == nil {
		return errors.New("no snapshot to save")
	}

	err := s.currentSnapshot.save()
	if err != nil {
		return err
	}

	tmp := s.lastSnapshot
	s.lastSnapshot = s.currentSnapshot

	// Delete the previous snapshot if there was any change.
	if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
		tmp.remove()
	}
	s.currentSnapshot = nil

	return nil
}

// Retrieves the path of the snapshot for the given index and term.
func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}
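
// For example, with path "/tmp/node1", a snapshot at term 3 and index 1500
// is stored at "/tmp/node1/snapshot/3_1500.ss" (term first, then index).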

// Processes a snapshot request through the event loop.
func (s *Server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotResponse)
	return resp
}

// Processes the "snapshot" request.
func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
	// If the follower's log contains an entry at the snapshot's last index
	// with a term that matches the snapshot's last term, then the follower
	// already has all the information found in the snapshot and can reply false.
	entry := s.log.getEntry(req.LastIndex)
	if entry != nil && entry.Term == req.LastTerm {
		return newSnapshotResponse(false)
	}

	s.setState(Snapshotting)
	return newSnapshotResponse(true)
}

// Processes a snapshot recovery request through the event loop.
func (s *Server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotRecoveryResponse)
	return resp
}

// Processes the "snapshot recovery" request.
func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	// Restore the state machine from the snapshot.
	s.stateMachine.Recovery(req.State)

	// Clear the peer map and recover the cluster configuration.
	s.peers = make(map[string]*Peer)
	for _, peer := range req.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update the term and commit index.
	s.currentTerm = req.LastTerm
	s.log.updateCommitIndex(req.LastIndex)

	// Save the snapshot to disk.
	snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
	s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
	s.saveSnapshot()

	// Clear the previous log entries.
	s.log.compact(req.LastIndex, req.LastTerm)

	return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}

// Loads the latest snapshot at restart.
func (s *Server) LoadSnapshot() error {
	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
	if err != nil {
		return err
	}

	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		dir.Close()
		panic(err)
	}
	dir.Close()

	if len(filenames) == 0 {
		return errors.New("no snapshot")
	}

	// Not sure how many snapshots we should keep; load the latest one.
	sort.Strings(filenames)
	snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

	// Should not fail.
	file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// Verify the checksum before decoding the snapshot.
	var checksum uint32
	n, err := fmt.Fscanf(file, "%08x\n", &checksum)
	if err != nil {
		return err
	}
	if n != 1 {
		return errors.New("bad snapshot file")
	}

	snapshotBytes, _ := ioutil.ReadAll(file)
	s.debugln(string(snapshotBytes))

	// Generate the checksum and compare.
	byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
	if checksum != byteChecksum {
		s.debugln(checksum, " ", byteChecksum)
		return errors.New("bad snapshot file")
	}

	err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
	if err != nil {
		s.debugln("unmarshal error: ", err)
		return err
	}

	err = s.stateMachine.Recovery(s.lastSnapshot.State)
	if err != nil {
		s.debugln("recovery error: ", err)
		return err
	}

	for _, peer := range s.lastSnapshot.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	s.log.startTerm = s.lastSnapshot.LastTerm
	s.log.startIndex = s.lastSnapshot.LastIndex
	s.log.updateCommitIndex(s.lastSnapshot.LastIndex)

	return err
}

//--------------------------------------
// Config File
//--------------------------------------

// Writes the current configuration (commit index and peers) to disk,
// atomically via a temporary file.
func (s *Server) writeConf() {
	peers := make([]*Peer, len(s.peers))
	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}

	r := &Config{
		CommitIndex: s.log.commitIndex,
		Peers:       peers,
	}

	b, _ := json.Marshal(r)

	confPath := path.Join(s.path, "conf")
	tmpConfPath := path.Join(s.path, "conf.tmp")

	err := ioutil.WriteFile(tmpConfPath, b, 0600)
	if err != nil {
		panic(err)
	}

	os.Rename(tmpConfPath, confPath)
}

// Reads the configuration for the server.
func (s *Server) readConf() error {
	confPath := path.Join(s.path, "conf")
	s.debugln("readConf.open ", confPath)

	// A missing conf file is not an error; the server simply starts fresh.
	b, err := ioutil.ReadFile(confPath)
	if err != nil {
		return nil
	}

	conf := &Config{}
	if err = json.Unmarshal(b, conf); err != nil {
		return err
	}

	s.log.commitIndex = conf.CommitIndex

	return nil
}
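
// A conf file written by writeConf might look like this (the exact key
// casing depends on the JSON tags of Config and Peer, which are defined
// elsewhere):
//
//	{"commitIndex":4,"peers":[{"name":"node2","connectionString":"http://localhost:4002"}]}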

//--------------------------------------
// Debugging
//--------------------------------------

func (s *Server) debugln(v ...interface{}) {
	debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
}

func (s *Server) traceln(v ...interface{}) {
	tracef("[%s] %s", s.name, fmt.Sprintln(v...))
}