server.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325
  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "hash/crc32"
  7. "io/ioutil"
  8. "os"
  9. "path"
  10. "sort"
  11. "sync"
  12. "time"
  13. )
  14. //------------------------------------------------------------------------------
  15. //
  16. // Constants
  17. //
  18. //------------------------------------------------------------------------------
  19. const (
  20. Stopped = "stopped"
  21. Follower = "follower"
  22. Candidate = "candidate"
  23. Leader = "leader"
  24. Snapshotting = "snapshotting"
  25. )
  26. const (
  27. MaxLogEntriesPerRequest = 2000
  28. NumberOfLogEntriesAfterSnapshot = 200
  29. )
  30. const (
  31. DefaultHeartbeatTimeout = 50 * time.Millisecond
  32. DefaultElectionTimeout = 150 * time.Millisecond
  33. )
  34. var stopValue interface{}
  35. //------------------------------------------------------------------------------
  36. //
  37. // Errors
  38. //
  39. //------------------------------------------------------------------------------
  40. var NotLeaderError = errors.New("raft.Server: Not current leader")
  41. var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
  42. var CommandTimeoutError = errors.New("raft: Command timeout")
  43. //------------------------------------------------------------------------------
  44. //
  45. // Typedefs
  46. //
  47. //------------------------------------------------------------------------------
  48. // A server is involved in the consensus protocol and can act as a follower,
  49. // candidate or a leader.
  50. type Server interface {
  51. Name() string
  52. Context() interface{}
  53. StateMachine() StateMachine
  54. Leader() string
  55. State() string
  56. Path() string
  57. LogPath() string
  58. SnapshotPath(lastIndex uint64, lastTerm uint64) string
  59. Term() uint64
  60. CommitIndex() uint64
  61. VotedFor() string
  62. MemberCount() int
  63. QuorumSize() int
  64. IsLogEmpty() bool
  65. LogEntries() []*LogEntry
  66. LastCommandName() string
  67. GetState() string
  68. ElectionTimeout() time.Duration
  69. SetElectionTimeout(duration time.Duration)
  70. HeartbeatTimeout() time.Duration
  71. SetHeartbeatTimeout(duration time.Duration)
  72. Transporter() Transporter
  73. SetTransporter(t Transporter)
  74. AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
  75. RequestVote(req *RequestVoteRequest) *RequestVoteResponse
  76. RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
  77. SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
  78. AddPeer(name string, connectiongString string) error
  79. RemovePeer(name string) error
  80. Peers() map[string]*Peer
  81. Start() error
  82. Stop()
  83. Running() bool
  84. Do(command Command) (interface{}, error)
  85. TakeSnapshot() error
  86. LoadSnapshot() error
  87. }
  88. type server struct {
  89. name string
  90. path string
  91. state string
  92. transporter Transporter
  93. context interface{}
  94. currentTerm uint64
  95. votedFor string
  96. log *Log
  97. leader string
  98. peers map[string]*Peer
  99. mutex sync.RWMutex
  100. syncedPeer map[string]bool
  101. c chan *event
  102. electionTimeout time.Duration
  103. heartbeatTimeout time.Duration
  104. currentSnapshot *Snapshot
  105. lastSnapshot *Snapshot
  106. stateMachine StateMachine
  107. maxLogEntriesPerRequest uint64
  108. connectionString string
  109. }
  110. // An event to be processed by the server's event loop.
  111. type event struct {
  112. target interface{}
  113. returnValue interface{}
  114. c chan error
  115. }
  116. //------------------------------------------------------------------------------
  117. //
  118. // Constructor
  119. //
  120. //------------------------------------------------------------------------------
  121. // Creates a new server with a log at the given path.
  122. func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (Server, error) {
  123. if name == "" {
  124. return nil, errors.New("raft.Server: Name cannot be blank")
  125. }
  126. if transporter == nil {
  127. panic("raft: Transporter required")
  128. }
  129. s := &server{
  130. name: name,
  131. path: path,
  132. transporter: transporter,
  133. stateMachine: stateMachine,
  134. context: context,
  135. state: Stopped,
  136. peers: make(map[string]*Peer),
  137. log: newLog(),
  138. c: make(chan *event, 256),
  139. electionTimeout: DefaultElectionTimeout,
  140. heartbeatTimeout: DefaultHeartbeatTimeout,
  141. maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
  142. connectionString: connectionString,
  143. }
  144. // Setup apply function.
  145. s.log.ApplyFunc = func(c Command) (interface{}, error) {
  146. result, err := c.Apply(s)
  147. return result, err
  148. }
  149. return s, nil
  150. }
  151. //------------------------------------------------------------------------------
  152. //
  153. // Accessors
  154. //
  155. //------------------------------------------------------------------------------
  156. //--------------------------------------
  157. // General
  158. //--------------------------------------
  159. // Retrieves the name of the server.
  160. func (s *server) Name() string {
  161. return s.name
  162. }
  163. // Retrieves the storage path for the server.
  164. func (s *server) Path() string {
  165. return s.path
  166. }
  167. // The name of the current leader.
  168. func (s *server) Leader() string {
  169. return s.leader
  170. }
  171. // Retrieves a copy of the peer data.
  172. func (s *server) Peers() map[string]*Peer {
  173. s.mutex.Lock()
  174. defer s.mutex.Unlock()
  175. peers := make(map[string]*Peer)
  176. for name, peer := range s.peers {
  177. peers[name] = peer.clone()
  178. }
  179. return peers
  180. }
  181. // Retrieves the object that transports requests.
  182. func (s *server) Transporter() Transporter {
  183. s.mutex.RLock()
  184. defer s.mutex.RUnlock()
  185. return s.transporter
  186. }
  187. func (s *server) SetTransporter(t Transporter) {
  188. s.mutex.Lock()
  189. defer s.mutex.Unlock()
  190. s.transporter = t
  191. }
  192. // Retrieves the context passed into the constructor.
  193. func (s *server) Context() interface{} {
  194. return s.context
  195. }
  196. // Retrieves the state machine passed into the constructor.
  197. func (s *server) StateMachine() StateMachine {
  198. return s.stateMachine
  199. }
  200. // Retrieves the log path for the server.
  201. func (s *server) LogPath() string {
  202. return path.Join(s.path, "log")
  203. }
  204. // Retrieves the current state of the server.
  205. func (s *server) State() string {
  206. s.mutex.RLock()
  207. defer s.mutex.RUnlock()
  208. return s.state
  209. }
  210. // Sets the state of the server.
  211. func (s *server) setState(state string) {
  212. s.mutex.Lock()
  213. defer s.mutex.Unlock()
  214. s.state = state
  215. if state == Leader {
  216. s.leader = s.Name()
  217. }
  218. }
  219. // Retrieves the current term of the server.
  220. func (s *server) Term() uint64 {
  221. return s.currentTerm
  222. }
  223. // Retrieves the current commit index of the server.
  224. func (s *server) CommitIndex() uint64 {
  225. return s.log.commitIndex
  226. }
  227. // Retrieves the name of the candidate this server voted for in this term.
  228. func (s *server) VotedFor() string {
  229. return s.votedFor
  230. }
  231. // Retrieves whether the server's log has no entries.
  232. func (s *server) IsLogEmpty() bool {
  233. return s.log.isEmpty()
  234. }
  235. // A list of all the log entries. This should only be used for debugging purposes.
  236. func (s *server) LogEntries() []*LogEntry {
  237. return s.log.entries
  238. }
  239. // A reference to the command name of the last entry.
  240. func (s *server) LastCommandName() string {
  241. return s.log.lastCommandName()
  242. }
  243. // Get the state of the server for debugging
  244. func (s *server) GetState() string {
  245. s.mutex.RLock()
  246. defer s.mutex.RUnlock()
  247. return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
  248. }
  249. // Check if the server is promotable
  250. func (s *server) promotable() bool {
  251. return s.log.currentIndex() > 0
  252. }
  253. //--------------------------------------
  254. // Membership
  255. //--------------------------------------
  256. // Retrieves the number of member servers in the consensus.
  257. func (s *server) MemberCount() int {
  258. s.mutex.Lock()
  259. defer s.mutex.Unlock()
  260. return len(s.peers) + 1
  261. }
  262. // Retrieves the number of servers required to make a quorum.
  263. func (s *server) QuorumSize() int {
  264. return (s.MemberCount() / 2) + 1
  265. }
  266. //--------------------------------------
  267. // Election timeout
  268. //--------------------------------------
  269. // Retrieves the election timeout.
  270. func (s *server) ElectionTimeout() time.Duration {
  271. s.mutex.RLock()
  272. defer s.mutex.RUnlock()
  273. return s.electionTimeout
  274. }
  275. // Sets the election timeout.
  276. func (s *server) SetElectionTimeout(duration time.Duration) {
  277. s.mutex.Lock()
  278. defer s.mutex.Unlock()
  279. s.electionTimeout = duration
  280. }
  281. //--------------------------------------
  282. // Heartbeat timeout
  283. //--------------------------------------
  284. // Retrieves the heartbeat timeout.
  285. func (s *server) HeartbeatTimeout() time.Duration {
  286. s.mutex.RLock()
  287. defer s.mutex.RUnlock()
  288. return s.heartbeatTimeout
  289. }
  290. // Sets the heartbeat timeout.
  291. func (s *server) SetHeartbeatTimeout(duration time.Duration) {
  292. s.mutex.Lock()
  293. defer s.mutex.Unlock()
  294. s.heartbeatTimeout = duration
  295. for _, peer := range s.peers {
  296. peer.setHeartbeatTimeout(duration)
  297. }
  298. }
  299. //------------------------------------------------------------------------------
  300. //
  301. // Methods
  302. //
  303. //------------------------------------------------------------------------------
  304. //--------------------------------------
  305. // Initialization
  306. //--------------------------------------
  307. // Reg the NOPCommand
  308. func init() {
  309. RegisterCommand(&NOPCommand{})
  310. RegisterCommand(&DefaultJoinCommand{})
  311. RegisterCommand(&DefaultLeaveCommand{})
  312. }
  313. // Start as follow
  314. // If log entries exist then allow promotion to candidate if no AEs received.
  315. // If no log entries exist then wait for AEs from another node.
  316. // If no log entries exist and a self-join command is issued then
  317. // immediately become leader and commit entry.
  318. func (s *server) Start() error {
  319. // Exit if the server is already running.
  320. if s.state != Stopped {
  321. return errors.New("raft.Server: Server already running")
  322. }
  323. // Create snapshot directory if not exist
  324. os.Mkdir(path.Join(s.path, "snapshot"), 0700)
  325. if err := s.readConf(); err != nil {
  326. s.debugln("raft: Conf file error: ", err)
  327. return fmt.Errorf("raft: Initialization error: %s", err)
  328. }
  329. // Initialize the log and load it up.
  330. if err := s.log.open(s.LogPath()); err != nil {
  331. s.debugln("raft: Log error: ", err)
  332. return fmt.Errorf("raft: Initialization error: %s", err)
  333. }
  334. // Update the term to the last term in the log.
  335. _, s.currentTerm = s.log.lastInfo()
  336. s.setState(Follower)
  337. // If no log entries exist then
  338. // 1. wait for AEs from another node
  339. // 2. wait for self-join command
  340. // to set itself promotable
  341. if !s.promotable() {
  342. s.debugln("start as a new raft server")
  343. // If log entries exist then allow promotion to candidate
  344. // if no AEs received.
  345. } else {
  346. s.debugln("start from previous saved state")
  347. }
  348. debugln(s.GetState())
  349. go s.loop()
  350. return nil
  351. }
  352. // Shuts down the server.
  353. func (s *server) Stop() {
  354. s.send(&stopValue)
  355. s.mutex.Lock()
  356. defer s.mutex.Unlock()
  357. s.log.close()
  358. }
  359. // Checks if the server is currently running.
  360. func (s *server) Running() bool {
  361. s.mutex.RLock()
  362. defer s.mutex.RUnlock()
  363. return s.state != Stopped
  364. }
  365. //--------------------------------------
  366. // Term
  367. //--------------------------------------
  368. // Sets the current term for the server. This is only used when an external
  369. // current term is found.
  370. func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
  371. s.mutex.Lock()
  372. defer s.mutex.Unlock()
  373. // update the term and clear vote for
  374. if term > s.currentTerm {
  375. s.state = Follower
  376. s.currentTerm = term
  377. s.leader = leaderName
  378. s.votedFor = ""
  379. return
  380. }
  381. // discover new leader when candidate
  382. // save leader name when follower
  383. if term == s.currentTerm && s.state != Leader && append {
  384. s.state = Follower
  385. s.leader = leaderName
  386. }
  387. }
  388. //--------------------------------------
  389. // Event Loop
  390. //--------------------------------------
  391. // ________
  392. // --|Snapshot| timeout
  393. // | -------- ______
  394. // recover | ^ | |
  395. // snapshot / | |snapshot | |
  396. // higher | | v | recv majority votes
  397. // term | -------- timeout ----------- -----------
  398. // |-> |Follower| ----------> | Candidate |--------------------> | Leader |
  399. // -------- ----------- -----------
  400. // ^ higher term/ | higher term |
  401. // | new leader | |
  402. // |_______________________|____________________________________ |
  403. // The main event loop for the server
  404. func (s *server) loop() {
  405. defer s.debugln("server.loop.end")
  406. for {
  407. state := s.State()
  408. s.debugln("server.loop.run ", state)
  409. switch state {
  410. case Follower:
  411. s.followerLoop()
  412. case Candidate:
  413. s.candidateLoop()
  414. case Leader:
  415. s.leaderLoop()
  416. case Snapshotting:
  417. s.snapshotLoop()
  418. case Stopped:
  419. return
  420. }
  421. }
  422. }
  423. // Sends an event to the event loop to be processed. The function will wait
  424. // until the event is actually processed before returning.
  425. func (s *server) send(value interface{}) (interface{}, error) {
  426. event := s.sendAsync(value)
  427. err := <-event.c
  428. return event.returnValue, err
  429. }
  430. func (s *server) sendAsync(value interface{}) *event {
  431. event := &event{target: value, c: make(chan error, 1)}
  432. s.c <- event
  433. return event
  434. }
  435. // The event loop that is run when the server is in a Follower state.
  436. // Responds to RPCs from candidates and leaders.
  437. // Converts to candidate if election timeout elapses without either:
  438. // 1.Receiving valid AppendEntries RPC, or
  439. // 2.Granting vote to candidate
  440. func (s *server) followerLoop() {
  441. s.setState(Follower)
  442. timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  443. for {
  444. var err error
  445. update := false
  446. select {
  447. case e := <-s.c:
  448. if e.target == &stopValue {
  449. s.setState(Stopped)
  450. } else {
  451. switch req := e.target.(type) {
  452. case JoinCommand:
  453. //If no log entries exist and a self-join command is issued
  454. //then immediately become leader and commit entry.
  455. if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
  456. s.debugln("selfjoin and promote to leader")
  457. s.setState(Leader)
  458. s.processCommand(req, e)
  459. } else {
  460. err = NotLeaderError
  461. }
  462. case *AppendEntriesRequest:
  463. e.returnValue, update = s.processAppendEntriesRequest(req)
  464. case *RequestVoteRequest:
  465. e.returnValue, update = s.processRequestVoteRequest(req)
  466. case *SnapshotRequest:
  467. e.returnValue = s.processSnapshotRequest(req)
  468. default:
  469. err = NotLeaderError
  470. }
  471. }
  472. // Callback to event.
  473. e.c <- err
  474. case <-timeoutChan:
  475. // only allow synced follower to promote to candidate
  476. if s.promotable() {
  477. s.setState(Candidate)
  478. } else {
  479. update = true
  480. }
  481. }
  482. // Converts to candidate if election timeout elapses without either:
  483. // 1.Receiving valid AppendEntries RPC, or
  484. // 2.Granting vote to candidate
  485. if update {
  486. timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  487. }
  488. // Exit loop on state change.
  489. if s.State() != Follower {
  490. break
  491. }
  492. }
  493. }
  494. // The event loop that is run when the server is in a Candidate state.
  495. func (s *server) candidateLoop() {
  496. lastLogIndex, lastLogTerm := s.log.lastInfo()
  497. s.leader = ""
  498. for {
  499. // Increment current term, vote for self.
  500. s.currentTerm++
  501. s.votedFor = s.name
  502. // Send RequestVote RPCs to all other servers.
  503. respChan := make(chan *RequestVoteResponse, len(s.peers))
  504. for _, peer := range s.peers {
  505. go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
  506. }
  507. // Wait for either:
  508. // * Votes received from majority of servers: become leader
  509. // * AppendEntries RPC received from new leader: step down.
  510. // * Election timeout elapses without election resolution: increment term, start new election
  511. // * Discover higher term: step down (§5.1)
  512. votesGranted := 1
  513. timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  514. timeout := false
  515. for {
  516. // If we received enough votes then stop waiting for more votes.
  517. s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
  518. if votesGranted >= s.QuorumSize() {
  519. s.setState(Leader)
  520. break
  521. }
  522. // Collect votes from peers.
  523. select {
  524. case resp := <-respChan:
  525. if resp.VoteGranted {
  526. s.debugln("server.candidate.vote.granted: ", votesGranted)
  527. votesGranted++
  528. } else if resp.Term > s.currentTerm {
  529. s.debugln("server.candidate.vote.failed")
  530. s.setCurrentTerm(resp.Term, "", false)
  531. } else {
  532. s.debugln("server.candidate.vote: denied")
  533. }
  534. case e := <-s.c:
  535. var err error
  536. if e.target == &stopValue {
  537. s.setState(Stopped)
  538. } else {
  539. switch req := e.target.(type) {
  540. case Command:
  541. err = NotLeaderError
  542. case *AppendEntriesRequest:
  543. e.returnValue, _ = s.processAppendEntriesRequest(req)
  544. case *RequestVoteRequest:
  545. e.returnValue, _ = s.processRequestVoteRequest(req)
  546. }
  547. }
  548. // Callback to event.
  549. e.c <- err
  550. case <-timeoutChan:
  551. timeout = true
  552. }
  553. // both process AER and RVR can make the server to follower
  554. // also break when timeout happens
  555. if s.State() != Candidate || timeout {
  556. break
  557. }
  558. }
  559. // break when we are not candidate
  560. if s.State() != Candidate {
  561. break
  562. }
  563. // continue when timeout happened
  564. }
  565. }
  566. // The event loop that is run when the server is in a Leader state.
  567. func (s *server) leaderLoop() {
  568. s.setState(Leader)
  569. s.syncedPeer = make(map[string]bool)
  570. logIndex, _ := s.log.lastInfo()
  571. // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
  572. s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
  573. for _, peer := range s.peers {
  574. peer.setPrevLogIndex(logIndex)
  575. peer.startHeartbeat()
  576. }
  577. go s.Do(NOPCommand{})
  578. // Begin to collect response from followers
  579. for {
  580. var err error
  581. select {
  582. case e := <-s.c:
  583. if e.target == &stopValue {
  584. s.setState(Stopped)
  585. } else {
  586. switch req := e.target.(type) {
  587. case Command:
  588. s.processCommand(req, e)
  589. continue
  590. case *AppendEntriesRequest:
  591. e.returnValue, _ = s.processAppendEntriesRequest(req)
  592. case *AppendEntriesResponse:
  593. s.processAppendEntriesResponse(req)
  594. case *RequestVoteRequest:
  595. e.returnValue, _ = s.processRequestVoteRequest(req)
  596. }
  597. }
  598. // Callback to event.
  599. e.c <- err
  600. }
  601. // Exit loop on state change.
  602. if s.State() != Leader {
  603. break
  604. }
  605. }
  606. // Stop all peers.
  607. for _, peer := range s.peers {
  608. peer.stopHeartbeat(false)
  609. }
  610. s.syncedPeer = nil
  611. }
  612. func (s *server) snapshotLoop() {
  613. s.setState(Snapshotting)
  614. for {
  615. var err error
  616. e := <-s.c
  617. if e.target == &stopValue {
  618. s.setState(Stopped)
  619. } else {
  620. switch req := e.target.(type) {
  621. case Command:
  622. err = NotLeaderError
  623. case *AppendEntriesRequest:
  624. e.returnValue, _ = s.processAppendEntriesRequest(req)
  625. case *RequestVoteRequest:
  626. e.returnValue, _ = s.processRequestVoteRequest(req)
  627. case *SnapshotRecoveryRequest:
  628. e.returnValue = s.processSnapshotRecoveryRequest(req)
  629. }
  630. }
  631. // Callback to event.
  632. e.c <- err
  633. // Exit loop on state change.
  634. if s.State() != Snapshotting {
  635. break
  636. }
  637. }
  638. }
  639. //--------------------------------------
  640. // Commands
  641. //--------------------------------------
  642. // Attempts to execute a command and replicate it. The function will return
  643. // when the command has been successfully committed or an error has occurred.
  644. func (s *server) Do(command Command) (interface{}, error) {
  645. return s.send(command)
  646. }
  647. // Processes a command.
  648. func (s *server) processCommand(command Command, e *event) {
  649. s.debugln("server.command.process")
  650. // Create an entry for the command in the log.
  651. entry, err := s.log.createEntry(s.currentTerm, command)
  652. if err != nil {
  653. s.debugln("server.command.log.entry.error:", err)
  654. e.c <- err
  655. return
  656. }
  657. if err := s.log.appendEntry(entry); err != nil {
  658. s.debugln("server.command.log.error:", err)
  659. e.c <- err
  660. return
  661. }
  662. // Issue a callback for the entry once it's committed.
  663. go func() {
  664. // Wait for the entry to be committed.
  665. select {
  666. case <-entry.commit:
  667. var err error
  668. s.debugln("server.command.commit")
  669. e.returnValue, err = s.log.getEntryResult(entry, true)
  670. e.c <- err
  671. case <-time.After(time.Second):
  672. s.debugln("server.command.timeout")
  673. e.c <- CommandTimeoutError
  674. }
  675. }()
  676. // Issue an append entries response for the server.
  677. resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
  678. resp.append = true
  679. resp.peer = s.Name()
  680. // this must be async
  681. // sendAsync is not really async every time
  682. // when the sending speed of the user is larger than
  683. // the processing speed of the server, the buffered channel
  684. // will be full. Then sendAsync will become sync, which will
  685. // cause deadlock here.
  686. // so we use a goroutine to avoid the deadlock
  687. go s.sendAsync(resp)
  688. }
  689. //--------------------------------------
  690. // Append Entries
  691. //--------------------------------------
  692. // Appends zero or more log entry from the leader to this server.
  693. func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
  694. ret, _ := s.send(req)
  695. resp, _ := ret.(*AppendEntriesResponse)
  696. return resp
  697. }
  698. // Processes the "append entries" request.
  699. func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
  700. s.traceln("server.ae.process")
  701. if req.Term < s.currentTerm {
  702. s.debugln("server.ae.error: stale term")
  703. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
  704. }
  705. // Update term and leader.
  706. s.setCurrentTerm(req.Term, req.LeaderName, true)
  707. // Reject if log doesn't contain a matching previous entry.
  708. if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
  709. s.debugln("server.ae.truncate.error: ", err)
  710. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  711. }
  712. // Append entries to the log.
  713. if err := s.log.appendEntries(req.Entries); err != nil {
  714. s.debugln("server.ae.append.error: ", err)
  715. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  716. }
  717. // Commit up to the commit index.
  718. if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
  719. s.debugln("server.ae.commit.error: ", err)
  720. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  721. }
  722. // once the server appended and commited all the log entries from the leader
  723. return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
  724. }
  725. // Processes the "append entries" response from the peer. This is only
  726. // processed when the server is a leader. Responses received during other
  727. // states are dropped.
  728. func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
  729. // If we find a higher term then change to a follower and exit.
  730. if resp.Term > s.currentTerm {
  731. s.setCurrentTerm(resp.Term, "", false)
  732. return
  733. }
  734. // panic response if it's not successful.
  735. if !resp.Success {
  736. return
  737. }
  738. // if one peer successfully append a log from the leader term,
  739. // we add it to the synced list
  740. if resp.append == true {
  741. s.syncedPeer[resp.peer] = true
  742. }
  743. // Increment the commit count to make sure we have a quorum before committing.
  744. if len(s.syncedPeer) < s.QuorumSize() {
  745. return
  746. }
  747. // Determine the committed index that a majority has.
  748. var indices []uint64
  749. indices = append(indices, s.log.currentIndex())
  750. for _, peer := range s.peers {
  751. indices = append(indices, peer.getPrevLogIndex())
  752. }
  753. sort.Sort(sort.Reverse(uint64Slice(indices)))
  754. // We can commit up to the index which the majority of the members have appended.
  755. commitIndex := indices[s.QuorumSize()-1]
  756. committedIndex := s.log.commitIndex
  757. if commitIndex > committedIndex {
  758. s.log.setCommitIndex(commitIndex)
  759. s.debugln("commit index ", commitIndex)
  760. for i := committedIndex; i < commitIndex; i++ {
  761. if entry := s.log.getEntry(i + 1); entry != nil {
  762. // if the leader is a new one and the entry came from the
  763. // old leader, the commit channel will be nil and no go routine
  764. // is waiting from this channel
  765. // if we try to send to it, the new leader will get stuck
  766. if entry.commit != nil {
  767. select {
  768. case entry.commit <- true:
  769. default:
  770. panic("server unable to send signal to commit channel")
  771. }
  772. }
  773. }
  774. }
  775. }
  776. }
  777. //--------------------------------------
  778. // Request Vote
  779. //--------------------------------------
  780. // Requests a vote from a server. A vote can be obtained if the vote's term is
  781. // at the server's current term and the server has not made a vote yet. A vote
  782. // can also be obtained if the term is greater than the server's current term.
  783. func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
  784. ret, _ := s.send(req)
  785. resp, _ := ret.(*RequestVoteResponse)
  786. return resp
  787. }
  788. // Processes a "request vote" request.
  789. func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
  790. // If the request is coming from an old term then reject it.
  791. if req.Term < s.currentTerm {
  792. s.debugln("server.rv.error: stale term")
  793. return newRequestVoteResponse(s.currentTerm, false), false
  794. }
  795. s.setCurrentTerm(req.Term, "", false)
  796. // If we've already voted for a different candidate then don't vote for this candidate.
  797. if s.votedFor != "" && s.votedFor != req.CandidateName {
  798. s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
  799. " already vote for ", s.votedFor)
  800. return newRequestVoteResponse(s.currentTerm, false), false
  801. }
  802. // If the candidate's log is not at least as up-to-date as our last log then don't vote.
  803. lastIndex, lastTerm := s.log.lastInfo()
  804. if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
  805. s.debugln("server.rv.error: out of date log: ", req.CandidateName,
  806. "Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
  807. "Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
  808. return newRequestVoteResponse(s.currentTerm, false), false
  809. }
  810. // If we made it this far then cast a vote and reset our election time out.
  811. s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
  812. s.votedFor = req.CandidateName
  813. return newRequestVoteResponse(s.currentTerm, true), true
  814. }
  815. //--------------------------------------
  816. // Membership
  817. //--------------------------------------
  818. // Adds a peer to the server.
  819. func (s *server) AddPeer(name string, connectiongString string) error {
  820. s.debugln("server.peer.add: ", name, len(s.peers))
  821. // Do not allow peers to be added twice.
  822. if s.peers[name] != nil {
  823. return nil
  824. }
  825. // Skip the Peer if it has the same name as the Server
  826. if s.name != name {
  827. peer := newPeer(s, name, connectiongString, s.heartbeatTimeout)
  828. if s.State() == Leader {
  829. peer.startHeartbeat()
  830. }
  831. s.peers[peer.Name] = peer
  832. }
  833. // Write the configuration to file.
  834. s.writeConf()
  835. return nil
  836. }
  837. // Removes a peer from the server.
  838. func (s *server) RemovePeer(name string) error {
  839. s.debugln("server.peer.remove: ", name, len(s.peers))
  840. // Skip the Peer if it has the same name as the Server
  841. if name != s.Name() {
  842. // Return error if peer doesn't exist.
  843. peer := s.peers[name]
  844. if peer == nil {
  845. return fmt.Errorf("raft: Peer not found: %s", name)
  846. }
  847. // Stop peer and remove it.
  848. if s.State() == Leader {
  849. peer.stopHeartbeat(true)
  850. }
  851. delete(s.peers, name)
  852. }
  853. // Write the configuration to file.
  854. s.writeConf()
  855. return nil
  856. }
  857. //--------------------------------------
  858. // Log compaction
  859. //--------------------------------------
  860. func (s *server) TakeSnapshot() error {
  861. //TODO put a snapshot mutex
  862. s.debugln("take Snapshot")
  863. if s.currentSnapshot != nil {
  864. return errors.New("handling snapshot")
  865. }
  866. lastIndex, lastTerm := s.log.commitInfo()
  867. if lastIndex == 0 {
  868. return errors.New("No logs")
  869. }
  870. path := s.SnapshotPath(lastIndex, lastTerm)
  871. var state []byte
  872. var err error
  873. if s.stateMachine != nil {
  874. state, err = s.stateMachine.Save()
  875. if err != nil {
  876. return err
  877. }
  878. } else {
  879. state = []byte{0}
  880. }
  881. peers := make([]*Peer, len(s.peers)+1)
  882. i := 0
  883. for _, peer := range s.peers {
  884. peers[i] = peer.clone()
  885. i++
  886. }
  887. peers[i] = &Peer{
  888. Name: s.Name(),
  889. ConnectionString: s.connectionString,
  890. }
  891. s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
  892. s.saveSnapshot()
  893. // We keep some log entries after the snapshot
  894. // We do not want to send the whole snapshot
  895. // to the slightly slow machines
  896. if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
  897. compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
  898. compactTerm := s.log.getEntry(compactIndex).Term
  899. s.log.compact(compactIndex, compactTerm)
  900. }
  901. return nil
  902. }
  903. // Retrieves the log path for the server.
  904. func (s *server) saveSnapshot() error {
  905. if s.currentSnapshot == nil {
  906. return errors.New("no snapshot to save")
  907. }
  908. err := s.currentSnapshot.save()
  909. if err != nil {
  910. return err
  911. }
  912. tmp := s.lastSnapshot
  913. s.lastSnapshot = s.currentSnapshot
  914. // delete the previous snapshot if there is any change
  915. if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
  916. tmp.remove()
  917. }
  918. s.currentSnapshot = nil
  919. return nil
  920. }
  921. // Retrieves the log path for the server.
  922. func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
  923. return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
  924. }
  925. func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
  926. ret, _ := s.send(req)
  927. resp, _ := ret.(*SnapshotResponse)
  928. return resp
  929. }
  930. func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
  931. // If the follower’s log contains an entry at the snapshot’s last index with a term
  932. // that matches the snapshot’s last term
  933. // Then the follower already has all the information found in the snapshot
  934. // and can reply false
  935. entry := s.log.getEntry(req.LastIndex)
  936. if entry != nil && entry.Term == req.LastTerm {
  937. return newSnapshotResponse(false)
  938. }
  939. s.setState(Snapshotting)
  940. return newSnapshotResponse(true)
  941. }
  942. func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  943. ret, _ := s.send(req)
  944. resp, _ := ret.(*SnapshotRecoveryResponse)
  945. return resp
  946. }
  947. func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  948. s.stateMachine.Recovery(req.State)
  949. // clear the peer map
  950. s.peers = make(map[string]*Peer)
  951. // recovery the cluster configuration
  952. for _, peer := range req.Peers {
  953. s.AddPeer(peer.Name, peer.ConnectionString)
  954. }
  955. //update term and index
  956. s.currentTerm = req.LastTerm
  957. s.log.updateCommitIndex(req.LastIndex)
  958. snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
  959. s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
  960. s.saveSnapshot()
  961. // clear the previous log entries
  962. s.log.compact(req.LastIndex, req.LastTerm)
  963. return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
  964. }
  965. // Load a snapshot at restart
  966. func (s *server) LoadSnapshot() error {
  967. dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
  968. if err != nil {
  969. return err
  970. }
  971. filenames, err := dir.Readdirnames(-1)
  972. if err != nil {
  973. dir.Close()
  974. panic(err)
  975. }
  976. dir.Close()
  977. if len(filenames) == 0 {
  978. return errors.New("no snapshot")
  979. }
  980. // not sure how many snapshot we should keep
  981. sort.Strings(filenames)
  982. snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])
  983. // should not fail
  984. file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
  985. defer file.Close()
  986. if err != nil {
  987. panic(err)
  988. }
  989. // TODO check checksum first
  990. var snapshotBytes []byte
  991. var checksum uint32
  992. n, err := fmt.Fscanf(file, "%08x\n", &checksum)
  993. if err != nil {
  994. return err
  995. }
  996. if n != 1 {
  997. return errors.New("Bad snapshot file")
  998. }
  999. snapshotBytes, _ = ioutil.ReadAll(file)
  1000. s.debugln(string(snapshotBytes))
  1001. // Generate checksum.
  1002. byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
  1003. if uint32(checksum) != byteChecksum {
  1004. s.debugln(checksum, " ", byteChecksum)
  1005. return errors.New("bad snapshot file")
  1006. }
  1007. err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
  1008. if err != nil {
  1009. s.debugln("unmarshal error: ", err)
  1010. return err
  1011. }
  1012. err = s.stateMachine.Recovery(s.lastSnapshot.State)
  1013. if err != nil {
  1014. s.debugln("recovery error: ", err)
  1015. return err
  1016. }
  1017. for _, peer := range s.lastSnapshot.Peers {
  1018. s.AddPeer(peer.Name, peer.ConnectionString)
  1019. }
  1020. s.log.startTerm = s.lastSnapshot.LastTerm
  1021. s.log.startIndex = s.lastSnapshot.LastIndex
  1022. s.log.updateCommitIndex(s.lastSnapshot.LastIndex)
  1023. return err
  1024. }
  1025. //--------------------------------------
  1026. // Config File
  1027. //--------------------------------------
  1028. func (s *server) writeConf() {
  1029. peers := make([]*Peer, len(s.peers))
  1030. i := 0
  1031. for _, peer := range s.peers {
  1032. peers[i] = peer.clone()
  1033. i++
  1034. }
  1035. r := &Config{
  1036. CommitIndex: s.log.commitIndex,
  1037. Peers: peers,
  1038. }
  1039. b, _ := json.Marshal(r)
  1040. confPath := path.Join(s.path, "conf")
  1041. tmpConfPath := path.Join(s.path, "conf.tmp")
  1042. err := ioutil.WriteFile(tmpConfPath, b, 0600)
  1043. if err != nil {
  1044. panic(err)
  1045. }
  1046. os.Rename(tmpConfPath, confPath)
  1047. }
  1048. // Read the configuration for the server.
  1049. func (s *server) readConf() error {
  1050. confPath := path.Join(s.path, "conf")
  1051. s.debugln("readConf.open ", confPath)
  1052. // open conf file
  1053. b, err := ioutil.ReadFile(confPath)
  1054. if err != nil {
  1055. return nil
  1056. }
  1057. conf := &Config{}
  1058. if err = json.Unmarshal(b, conf); err != nil {
  1059. return err
  1060. }
  1061. s.log.updateCommitIndex(conf.CommitIndex)
  1062. return nil
  1063. }
  1064. //--------------------------------------
  1065. // Debugging
  1066. //--------------------------------------
  1067. func (s *server) debugln(v ...interface{}) {
  1068. debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
  1069. }
  1070. func (s *server) traceln(v ...interface{}) {
  1071. tracef("[%s] %s", s.name, fmt.Sprintln(v...))
  1072. }