server.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286
  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "hash/crc32"
  7. "io/ioutil"
  8. "os"
  9. "path"
  10. "sort"
  11. "sync"
  12. "time"
  13. )
  14. //------------------------------------------------------------------------------
  15. //
  16. // Constants
  17. //
  18. //------------------------------------------------------------------------------
  19. const (
  20. Stopped = "stopped"
  21. Follower = "follower"
  22. Candidate = "candidate"
  23. Leader = "leader"
  24. Snapshotting = "snapshotting"
  25. )
  26. const (
  27. MaxLogEntriesPerRequest = 2000
  28. NumberOfLogEntriesAfterSnapshot = 200
  29. )
  30. const (
  31. DefaultHeartbeatTimeout = 50 * time.Millisecond
  32. DefaultElectionTimeout = 150 * time.Millisecond
  33. )
  34. var stopValue interface{}
  35. //------------------------------------------------------------------------------
  36. //
  37. // Errors
  38. //
  39. //------------------------------------------------------------------------------
  40. var NotLeaderError = errors.New("raft.Server: Not current leader")
  41. var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
  42. var CommandTimeoutError = errors.New("raft: Command timeout")
  43. //------------------------------------------------------------------------------
  44. //
  45. // Typedefs
  46. //
  47. //------------------------------------------------------------------------------
  48. // A server is involved in the consensus protocol and can act as a follower,
  49. // candidate or a leader.
  50. type Server struct {
  51. name string
  52. path string
  53. state string
  54. transporter Transporter
  55. context interface{}
  56. currentTerm uint64
  57. votedFor string
  58. log *Log
  59. leader string
  60. peers map[string]*Peer
  61. mutex sync.RWMutex
  62. syncedPeer map[string]bool
  63. c chan *event
  64. electionTimeout time.Duration
  65. heartbeatTimeout time.Duration
  66. currentSnapshot *Snapshot
  67. lastSnapshot *Snapshot
  68. stateMachine StateMachine
  69. maxLogEntriesPerRequest uint64
  70. connectionString string
  71. }
  72. // An event to be processed by the server's event loop.
  73. type event struct {
  74. target interface{}
  75. returnValue interface{}
  76. c chan error
  77. }
  78. //------------------------------------------------------------------------------
  79. //
  80. // Constructor
  81. //
  82. //------------------------------------------------------------------------------
  83. // Creates a new server with a log at the given path.
  84. func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (*Server, error) {
  85. if name == "" {
  86. return nil, errors.New("raft.Server: Name cannot be blank")
  87. }
  88. if transporter == nil {
  89. panic("raft: Transporter required")
  90. }
  91. s := &Server{
  92. name: name,
  93. path: path,
  94. transporter: transporter,
  95. stateMachine: stateMachine,
  96. context: context,
  97. state: Stopped,
  98. peers: make(map[string]*Peer),
  99. log: newLog(),
  100. c: make(chan *event, 256),
  101. electionTimeout: DefaultElectionTimeout,
  102. heartbeatTimeout: DefaultHeartbeatTimeout,
  103. maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
  104. connectionString: connectionString,
  105. }
  106. // Setup apply function.
  107. s.log.ApplyFunc = func(c Command) (interface{}, error) {
  108. result, err := c.Apply(s)
  109. return result, err
  110. }
  111. return s, nil
  112. }
  113. //------------------------------------------------------------------------------
  114. //
  115. // Accessors
  116. //
  117. //------------------------------------------------------------------------------
  118. //--------------------------------------
  119. // General
  120. //--------------------------------------
  121. // Retrieves the name of the server.
  122. func (s *Server) Name() string {
  123. return s.name
  124. }
  125. // Retrieves the storage path for the server.
  126. func (s *Server) Path() string {
  127. return s.path
  128. }
  129. // The name of the current leader.
  130. func (s *Server) Leader() string {
  131. return s.leader
  132. }
  133. // Retrieves a copy of the peer data.
  134. func (s *Server) Peers() map[string]*Peer {
  135. s.mutex.Lock()
  136. defer s.mutex.Unlock()
  137. peers := make(map[string]*Peer)
  138. for name, peer := range s.peers {
  139. peers[name] = peer.clone()
  140. }
  141. return peers
  142. }
  143. // Retrieves the object that transports requests.
  144. func (s *Server) Transporter() Transporter {
  145. s.mutex.RLock()
  146. defer s.mutex.RUnlock()
  147. return s.transporter
  148. }
  149. func (s *Server) SetTransporter(t Transporter) {
  150. s.mutex.Lock()
  151. defer s.mutex.Unlock()
  152. s.transporter = t
  153. }
  154. // Retrieves the context passed into the constructor.
  155. func (s *Server) Context() interface{} {
  156. return s.context
  157. }
  158. // Retrieves the state machine passed into the constructor.
  159. func (s *Server) StateMachine() StateMachine {
  160. return s.stateMachine
  161. }
  162. // Retrieves the log path for the server.
  163. func (s *Server) LogPath() string {
  164. return path.Join(s.path, "log")
  165. }
  166. // Retrieves the current state of the server.
  167. func (s *Server) State() string {
  168. s.mutex.RLock()
  169. defer s.mutex.RUnlock()
  170. return s.state
  171. }
  172. // Sets the state of the server.
  173. func (s *Server) setState(state string) {
  174. s.mutex.Lock()
  175. defer s.mutex.Unlock()
  176. s.state = state
  177. if state == Leader {
  178. s.leader = s.Name()
  179. }
  180. }
  181. // Retrieves the current term of the server.
  182. func (s *Server) Term() uint64 {
  183. return s.currentTerm
  184. }
  185. // Retrieves the current commit index of the server.
  186. func (s *Server) CommitIndex() uint64 {
  187. return s.log.commitIndex
  188. }
  189. // Retrieves the name of the candidate this server voted for in this term.
  190. func (s *Server) VotedFor() string {
  191. return s.votedFor
  192. }
  193. // Retrieves whether the server's log has no entries.
  194. func (s *Server) IsLogEmpty() bool {
  195. return s.log.isEmpty()
  196. }
  197. // A list of all the log entries. This should only be used for debugging purposes.
  198. func (s *Server) LogEntries() []*LogEntry {
  199. return s.log.entries
  200. }
  201. // A reference to the command name of the last entry.
  202. func (s *Server) LastCommandName() string {
  203. return s.log.lastCommandName()
  204. }
  205. // Get the state of the server for debugging
  206. func (s *Server) GetState() string {
  207. s.mutex.RLock()
  208. defer s.mutex.RUnlock()
  209. return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
  210. }
  211. // Check if the server is promotable
  212. func (s *Server) promotable() bool {
  213. return s.log.currentIndex() > 0
  214. }
  215. //--------------------------------------
  216. // Membership
  217. //--------------------------------------
  218. // Retrieves the number of member servers in the consensus.
  219. func (s *Server) MemberCount() int {
  220. s.mutex.Lock()
  221. defer s.mutex.Unlock()
  222. return len(s.peers) + 1
  223. }
  224. // Retrieves the number of servers required to make a quorum.
  225. func (s *Server) QuorumSize() int {
  226. return (s.MemberCount() / 2) + 1
  227. }
  228. //--------------------------------------
  229. // Election timeout
  230. //--------------------------------------
  231. // Retrieves the election timeout.
  232. func (s *Server) ElectionTimeout() time.Duration {
  233. s.mutex.RLock()
  234. defer s.mutex.RUnlock()
  235. return s.electionTimeout
  236. }
  237. // Sets the election timeout.
  238. func (s *Server) SetElectionTimeout(duration time.Duration) {
  239. s.mutex.Lock()
  240. defer s.mutex.Unlock()
  241. s.electionTimeout = duration
  242. }
  243. //--------------------------------------
  244. // Heartbeat timeout
  245. //--------------------------------------
  246. // Retrieves the heartbeat timeout.
  247. func (s *Server) HeartbeatTimeout() time.Duration {
  248. s.mutex.RLock()
  249. defer s.mutex.RUnlock()
  250. return s.heartbeatTimeout
  251. }
  252. // Sets the heartbeat timeout.
  253. func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
  254. s.mutex.Lock()
  255. defer s.mutex.Unlock()
  256. s.heartbeatTimeout = duration
  257. for _, peer := range s.peers {
  258. peer.setHeartbeatTimeout(duration)
  259. }
  260. }
  261. //------------------------------------------------------------------------------
  262. //
  263. // Methods
  264. //
  265. //------------------------------------------------------------------------------
  266. //--------------------------------------
  267. // Initialization
  268. //--------------------------------------
  269. // Reg the NOPCommand
  270. func init() {
  271. RegisterCommand(&NOPCommand{})
  272. RegisterCommand(&DefaultJoinCommand{})
  273. RegisterCommand(&DefaultLeaveCommand{})
  274. }
  275. // Start as follow
  276. // If log entries exist then allow promotion to candidate if no AEs received.
  277. // If no log entries exist then wait for AEs from another node.
  278. // If no log entries exist and a self-join command is issued then
  279. // immediately become leader and commit entry.
  280. func (s *Server) Start() error {
  281. // Exit if the server is already running.
  282. if s.state != Stopped {
  283. return errors.New("raft.Server: Server already running")
  284. }
  285. // Create snapshot directory if not exist
  286. os.Mkdir(path.Join(s.path, "snapshot"), 0700)
  287. if err := s.readConf(); err != nil {
  288. s.debugln("raft: Conf file error: ", err)
  289. return fmt.Errorf("raft: Initialization error: %s", err)
  290. }
  291. // Initialize the log and load it up.
  292. if err := s.log.open(s.LogPath()); err != nil {
  293. s.debugln("raft: Log error: ", err)
  294. return fmt.Errorf("raft: Initialization error: %s", err)
  295. }
  296. // Update the term to the last term in the log.
  297. _, s.currentTerm = s.log.lastInfo()
  298. s.setState(Follower)
  299. // If no log entries exist then
  300. // 1. wait for AEs from another node
  301. // 2. wait for self-join command
  302. // to set itself promotable
  303. if !s.promotable() {
  304. s.debugln("start as a new raft server")
  305. // If log entries exist then allow promotion to candidate
  306. // if no AEs received.
  307. } else {
  308. s.debugln("start from previous saved state")
  309. }
  310. debugln(s.GetState())
  311. go s.loop()
  312. return nil
  313. }
  314. // Shuts down the server.
  315. func (s *Server) Stop() {
  316. s.send(&stopValue)
  317. s.mutex.Lock()
  318. defer s.mutex.Unlock()
  319. s.log.close()
  320. }
  321. // Checks if the server is currently running.
  322. func (s *Server) Running() bool {
  323. s.mutex.RLock()
  324. defer s.mutex.RUnlock()
  325. return s.state != Stopped
  326. }
  327. //--------------------------------------
  328. // Term
  329. //--------------------------------------
  330. // Sets the current term for the server. This is only used when an external
  331. // current term is found.
  332. func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) {
  333. s.mutex.Lock()
  334. defer s.mutex.Unlock()
  335. // update the term and clear vote for
  336. if term > s.currentTerm {
  337. s.state = Follower
  338. s.currentTerm = term
  339. s.leader = leaderName
  340. s.votedFor = ""
  341. return
  342. }
  343. // discover new leader when candidate
  344. // save leader name when follower
  345. if term == s.currentTerm && s.state != Leader && append {
  346. s.state = Follower
  347. s.leader = leaderName
  348. }
  349. }
  350. //--------------------------------------
  351. // Event Loop
  352. //--------------------------------------
  353. // ________
  354. // --|Snapshot| timeout
  355. // | -------- ______
  356. // recover | ^ | |
  357. // snapshot / | |snapshot | |
  358. // higher | | v | recv majority votes
  359. // term | -------- timeout ----------- -----------
  360. // |-> |Follower| ----------> | Candidate |--------------------> | Leader |
  361. // -------- ----------- -----------
  362. // ^ higher term/ | higher term |
  363. // | new leader | |
  364. // |_______________________|____________________________________ |
  365. // The main event loop for the server
  366. func (s *Server) loop() {
  367. defer s.debugln("server.loop.end")
  368. for {
  369. state := s.State()
  370. s.debugln("server.loop.run ", state)
  371. switch state {
  372. case Follower:
  373. s.followerLoop()
  374. case Candidate:
  375. s.candidateLoop()
  376. case Leader:
  377. s.leaderLoop()
  378. case Snapshotting:
  379. s.snapshotLoop()
  380. case Stopped:
  381. return
  382. }
  383. }
  384. }
  385. // Sends an event to the event loop to be processed. The function will wait
  386. // until the event is actually processed before returning.
  387. func (s *Server) send(value interface{}) (interface{}, error) {
  388. event := s.sendAsync(value)
  389. err := <-event.c
  390. return event.returnValue, err
  391. }
  392. func (s *Server) sendAsync(value interface{}) *event {
  393. event := &event{target: value, c: make(chan error, 1)}
  394. s.c <- event
  395. return event
  396. }
  397. // The event loop that is run when the server is in a Follower state.
  398. // Responds to RPCs from candidates and leaders.
  399. // Converts to candidate if election timeout elapses without either:
  400. // 1.Receiving valid AppendEntries RPC, or
  401. // 2.Granting vote to candidate
  402. func (s *Server) followerLoop() {
  403. s.setState(Follower)
  404. timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  405. for {
  406. var err error
  407. update := false
  408. select {
  409. case e := <-s.c:
  410. if e.target == &stopValue {
  411. s.setState(Stopped)
  412. } else {
  413. switch req := e.target.(type) {
  414. case JoinCommand:
  415. //If no log entries exist and a self-join command is issued
  416. //then immediately become leader and commit entry.
  417. if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
  418. s.debugln("selfjoin and promote to leader")
  419. s.setState(Leader)
  420. s.processCommand(req, e)
  421. } else {
  422. err = NotLeaderError
  423. }
  424. case *AppendEntriesRequest:
  425. e.returnValue, update = s.processAppendEntriesRequest(req)
  426. case *RequestVoteRequest:
  427. e.returnValue, update = s.processRequestVoteRequest(req)
  428. case *SnapshotRequest:
  429. e.returnValue = s.processSnapshotRequest(req)
  430. default:
  431. err = NotLeaderError
  432. }
  433. }
  434. // Callback to event.
  435. e.c <- err
  436. case <-timeoutChan:
  437. // only allow synced follower to promote to candidate
  438. if s.promotable() {
  439. s.setState(Candidate)
  440. } else {
  441. update = true
  442. }
  443. }
  444. // Converts to candidate if election timeout elapses without either:
  445. // 1.Receiving valid AppendEntries RPC, or
  446. // 2.Granting vote to candidate
  447. if update {
  448. timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  449. }
  450. // Exit loop on state change.
  451. if s.State() != Follower {
  452. break
  453. }
  454. }
  455. }
  456. // The event loop that is run when the server is in a Candidate state.
  457. func (s *Server) candidateLoop() {
  458. lastLogIndex, lastLogTerm := s.log.lastInfo()
  459. s.leader = ""
  460. for {
  461. // Increment current term, vote for self.
  462. s.currentTerm++
  463. s.votedFor = s.name
  464. // Send RequestVote RPCs to all other servers.
  465. respChan := make(chan *RequestVoteResponse, len(s.peers))
  466. for _, peer := range s.peers {
  467. go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
  468. }
  469. // Wait for either:
  470. // * Votes received from majority of servers: become leader
  471. // * AppendEntries RPC received from new leader: step down.
  472. // * Election timeout elapses without election resolution: increment term, start new election
  473. // * Discover higher term: step down (§5.1)
  474. votesGranted := 1
  475. timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  476. timeout := false
  477. for {
  478. // If we received enough votes then stop waiting for more votes.
  479. s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
  480. if votesGranted >= s.QuorumSize() {
  481. s.setState(Leader)
  482. break
  483. }
  484. // Collect votes from peers.
  485. select {
  486. case resp := <-respChan:
  487. if resp.VoteGranted {
  488. s.debugln("server.candidate.vote.granted: ", votesGranted)
  489. votesGranted++
  490. } else if resp.Term > s.currentTerm {
  491. s.debugln("server.candidate.vote.failed")
  492. s.setCurrentTerm(resp.Term, "", false)
  493. } else {
  494. s.debugln("server.candidate.vote: denied")
  495. }
  496. case e := <-s.c:
  497. var err error
  498. if e.target == &stopValue {
  499. s.setState(Stopped)
  500. } else {
  501. switch req := e.target.(type) {
  502. case Command:
  503. err = NotLeaderError
  504. case *AppendEntriesRequest:
  505. e.returnValue, _ = s.processAppendEntriesRequest(req)
  506. case *RequestVoteRequest:
  507. e.returnValue, _ = s.processRequestVoteRequest(req)
  508. }
  509. }
  510. // Callback to event.
  511. e.c <- err
  512. case <-timeoutChan:
  513. timeout = true
  514. }
  515. // both process AER and RVR can make the server to follower
  516. // also break when timeout happens
  517. if s.State() != Candidate || timeout {
  518. break
  519. }
  520. }
  521. // break when we are not candidate
  522. if s.State() != Candidate {
  523. break
  524. }
  525. // continue when timeout happened
  526. }
  527. }
  528. // The event loop that is run when the server is in a Leader state.
  529. func (s *Server) leaderLoop() {
  530. s.setState(Leader)
  531. s.syncedPeer = make(map[string]bool)
  532. logIndex, _ := s.log.lastInfo()
  533. // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
  534. s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
  535. for _, peer := range s.peers {
  536. peer.setPrevLogIndex(logIndex)
  537. peer.startHeartbeat()
  538. }
  539. go s.Do(NOPCommand{})
  540. // Begin to collect response from followers
  541. for {
  542. var err error
  543. select {
  544. case e := <-s.c:
  545. if e.target == &stopValue {
  546. s.setState(Stopped)
  547. } else {
  548. switch req := e.target.(type) {
  549. case Command:
  550. s.processCommand(req, e)
  551. continue
  552. case *AppendEntriesRequest:
  553. e.returnValue, _ = s.processAppendEntriesRequest(req)
  554. case *AppendEntriesResponse:
  555. s.processAppendEntriesResponse(req)
  556. case *RequestVoteRequest:
  557. e.returnValue, _ = s.processRequestVoteRequest(req)
  558. }
  559. }
  560. // Callback to event.
  561. e.c <- err
  562. }
  563. // Exit loop on state change.
  564. if s.State() != Leader {
  565. break
  566. }
  567. }
  568. // Stop all peers.
  569. for _, peer := range s.peers {
  570. peer.stopHeartbeat(false)
  571. }
  572. s.syncedPeer = nil
  573. }
  574. func (s *Server) snapshotLoop() {
  575. s.setState(Snapshotting)
  576. for {
  577. var err error
  578. e := <-s.c
  579. if e.target == &stopValue {
  580. s.setState(Stopped)
  581. } else {
  582. switch req := e.target.(type) {
  583. case Command:
  584. err = NotLeaderError
  585. case *AppendEntriesRequest:
  586. e.returnValue, _ = s.processAppendEntriesRequest(req)
  587. case *RequestVoteRequest:
  588. e.returnValue, _ = s.processRequestVoteRequest(req)
  589. case *SnapshotRecoveryRequest:
  590. e.returnValue = s.processSnapshotRecoveryRequest(req)
  591. }
  592. }
  593. // Callback to event.
  594. e.c <- err
  595. // Exit loop on state change.
  596. if s.State() != Snapshotting {
  597. break
  598. }
  599. }
  600. }
  601. //--------------------------------------
  602. // Commands
  603. //--------------------------------------
  604. // Attempts to execute a command and replicate it. The function will return
  605. // when the command has been successfully committed or an error has occurred.
  606. func (s *Server) Do(command Command) (interface{}, error) {
  607. return s.send(command)
  608. }
  609. // Processes a command.
  610. func (s *Server) processCommand(command Command, e *event) {
  611. s.debugln("server.command.process")
  612. // Create an entry for the command in the log.
  613. entry, err := s.log.createEntry(s.currentTerm, command)
  614. if err != nil {
  615. s.debugln("server.command.log.entry.error:", err)
  616. e.c <- err
  617. return
  618. }
  619. if err := s.log.appendEntry(entry); err != nil {
  620. s.debugln("server.command.log.error:", err)
  621. e.c <- err
  622. return
  623. }
  624. // Issue a callback for the entry once it's committed.
  625. go func() {
  626. // Wait for the entry to be committed.
  627. select {
  628. case <-entry.commit:
  629. var err error
  630. s.debugln("server.command.commit")
  631. e.returnValue, err = s.log.getEntryResult(entry, true)
  632. e.c <- err
  633. case <-time.After(time.Second):
  634. s.debugln("server.command.timeout")
  635. e.c <- CommandTimeoutError
  636. }
  637. }()
  638. // Issue an append entries response for the server.
  639. resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
  640. resp.append = true
  641. resp.peer = s.Name()
  642. // this must be async
  643. // sendAsync is not really async every time
  644. // when the sending speed of the user is larger than
  645. // the processing speed of the server, the buffered channel
  646. // will be full. Then sendAsync will become sync, which will
  647. // cause deadlock here.
  648. // so we use a goroutine to avoid the deadlock
  649. go s.sendAsync(resp)
  650. }
  651. //--------------------------------------
  652. // Append Entries
  653. //--------------------------------------
  654. // Appends zero or more log entry from the leader to this server.
  655. func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
  656. ret, _ := s.send(req)
  657. resp, _ := ret.(*AppendEntriesResponse)
  658. return resp
  659. }
  660. // Processes the "append entries" request.
  661. func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
  662. s.traceln("server.ae.process")
  663. if req.Term < s.currentTerm {
  664. s.debugln("server.ae.error: stale term")
  665. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
  666. }
  667. // Update term and leader.
  668. s.setCurrentTerm(req.Term, req.LeaderName, true)
  669. // Reject if log doesn't contain a matching previous entry.
  670. if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
  671. s.debugln("server.ae.truncate.error: ", err)
  672. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  673. }
  674. // Append entries to the log.
  675. if err := s.log.appendEntries(req.Entries); err != nil {
  676. s.debugln("server.ae.append.error: ", err)
  677. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  678. }
  679. // Commit up to the commit index.
  680. if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
  681. s.debugln("server.ae.commit.error: ", err)
  682. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  683. }
  684. // once the server appended and commited all the log entries from the leader
  685. return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
  686. }
  687. // Processes the "append entries" response from the peer. This is only
  688. // processed when the server is a leader. Responses received during other
  689. // states are dropped.
  690. func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
  691. // If we find a higher term then change to a follower and exit.
  692. if resp.Term > s.currentTerm {
  693. s.setCurrentTerm(resp.Term, "", false)
  694. return
  695. }
  696. // panic response if it's not successful.
  697. if !resp.Success {
  698. return
  699. }
  700. // if one peer successfully append a log from the leader term,
  701. // we add it to the synced list
  702. if resp.append == true {
  703. s.syncedPeer[resp.peer] = true
  704. }
  705. // Increment the commit count to make sure we have a quorum before committing.
  706. if len(s.syncedPeer) < s.QuorumSize() {
  707. return
  708. }
  709. // Determine the committed index that a majority has.
  710. var indices []uint64
  711. indices = append(indices, s.log.currentIndex())
  712. for _, peer := range s.peers {
  713. indices = append(indices, peer.getPrevLogIndex())
  714. }
  715. sort.Sort(uint64Slice(indices))
  716. // We can commit up to the index which the majority of the members have appended.
  717. commitIndex := indices[s.QuorumSize()-1]
  718. committedIndex := s.log.commitIndex
  719. if commitIndex > committedIndex {
  720. s.log.setCommitIndex(commitIndex)
  721. s.debugln("commit index ", commitIndex)
  722. for i := committedIndex; i < commitIndex; i++ {
  723. if entry := s.log.getEntry(i + 1); entry != nil {
  724. // if the leader is a new one and the entry came from the
  725. // old leader, the commit channel will be nil and no go routine
  726. // is waiting from this channel
  727. // if we try to send to it, the new leader will get stuck
  728. if entry.commit != nil {
  729. select {
  730. case entry.commit <- true:
  731. default:
  732. panic("server unable to send signal to commit channel")
  733. }
  734. }
  735. }
  736. }
  737. }
  738. }
  739. //--------------------------------------
  740. // Request Vote
  741. //--------------------------------------
  742. // Requests a vote from a server. A vote can be obtained if the vote's term is
  743. // at the server's current term and the server has not made a vote yet. A vote
  744. // can also be obtained if the term is greater than the server's current term.
  745. func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
  746. ret, _ := s.send(req)
  747. resp, _ := ret.(*RequestVoteResponse)
  748. return resp
  749. }
  750. // Processes a "request vote" request.
  751. func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
  752. // If the request is coming from an old term then reject it.
  753. if req.Term < s.currentTerm {
  754. s.debugln("server.rv.error: stale term")
  755. return newRequestVoteResponse(s.currentTerm, false), false
  756. }
  757. s.setCurrentTerm(req.Term, "", false)
  758. // If we've already voted for a different candidate then don't vote for this candidate.
  759. if s.votedFor != "" && s.votedFor != req.CandidateName {
  760. s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
  761. " already vote for ", s.votedFor)
  762. return newRequestVoteResponse(s.currentTerm, false), false
  763. }
  764. // If the candidate's log is not at least as up-to-date as our last log then don't vote.
  765. lastIndex, lastTerm := s.log.lastInfo()
  766. if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
  767. s.debugln("server.rv.error: out of date log: ", req.CandidateName,
  768. "Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
  769. "Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
  770. return newRequestVoteResponse(s.currentTerm, false), false
  771. }
  772. // If we made it this far then cast a vote and reset our election time out.
  773. s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
  774. s.votedFor = req.CandidateName
  775. return newRequestVoteResponse(s.currentTerm, true), true
  776. }
  777. //--------------------------------------
  778. // Membership
  779. //--------------------------------------
  780. // Adds a peer to the server.
  781. func (s *Server) AddPeer(name string, connectiongString string) error {
  782. s.debugln("server.peer.add: ", name, len(s.peers))
  783. // Do not allow peers to be added twice.
  784. if s.peers[name] != nil {
  785. return nil
  786. }
  787. // Skip the Peer if it has the same name as the Server
  788. if s.name != name {
  789. peer := newPeer(s, name, connectiongString, s.heartbeatTimeout)
  790. if s.State() == Leader {
  791. peer.startHeartbeat()
  792. }
  793. s.peers[peer.Name] = peer
  794. }
  795. // Write the configuration to file.
  796. s.writeConf()
  797. return nil
  798. }
  799. // Removes a peer from the server.
  800. func (s *Server) RemovePeer(name string) error {
  801. s.debugln("server.peer.remove: ", name, len(s.peers))
  802. // Skip the Peer if it has the same name as the Server
  803. if name != s.Name() {
  804. // Return error if peer doesn't exist.
  805. peer := s.peers[name]
  806. if peer == nil {
  807. return fmt.Errorf("raft: Peer not found: %s", name)
  808. }
  809. // Stop peer and remove it.
  810. if s.State() == Leader {
  811. peer.stopHeartbeat(true)
  812. }
  813. delete(s.peers, name)
  814. }
  815. // Write the configuration to file.
  816. s.writeConf()
  817. return nil
  818. }
  819. //--------------------------------------
  820. // Log compaction
  821. //--------------------------------------
  822. func (s *Server) TakeSnapshot() error {
  823. //TODO put a snapshot mutex
  824. s.debugln("take Snapshot")
  825. if s.currentSnapshot != nil {
  826. return errors.New("handling snapshot")
  827. }
  828. lastIndex, lastTerm := s.log.commitInfo()
  829. if lastIndex == 0 {
  830. return errors.New("No logs")
  831. }
  832. path := s.SnapshotPath(lastIndex, lastTerm)
  833. var state []byte
  834. var err error
  835. if s.stateMachine != nil {
  836. state, err = s.stateMachine.Save()
  837. if err != nil {
  838. return err
  839. }
  840. } else {
  841. state = []byte{0}
  842. }
  843. peers := make([]*Peer, len(s.peers)+1)
  844. i := 0
  845. for _, peer := range s.peers {
  846. peers[i] = peer.clone()
  847. i++
  848. }
  849. peers[i] = &Peer{
  850. Name: s.Name(),
  851. ConnectionString: s.connectionString,
  852. }
  853. s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
  854. s.saveSnapshot()
  855. // We keep some log entries after the snapshot
  856. // We do not want to send the whole snapshot
  857. // to the slightly slow machines
  858. if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
  859. compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
  860. compactTerm := s.log.getEntry(compactIndex).Term
  861. s.log.compact(compactIndex, compactTerm)
  862. }
  863. return nil
  864. }
  865. // Retrieves the log path for the server.
  866. func (s *Server) saveSnapshot() error {
  867. if s.currentSnapshot == nil {
  868. return errors.New("no snapshot to save")
  869. }
  870. err := s.currentSnapshot.save()
  871. if err != nil {
  872. return err
  873. }
  874. tmp := s.lastSnapshot
  875. s.lastSnapshot = s.currentSnapshot
  876. // delete the previous snapshot if there is any change
  877. if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
  878. tmp.remove()
  879. }
  880. s.currentSnapshot = nil
  881. return nil
  882. }
  883. // Retrieves the log path for the server.
  884. func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
  885. return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
  886. }
  887. func (s *Server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
  888. ret, _ := s.send(req)
  889. resp, _ := ret.(*SnapshotResponse)
  890. return resp
  891. }
  892. func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
  893. // If the follower’s log contains an entry at the snapshot’s last index with a term
  894. // that matches the snapshot’s last term
  895. // Then the follower already has all the information found in the snapshot
  896. // and can reply false
  897. entry := s.log.getEntry(req.LastIndex)
  898. if entry != nil && entry.Term == req.LastTerm {
  899. return newSnapshotResponse(false)
  900. }
  901. s.setState(Snapshotting)
  902. return newSnapshotResponse(true)
  903. }
  904. func (s *Server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  905. ret, _ := s.send(req)
  906. resp, _ := ret.(*SnapshotRecoveryResponse)
  907. return resp
  908. }
  909. func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  910. s.stateMachine.Recovery(req.State)
  911. // clear the peer map
  912. s.peers = make(map[string]*Peer)
  913. // recovery the cluster configuration
  914. for _, peer := range req.Peers {
  915. s.AddPeer(peer.Name, peer.ConnectionString)
  916. }
  917. //update term and index
  918. s.currentTerm = req.LastTerm
  919. s.log.updateCommitIndex(req.LastIndex)
  920. snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
  921. s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
  922. s.saveSnapshot()
  923. // clear the previous log entries
  924. s.log.compact(req.LastIndex, req.LastTerm)
  925. return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
  926. }
  927. // Load a snapshot at restart
  928. func (s *Server) LoadSnapshot() error {
  929. dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
  930. if err != nil {
  931. return err
  932. }
  933. filenames, err := dir.Readdirnames(-1)
  934. if err != nil {
  935. dir.Close()
  936. panic(err)
  937. }
  938. dir.Close()
  939. if len(filenames) == 0 {
  940. return errors.New("no snapshot")
  941. }
  942. // not sure how many snapshot we should keep
  943. sort.Strings(filenames)
  944. snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])
  945. // should not fail
  946. file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
  947. defer file.Close()
  948. if err != nil {
  949. panic(err)
  950. }
  951. // TODO check checksum first
  952. var snapshotBytes []byte
  953. var checksum uint32
  954. n, err := fmt.Fscanf(file, "%08x\n", &checksum)
  955. if err != nil {
  956. return err
  957. }
  958. if n != 1 {
  959. return errors.New("Bad snapshot file")
  960. }
  961. snapshotBytes, _ = ioutil.ReadAll(file)
  962. s.debugln(string(snapshotBytes))
  963. // Generate checksum.
  964. byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
  965. if uint32(checksum) != byteChecksum {
  966. s.debugln(checksum, " ", byteChecksum)
  967. return errors.New("bad snapshot file")
  968. }
  969. err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
  970. if err != nil {
  971. s.debugln("unmarshal error: ", err)
  972. return err
  973. }
  974. err = s.stateMachine.Recovery(s.lastSnapshot.State)
  975. if err != nil {
  976. s.debugln("recovery error: ", err)
  977. return err
  978. }
  979. for _, peer := range s.lastSnapshot.Peers {
  980. s.AddPeer(peer.Name, peer.ConnectionString)
  981. }
  982. s.log.startTerm = s.lastSnapshot.LastTerm
  983. s.log.startIndex = s.lastSnapshot.LastIndex
  984. s.log.updateCommitIndex(s.lastSnapshot.LastIndex)
  985. return err
  986. }
  987. //--------------------------------------
  988. // Config File
  989. //--------------------------------------
  990. func (s *Server) writeConf() {
  991. peers := make([]*Peer, len(s.peers))
  992. i := 0
  993. for _, peer := range s.peers {
  994. peers[i] = peer.clone()
  995. i++
  996. }
  997. r := &Config{
  998. CommitIndex: s.log.commitIndex,
  999. Peers: peers,
  1000. }
  1001. b, _ := json.Marshal(r)
  1002. confPath := path.Join(s.path, "conf")
  1003. tmpConfPath := path.Join(s.path, "conf.tmp")
  1004. err := ioutil.WriteFile(tmpConfPath, b, 0600)
  1005. if err != nil {
  1006. panic(err)
  1007. }
  1008. os.Rename(tmpConfPath, confPath)
  1009. }
  1010. // Read the configuration for the server.
  1011. func (s *Server) readConf() error {
  1012. confPath := path.Join(s.path, "conf")
  1013. s.debugln("readConf.open ", confPath)
  1014. // open conf file
  1015. b, err := ioutil.ReadFile(confPath)
  1016. if err != nil {
  1017. return nil
  1018. }
  1019. conf := &Config{}
  1020. if err = json.Unmarshal(b, conf); err != nil {
  1021. return err
  1022. }
  1023. s.log.updateCommitIndex(conf.CommitIndex)
  1024. return nil
  1025. }
  1026. //--------------------------------------
  1027. // Debugging
  1028. //--------------------------------------
  1029. func (s *Server) debugln(v ...interface{}) {
  1030. debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
  1031. }
  1032. func (s *Server) traceln(v ...interface{}) {
  1033. tracef("[%s] %s", s.name, fmt.Sprintln(v...))
  1034. }