server.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280
  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "hash/crc32"
  7. "io/ioutil"
  8. "os"
  9. "path"
  10. "sort"
  11. "sync"
  12. "time"
  13. )
  14. //------------------------------------------------------------------------------
  15. //
  16. // Constants
  17. //
  18. //------------------------------------------------------------------------------
  19. const (
  20. Stopped = "stopped"
  21. Follower = "follower"
  22. Candidate = "candidate"
  23. Leader = "leader"
  24. Snapshotting = "snapshotting"
  25. )
  26. const (
  27. MaxLogEntriesPerRequest = 2000
  28. NumberOfLogEntriesAfterSnapshot = 200
  29. )
  30. const (
  31. DefaultHeartbeatTimeout = 50 * time.Millisecond
  32. DefaultElectionTimeout = 150 * time.Millisecond
  33. )
  34. var stopValue interface{}
  35. //------------------------------------------------------------------------------
  36. //
  37. // Errors
  38. //
  39. //------------------------------------------------------------------------------
  40. var NotLeaderError = errors.New("raft.Server: Not current leader")
  41. var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
  42. var CommandTimeoutError = errors.New("raft: Command timeout")
  43. //------------------------------------------------------------------------------
  44. //
  45. // Typedefs
  46. //
  47. //------------------------------------------------------------------------------
  48. // A server is involved in the consensus protocol and can act as a follower,
  49. // candidate or a leader.
  50. type Server struct {
  51. name string
  52. path string
  53. state string
  54. transporter Transporter
  55. context interface{}
  56. currentTerm uint64
  57. votedFor string
  58. log *Log
  59. leader string
  60. peers map[string]*Peer
  61. mutex sync.RWMutex
  62. syncedPeer map[string]bool
  63. c chan *event
  64. electionTimeout time.Duration
  65. heartbeatTimeout time.Duration
  66. currentSnapshot *Snapshot
  67. lastSnapshot *Snapshot
  68. stateMachine StateMachine
  69. maxLogEntriesPerRequest uint64
  70. connectionString string
  71. }
  72. // An event to be processed by the server's event loop.
  73. type event struct {
  74. target interface{}
  75. returnValue interface{}
  76. c chan error
  77. }
  78. //------------------------------------------------------------------------------
  79. //
  80. // Constructor
  81. //
  82. //------------------------------------------------------------------------------
  83. // Creates a new server with a log at the given path.
  84. func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectiongString string) (*Server, error) {
  85. if name == "" {
  86. return nil, errors.New("raft.Server: Name cannot be blank")
  87. }
  88. if transporter == nil {
  89. panic("raft: Transporter required")
  90. }
  91. s := &Server{
  92. name: name,
  93. path: path,
  94. transporter: transporter,
  95. stateMachine: stateMachine,
  96. context: context,
  97. state: Stopped,
  98. peers: make(map[string]*Peer),
  99. log: newLog(),
  100. c: make(chan *event, 256),
  101. electionTimeout: DefaultElectionTimeout,
  102. heartbeatTimeout: DefaultHeartbeatTimeout,
  103. maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
  104. connectionString: connectiongString,
  105. }
  106. // Setup apply function.
  107. s.log.ApplyFunc = func(c Command) (interface{}, error) {
  108. result, err := c.Apply(s)
  109. return result, err
  110. }
  111. return s, nil
  112. }
  113. //------------------------------------------------------------------------------
  114. //
  115. // Accessors
  116. //
  117. //------------------------------------------------------------------------------
  118. //--------------------------------------
  119. // General
  120. //--------------------------------------
  121. // Retrieves the name of the server.
  122. func (s *Server) Name() string {
  123. return s.name
  124. }
  125. // Retrieves the storage path for the server.
  126. func (s *Server) Path() string {
  127. return s.path
  128. }
  129. // The name of the current leader.
  130. func (s *Server) Leader() string {
  131. return s.leader
  132. }
  133. // Retrieves a copy of the peer data.
  134. func (s *Server) Peers() map[string]*Peer {
  135. s.mutex.Lock()
  136. defer s.mutex.Unlock()
  137. peers := make(map[string]*Peer)
  138. for name, peer := range s.peers {
  139. peers[name] = peer.clone()
  140. }
  141. return peers
  142. }
  143. // Retrieves the object that transports requests.
  144. func (s *Server) Transporter() Transporter {
  145. s.mutex.RLock()
  146. defer s.mutex.RUnlock()
  147. return s.transporter
  148. }
  149. func (s *Server) SetTransporter(t Transporter) {
  150. s.mutex.Lock()
  151. defer s.mutex.Unlock()
  152. s.transporter = t
  153. }
  154. // Retrieves the context passed into the constructor.
  155. func (s *Server) Context() interface{} {
  156. return s.context
  157. }
  158. // Retrieves the log path for the server.
  159. func (s *Server) LogPath() string {
  160. return path.Join(s.path, "log")
  161. }
  162. // Retrieves the current state of the server.
  163. func (s *Server) State() string {
  164. s.mutex.RLock()
  165. defer s.mutex.RUnlock()
  166. return s.state
  167. }
  168. // Sets the state of the server.
  169. func (s *Server) setState(state string) {
  170. s.mutex.Lock()
  171. defer s.mutex.Unlock()
  172. s.state = state
  173. if state == Leader {
  174. s.leader = s.Name()
  175. }
  176. }
  177. // Retrieves the current term of the server.
  178. func (s *Server) Term() uint64 {
  179. return s.currentTerm
  180. }
  181. // Retrieves the current commit index of the server.
  182. func (s *Server) CommitIndex() uint64 {
  183. return s.log.commitIndex
  184. }
  185. // Retrieves the name of the candidate this server voted for in this term.
  186. func (s *Server) VotedFor() string {
  187. return s.votedFor
  188. }
  189. // Retrieves whether the server's log has no entries.
  190. func (s *Server) IsLogEmpty() bool {
  191. return s.log.isEmpty()
  192. }
  193. // A list of all the log entries. This should only be used for debugging purposes.
  194. func (s *Server) LogEntries() []*LogEntry {
  195. return s.log.entries
  196. }
  197. // A reference to the command name of the last entry.
  198. func (s *Server) LastCommandName() string {
  199. return s.log.lastCommandName()
  200. }
  201. // Get the state of the server for debugging
  202. func (s *Server) GetState() string {
  203. s.mutex.RLock()
  204. defer s.mutex.RUnlock()
  205. return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
  206. }
  207. // Check if the server is promotable
  208. func (s *Server) promotable() bool {
  209. return s.log.currentIndex() > 0
  210. }
  211. //--------------------------------------
  212. // Membership
  213. //--------------------------------------
  214. // Retrieves the number of member servers in the consensus.
  215. func (s *Server) MemberCount() int {
  216. s.mutex.Lock()
  217. defer s.mutex.Unlock()
  218. return len(s.peers) + 1
  219. }
  220. // Retrieves the number of servers required to make a quorum.
  221. func (s *Server) QuorumSize() int {
  222. return (s.MemberCount() / 2) + 1
  223. }
  224. //--------------------------------------
  225. // Election timeout
  226. //--------------------------------------
  227. // Retrieves the election timeout.
  228. func (s *Server) ElectionTimeout() time.Duration {
  229. s.mutex.RLock()
  230. defer s.mutex.RUnlock()
  231. return s.electionTimeout
  232. }
  233. // Sets the election timeout.
  234. func (s *Server) SetElectionTimeout(duration time.Duration) {
  235. s.mutex.Lock()
  236. defer s.mutex.Unlock()
  237. s.electionTimeout = duration
  238. }
  239. //--------------------------------------
  240. // Heartbeat timeout
  241. //--------------------------------------
  242. // Retrieves the heartbeat timeout.
  243. func (s *Server) HeartbeatTimeout() time.Duration {
  244. s.mutex.RLock()
  245. defer s.mutex.RUnlock()
  246. return s.heartbeatTimeout
  247. }
  248. // Sets the heartbeat timeout.
  249. func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
  250. s.mutex.Lock()
  251. defer s.mutex.Unlock()
  252. s.heartbeatTimeout = duration
  253. for _, peer := range s.peers {
  254. peer.setHeartbeatTimeout(duration)
  255. }
  256. }
  257. //------------------------------------------------------------------------------
  258. //
  259. // Methods
  260. //
  261. //------------------------------------------------------------------------------
  262. //--------------------------------------
  263. // Initialization
  264. //--------------------------------------
  265. // Reg the NOPCommand
  266. func init() {
  267. RegisterCommand(&NOPCommand{})
  268. RegisterCommand(&DefaultJoinCommand{})
  269. RegisterCommand(&DefaultLeaveCommand{})
  270. }
  271. // Start as follow
  272. // If log entries exist then allow promotion to candidate if no AEs received.
  273. // If no log entries exist then wait for AEs from another node.
  274. // If no log entries exist and a self-join command is issued then
  275. // immediately become leader and commit entry.
  276. func (s *Server) Start() error {
  277. // Exit if the server is already running.
  278. if s.state != Stopped {
  279. return errors.New("raft.Server: Server already running")
  280. }
  281. // Create snapshot directory if not exist
  282. os.Mkdir(path.Join(s.path, "snapshot"), 0700)
  283. if err := s.readConf(); err != nil {
  284. s.debugln("raft: Conf file error: ", err)
  285. return fmt.Errorf("raft: Initialization error: %s", err)
  286. }
  287. // Initialize the log and load it up.
  288. if err := s.log.open(s.LogPath()); err != nil {
  289. s.debugln("raft: Log error: ", err)
  290. return fmt.Errorf("raft: Initialization error: %s", err)
  291. }
  292. // Update the term to the last term in the log.
  293. _, s.currentTerm = s.log.lastInfo()
  294. s.setState(Follower)
  295. // If no log entries exist then
  296. // 1. wait for AEs from another node
  297. // 2. wait for self-join command
  298. // to set itself promotable
  299. if !s.promotable() {
  300. s.debugln("start as a new raft server")
  301. // If log entries exist then allow promotion to candidate
  302. // if no AEs received.
  303. } else {
  304. s.debugln("start from previous saved state")
  305. }
  306. debugln(s.GetState())
  307. go s.loop()
  308. return nil
  309. }
  310. // Shuts down the server.
  311. func (s *Server) Stop() {
  312. s.send(&stopValue)
  313. s.mutex.Lock()
  314. defer s.mutex.Unlock()
  315. s.log.close()
  316. }
  317. // Checks if the server is currently running.
  318. func (s *Server) Running() bool {
  319. s.mutex.RLock()
  320. defer s.mutex.RUnlock()
  321. return s.state != Stopped
  322. }
  323. //--------------------------------------
  324. // Term
  325. //--------------------------------------
  326. // Sets the current term for the server. This is only used when an external
  327. // current term is found.
  328. func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) {
  329. s.mutex.Lock()
  330. defer s.mutex.Unlock()
  331. // update the term and clear vote for
  332. if term > s.currentTerm {
  333. s.state = Follower
  334. s.currentTerm = term
  335. s.leader = leaderName
  336. s.votedFor = ""
  337. return
  338. }
  339. // discover new leader when candidate
  340. // save leader name when follower
  341. if term == s.currentTerm && s.state != Leader && append {
  342. s.state = Follower
  343. s.leader = leaderName
  344. }
  345. }
  346. //--------------------------------------
  347. // Event Loop
  348. //--------------------------------------
  349. // ________
  350. // --|Snapshot| timeout
  351. // | -------- ______
  352. // recover | ^ | |
  353. // snapshot / | |snapshot | |
  354. // higher | | v | recv majority votes
  355. // term | -------- timeout ----------- -----------
  356. // |-> |Follower| ----------> | Candidate |--------------------> | Leader |
  357. // -------- ----------- -----------
  358. // ^ higher term/ | higher term |
  359. // | new leader | |
  360. // |_______________________|____________________________________ |
  361. // The main event loop for the server
  362. func (s *Server) loop() {
  363. defer s.debugln("server.loop.end")
  364. for {
  365. state := s.State()
  366. s.debugln("server.loop.run ", state)
  367. switch state {
  368. case Follower:
  369. s.followerLoop()
  370. case Candidate:
  371. s.candidateLoop()
  372. case Leader:
  373. s.leaderLoop()
  374. case Snapshotting:
  375. s.snapshotLoop()
  376. case Stopped:
  377. return
  378. }
  379. }
  380. }
  381. // Sends an event to the event loop to be processed. The function will wait
  382. // until the event is actually processed before returning.
  383. func (s *Server) send(value interface{}) (interface{}, error) {
  384. event := s.sendAsync(value)
  385. err := <-event.c
  386. return event.returnValue, err
  387. }
  388. func (s *Server) sendAsync(value interface{}) *event {
  389. event := &event{target: value, c: make(chan error, 1)}
  390. s.c <- event
  391. return event
  392. }
  393. // The event loop that is run when the server is in a Follower state.
  394. // Responds to RPCs from candidates and leaders.
  395. // Converts to candidate if election timeout elapses without either:
  396. // 1.Receiving valid AppendEntries RPC, or
  397. // 2.Granting vote to candidate
  398. func (s *Server) followerLoop() {
  399. s.setState(Follower)
  400. timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  401. for {
  402. var err error
  403. update := false
  404. select {
  405. case e := <-s.c:
  406. if e.target == &stopValue {
  407. s.setState(Stopped)
  408. } else {
  409. switch req := e.target.(type) {
  410. case JoinCommand:
  411. //If no log entries exist and a self-join command is issued
  412. //then immediately become leader and commit entry.
  413. if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
  414. s.debugln("selfjoin and promote to leader")
  415. s.setState(Leader)
  416. s.processCommand(req, e)
  417. } else {
  418. err = NotLeaderError
  419. }
  420. case *AppendEntriesRequest:
  421. e.returnValue, update = s.processAppendEntriesRequest(req)
  422. case *RequestVoteRequest:
  423. e.returnValue, update = s.processRequestVoteRequest(req)
  424. case *SnapshotRequest:
  425. e.returnValue = s.processSnapshotRequest(req)
  426. default:
  427. err = NotLeaderError
  428. }
  429. }
  430. // Callback to event.
  431. e.c <- err
  432. case <-timeoutChan:
  433. // only allow synced follower to promote to candidate
  434. if s.promotable() {
  435. s.setState(Candidate)
  436. } else {
  437. update = true
  438. }
  439. }
  440. // Converts to candidate if election timeout elapses without either:
  441. // 1.Receiving valid AppendEntries RPC, or
  442. // 2.Granting vote to candidate
  443. if update {
  444. timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  445. }
  446. // Exit loop on state change.
  447. if s.State() != Follower {
  448. break
  449. }
  450. }
  451. }
  452. // The event loop that is run when the server is in a Candidate state.
  453. func (s *Server) candidateLoop() {
  454. lastLogIndex, lastLogTerm := s.log.lastInfo()
  455. s.leader = ""
  456. for {
  457. // Increment current term, vote for self.
  458. s.currentTerm++
  459. s.votedFor = s.name
  460. // Send RequestVote RPCs to all other servers.
  461. respChan := make(chan *RequestVoteResponse, len(s.peers))
  462. for _, peer := range s.peers {
  463. go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
  464. }
  465. // Wait for either:
  466. // * Votes received from majority of servers: become leader
  467. // * AppendEntries RPC received from new leader: step down.
  468. // * Election timeout elapses without election resolution: increment term, start new election
  469. // * Discover higher term: step down (§5.1)
  470. votesGranted := 1
  471. timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  472. timeout := false
  473. for {
  474. // If we received enough votes then stop waiting for more votes.
  475. s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
  476. if votesGranted >= s.QuorumSize() {
  477. s.setState(Leader)
  478. break
  479. }
  480. // Collect votes from peers.
  481. select {
  482. case resp := <-respChan:
  483. if resp.VoteGranted {
  484. s.debugln("server.candidate.vote.granted: ", votesGranted)
  485. votesGranted++
  486. } else if resp.Term > s.currentTerm {
  487. s.debugln("server.candidate.vote.failed")
  488. s.setCurrentTerm(resp.Term, "", false)
  489. } else {
  490. s.debugln("server.candidate.vote: denied")
  491. }
  492. case e := <-s.c:
  493. var err error
  494. if e.target == &stopValue {
  495. s.setState(Stopped)
  496. } else {
  497. switch req := e.target.(type) {
  498. case Command:
  499. err = NotLeaderError
  500. case *AppendEntriesRequest:
  501. e.returnValue, _ = s.processAppendEntriesRequest(req)
  502. case *RequestVoteRequest:
  503. e.returnValue, _ = s.processRequestVoteRequest(req)
  504. }
  505. }
  506. // Callback to event.
  507. e.c <- err
  508. case <-timeoutChan:
  509. timeout = true
  510. }
  511. // both process AER and RVR can make the server to follower
  512. // also break when timeout happens
  513. if s.State() != Candidate || timeout {
  514. break
  515. }
  516. }
  517. // break when we are not candidate
  518. if s.State() != Candidate {
  519. break
  520. }
  521. // continue when timeout happened
  522. }
  523. }
  524. // The event loop that is run when the server is in a Leader state.
  525. func (s *Server) leaderLoop() {
  526. s.setState(Leader)
  527. s.syncedPeer = make(map[string]bool)
  528. logIndex, _ := s.log.lastInfo()
  529. // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
  530. s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
  531. for _, peer := range s.peers {
  532. peer.setPrevLogIndex(logIndex)
  533. peer.startHeartbeat()
  534. }
  535. go s.Do(NOPCommand{})
  536. // Begin to collect response from followers
  537. for {
  538. var err error
  539. select {
  540. case e := <-s.c:
  541. if e.target == &stopValue {
  542. s.setState(Stopped)
  543. } else {
  544. switch req := e.target.(type) {
  545. case Command:
  546. s.processCommand(req, e)
  547. continue
  548. case *AppendEntriesRequest:
  549. e.returnValue, _ = s.processAppendEntriesRequest(req)
  550. case *AppendEntriesResponse:
  551. s.processAppendEntriesResponse(req)
  552. case *RequestVoteRequest:
  553. e.returnValue, _ = s.processRequestVoteRequest(req)
  554. }
  555. }
  556. // Callback to event.
  557. e.c <- err
  558. }
  559. // Exit loop on state change.
  560. if s.State() != Leader {
  561. break
  562. }
  563. }
  564. // Stop all peers.
  565. for _, peer := range s.peers {
  566. peer.stopHeartbeat(false)
  567. }
  568. s.syncedPeer = nil
  569. }
  570. func (s *Server) snapshotLoop() {
  571. s.setState(Snapshotting)
  572. for {
  573. var err error
  574. e := <-s.c
  575. if e.target == &stopValue {
  576. s.setState(Stopped)
  577. } else {
  578. switch req := e.target.(type) {
  579. case Command:
  580. err = NotLeaderError
  581. case *AppendEntriesRequest:
  582. e.returnValue, _ = s.processAppendEntriesRequest(req)
  583. case *RequestVoteRequest:
  584. e.returnValue, _ = s.processRequestVoteRequest(req)
  585. case *SnapshotRecoveryRequest:
  586. e.returnValue = s.processSnapshotRecoveryRequest(req)
  587. }
  588. }
  589. // Callback to event.
  590. e.c <- err
  591. // Exit loop on state change.
  592. if s.State() != Snapshotting {
  593. break
  594. }
  595. }
  596. }
  597. //--------------------------------------
  598. // Commands
  599. //--------------------------------------
  600. // Attempts to execute a command and replicate it. The function will return
  601. // when the command has been successfully committed or an error has occurred.
  602. func (s *Server) Do(command Command) (interface{}, error) {
  603. return s.send(command)
  604. }
  605. // Processes a command.
  606. func (s *Server) processCommand(command Command, e *event) {
  607. s.debugln("server.command.process")
  608. // Create an entry for the command in the log.
  609. entry, err := s.log.createEntry(s.currentTerm, command)
  610. if err != nil {
  611. s.debugln("server.command.log.entry.error:", err)
  612. e.c <- err
  613. return
  614. }
  615. if err := s.log.appendEntry(entry); err != nil {
  616. s.debugln("server.command.log.error:", err)
  617. e.c <- err
  618. return
  619. }
  620. // Issue a callback for the entry once it's committed.
  621. go func() {
  622. // Wait for the entry to be committed.
  623. select {
  624. case <-entry.commit:
  625. var err error
  626. s.debugln("server.command.commit")
  627. e.returnValue, err = s.log.getEntryResult(entry, true)
  628. e.c <- err
  629. case <-time.After(time.Second):
  630. s.debugln("server.command.timeout")
  631. e.c <- CommandTimeoutError
  632. }
  633. }()
  634. // Issue an append entries response for the server.
  635. resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
  636. resp.append = true
  637. resp.peer = s.Name()
  638. // this must be async
  639. // sendAsync is not really async every time
  640. // when the sending speed of the user is larger than
  641. // the processing speed of the server, the buffered channel
  642. // will be full. Then sendAsync will become sync, which will
  643. // cause deadlock here.
  644. // so we use a goroutine to avoid the deadlock
  645. go s.sendAsync(resp)
  646. }
  647. //--------------------------------------
  648. // Append Entries
  649. //--------------------------------------
  650. // Appends zero or more log entry from the leader to this server.
  651. func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
  652. ret, _ := s.send(req)
  653. resp, _ := ret.(*AppendEntriesResponse)
  654. return resp
  655. }
  656. // Processes the "append entries" request.
  657. func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
  658. s.traceln("server.ae.process")
  659. if req.Term < s.currentTerm {
  660. s.debugln("server.ae.error: stale term")
  661. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
  662. }
  663. // Update term and leader.
  664. s.setCurrentTerm(req.Term, req.LeaderName, true)
  665. // Reject if log doesn't contain a matching previous entry.
  666. if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
  667. s.debugln("server.ae.truncate.error: ", err)
  668. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  669. }
  670. // Append entries to the log.
  671. if err := s.log.appendEntries(req.Entries); err != nil {
  672. s.debugln("server.ae.append.error: ", err)
  673. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  674. }
  675. // Commit up to the commit index.
  676. if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
  677. s.debugln("server.ae.commit.error: ", err)
  678. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  679. }
  680. // once the server appended and commited all the log entries from the leader
  681. return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
  682. }
  683. // Processes the "append entries" response from the peer. This is only
  684. // processed when the server is a leader. Responses received during other
  685. // states are dropped.
  686. func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
  687. // If we find a higher term then change to a follower and exit.
  688. if resp.Term > s.currentTerm {
  689. s.setCurrentTerm(resp.Term, "", false)
  690. return
  691. }
  692. // panic response if it's not successful.
  693. if !resp.Success {
  694. return
  695. }
  696. // if one peer successfully append a log from the leader term,
  697. // we add it to the synced list
  698. if resp.append == true {
  699. s.syncedPeer[resp.peer] = true
  700. }
  701. // Increment the commit count to make sure we have a quorum before committing.
  702. if len(s.syncedPeer) < s.QuorumSize() {
  703. return
  704. }
  705. // Determine the committed index that a majority has.
  706. var indices []uint64
  707. indices = append(indices, s.log.currentIndex())
  708. for _, peer := range s.peers {
  709. indices = append(indices, peer.getPrevLogIndex())
  710. }
  711. sort.Sort(uint64Slice(indices))
  712. // We can commit up to the index which the majority of the members have appended.
  713. commitIndex := indices[s.QuorumSize()-1]
  714. committedIndex := s.log.commitIndex
  715. if commitIndex > committedIndex {
  716. s.log.setCommitIndex(commitIndex)
  717. s.debugln("commit index ", commitIndex)
  718. for i := committedIndex; i < commitIndex; i++ {
  719. if entry := s.log.getEntry(i + 1); entry != nil {
  720. // if the leader is a new one and the entry came from the
  721. // old leader, the commit channel will be nil and no go routine
  722. // is waiting from this channel
  723. // if we try to send to it, the new leader will get stuck
  724. if entry.commit != nil {
  725. select {
  726. case entry.commit <- true:
  727. default:
  728. panic("server unable to send signal to commit channel")
  729. }
  730. }
  731. }
  732. }
  733. }
  734. }
  735. //--------------------------------------
  736. // Request Vote
  737. //--------------------------------------
  738. // Requests a vote from a server. A vote can be obtained if the vote's term is
  739. // at the server's current term and the server has not made a vote yet. A vote
  740. // can also be obtained if the term is greater than the server's current term.
  741. func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
  742. ret, _ := s.send(req)
  743. resp, _ := ret.(*RequestVoteResponse)
  744. return resp
  745. }
  746. // Processes a "request vote" request.
  747. func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
  748. // If the request is coming from an old term then reject it.
  749. if req.Term < s.currentTerm {
  750. s.debugln("server.rv.error: stale term")
  751. return newRequestVoteResponse(s.currentTerm, false), false
  752. }
  753. s.setCurrentTerm(req.Term, "", false)
  754. // If we've already voted for a different candidate then don't vote for this candidate.
  755. if s.votedFor != "" && s.votedFor != req.CandidateName {
  756. s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
  757. " already vote for ", s.votedFor)
  758. return newRequestVoteResponse(s.currentTerm, false), false
  759. }
  760. // If the candidate's log is not at least as up-to-date as our last log then don't vote.
  761. lastIndex, lastTerm := s.log.lastInfo()
  762. if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
  763. s.debugln("server.rv.error: out of date log: ", req.CandidateName,
  764. "Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
  765. "Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
  766. return newRequestVoteResponse(s.currentTerm, false), false
  767. }
  768. // If we made it this far then cast a vote and reset our election time out.
  769. s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
  770. s.votedFor = req.CandidateName
  771. return newRequestVoteResponse(s.currentTerm, true), true
  772. }
  773. //--------------------------------------
  774. // Membership
  775. //--------------------------------------
  776. // Adds a peer to the server.
  777. func (s *Server) AddPeer(name string, connectiongString string) error {
  778. s.debugln("server.peer.add: ", name, len(s.peers))
  779. // Do not allow peers to be added twice.
  780. if s.peers[name] != nil {
  781. return nil
  782. }
  783. // Skip the Peer if it has the same name as the Server
  784. if s.name != name {
  785. peer := newPeer(s, name, connectiongString, s.heartbeatTimeout)
  786. if s.State() == Leader {
  787. peer.startHeartbeat()
  788. }
  789. s.peers[peer.Name] = peer
  790. }
  791. // Write the configuration to file.
  792. s.writeConf()
  793. return nil
  794. }
  795. // Removes a peer from the server.
  796. func (s *Server) RemovePeer(name string) error {
  797. s.debugln("server.peer.remove: ", name, len(s.peers))
  798. // Skip the Peer if it has the same name as the Server
  799. if name != s.Name() {
  800. // Return error if peer doesn't exist.
  801. peer := s.peers[name]
  802. if peer == nil {
  803. return fmt.Errorf("raft: Peer not found: %s", name)
  804. }
  805. // Stop peer and remove it.
  806. if s.State() == Leader {
  807. peer.stopHeartbeat(true)
  808. }
  809. delete(s.peers, name)
  810. }
  811. // Write the configuration to file.
  812. s.writeConf()
  813. return nil
  814. }
  815. //--------------------------------------
  816. // Log compaction
  817. //--------------------------------------
  818. func (s *Server) TakeSnapshot() error {
  819. //TODO put a snapshot mutex
  820. s.debugln("take Snapshot")
  821. if s.currentSnapshot != nil {
  822. return errors.New("handling snapshot")
  823. }
  824. lastIndex, lastTerm := s.log.commitInfo()
  825. if lastIndex == 0 {
  826. return errors.New("No logs")
  827. }
  828. path := s.SnapshotPath(lastIndex, lastTerm)
  829. var state []byte
  830. var err error
  831. if s.stateMachine != nil {
  832. state, err = s.stateMachine.Save()
  833. if err != nil {
  834. return err
  835. }
  836. } else {
  837. state = []byte{0}
  838. }
  839. peers := make([]*Peer, len(s.peers)+1)
  840. i := 0
  841. for _, peer := range s.peers {
  842. peers[i] = peer.clone()
  843. }
  844. peers[i] = &Peer{
  845. Name: s.Name(),
  846. ConnectionString: s.connectionString,
  847. }
  848. s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
  849. s.saveSnapshot()
  850. // We keep some log entries after the snapshot
  851. // We do not want to send the whole snapshot
  852. // to the slightly slow machines
  853. if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
  854. compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
  855. compactTerm := s.log.getEntry(compactIndex).Term
  856. s.log.compact(compactIndex, compactTerm)
  857. }
  858. return nil
  859. }
  860. // Retrieves the log path for the server.
  861. func (s *Server) saveSnapshot() error {
  862. if s.currentSnapshot == nil {
  863. return errors.New("no snapshot to save")
  864. }
  865. err := s.currentSnapshot.save()
  866. if err != nil {
  867. return err
  868. }
  869. tmp := s.lastSnapshot
  870. s.lastSnapshot = s.currentSnapshot
  871. // delete the previous snapshot if there is any change
  872. if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
  873. tmp.remove()
  874. }
  875. s.currentSnapshot = nil
  876. return nil
  877. }
  878. // Retrieves the log path for the server.
  879. func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
  880. return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
  881. }
  882. func (s *Server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
  883. ret, _ := s.send(req)
  884. resp, _ := ret.(*SnapshotResponse)
  885. return resp
  886. }
  887. func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
  888. // If the follower’s log contains an entry at the snapshot’s last index with a term
  889. // that matches the snapshot’s last term
  890. // Then the follower already has all the information found in the snapshot
  891. // and can reply false
  892. entry := s.log.getEntry(req.LastIndex)
  893. if entry != nil && entry.Term == req.LastTerm {
  894. return newSnapshotResponse(false)
  895. }
  896. s.setState(Snapshotting)
  897. return newSnapshotResponse(true)
  898. }
  899. func (s *Server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  900. ret, _ := s.send(req)
  901. resp, _ := ret.(*SnapshotRecoveryResponse)
  902. return resp
  903. }
  904. func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  905. s.stateMachine.Recovery(req.State)
  906. // clear the peer map
  907. s.peers = make(map[string]*Peer)
  908. // recovery the cluster configuration
  909. for _, peer := range req.Peers {
  910. s.AddPeer(peer.Name, peer.ConnectionString)
  911. }
  912. //update term and index
  913. s.currentTerm = req.LastTerm
  914. s.log.updateCommitIndex(req.LastIndex)
  915. snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
  916. s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
  917. s.saveSnapshot()
  918. // clear the previous log entries
  919. s.log.compact(req.LastIndex, req.LastTerm)
  920. return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
  921. }
  922. // Load a snapshot at restart
  923. func (s *Server) LoadSnapshot() error {
  924. dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
  925. if err != nil {
  926. return err
  927. }
  928. filenames, err := dir.Readdirnames(-1)
  929. if err != nil {
  930. dir.Close()
  931. panic(err)
  932. }
  933. dir.Close()
  934. if len(filenames) == 0 {
  935. return errors.New("no snapshot")
  936. }
  937. // not sure how many snapshot we should keep
  938. sort.Strings(filenames)
  939. snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])
  940. // should not fail
  941. file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
  942. defer file.Close()
  943. if err != nil {
  944. panic(err)
  945. }
  946. // TODO check checksum first
  947. var snapshotBytes []byte
  948. var checksum uint32
  949. n, err := fmt.Fscanf(file, "%08x\n", &checksum)
  950. if err != nil {
  951. return err
  952. }
  953. if n != 1 {
  954. return errors.New("Bad snapshot file")
  955. }
  956. snapshotBytes, _ = ioutil.ReadAll(file)
  957. s.debugln(string(snapshotBytes))
  958. // Generate checksum.
  959. byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
  960. if uint32(checksum) != byteChecksum {
  961. s.debugln(checksum, " ", byteChecksum)
  962. return errors.New("bad snapshot file")
  963. }
  964. err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
  965. if err != nil {
  966. s.debugln("unmarshal error: ", err)
  967. return err
  968. }
  969. err = s.stateMachine.Recovery(s.lastSnapshot.State)
  970. if err != nil {
  971. s.debugln("recovery error: ", err)
  972. return err
  973. }
  974. for _, peer := range s.lastSnapshot.Peers {
  975. s.AddPeer(peer.Name, peer.ConnectionString)
  976. }
  977. s.log.startTerm = s.lastSnapshot.LastTerm
  978. s.log.startIndex = s.lastSnapshot.LastIndex
  979. s.log.updateCommitIndex(s.lastSnapshot.LastIndex)
  980. return err
  981. }
  982. //--------------------------------------
  983. // Config File
  984. //--------------------------------------
  985. func (s *Server) writeConf() {
  986. peers := make([]*Peer, len(s.peers))
  987. i := 0
  988. for _, peer := range s.peers {
  989. peers[i] = peer.clone()
  990. i++
  991. }
  992. r := &Config{
  993. CommitIndex: s.log.commitIndex,
  994. Peers: peers,
  995. }
  996. b, _ := json.Marshal(r)
  997. confPath := path.Join(s.path, "conf")
  998. tmpConfPath := path.Join(s.path, "conf.tmp")
  999. err := ioutil.WriteFile(tmpConfPath, b, 0600)
  1000. if err != nil {
  1001. panic(err)
  1002. }
  1003. os.Rename(tmpConfPath, confPath)
  1004. }
  1005. // Read the configuration for the server.
  1006. func (s *Server) readConf() error {
  1007. confPath := path.Join(s.path, "conf")
  1008. s.debugln("readConf.open ", confPath)
  1009. // open conf file
  1010. b, err := ioutil.ReadFile(confPath)
  1011. if err != nil {
  1012. return nil
  1013. }
  1014. conf := &Config{}
  1015. if err = json.Unmarshal(b, conf); err != nil {
  1016. return err
  1017. }
  1018. s.log.updateCommitIndex(conf.CommitIndex)
  1019. return nil
  1020. }
  1021. //--------------------------------------
  1022. // Debugging
  1023. //--------------------------------------
  1024. func (s *Server) debugln(v ...interface{}) {
  1025. debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
  1026. }
  1027. func (s *Server) traceln(v ...interface{}) {
  1028. tracef("[%s] %s", s.name, fmt.Sprintln(v...))
  1029. }