package raft

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io/ioutil"
	"os"
	"path"
	"sort"
	"sync"
	"time"
)

//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------

const (
	Stopped      = "stopped"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)

const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)

const (
	DefaultHeartbeatTimeout = 50 * time.Millisecond
	DefaultElectionTimeout  = 150 * time.Millisecond
)

var stopValue interface{}

//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------

var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")

//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server struct {
	name        string
	path        string
	state       string
	transporter Transporter
	context     interface{}
	currentTerm uint64
	votedFor    string
	log         *Log
	leader      string
	peers       map[string]*Peer
	mutex       sync.RWMutex
	syncedPeer  map[string]bool

	c                chan *event
	electionTimeout  time.Duration
	heartbeatTimeout time.Duration

	currentSnapshot         *Snapshot
	lastSnapshot            *Snapshot
	stateMachine            StateMachine
	maxLogEntriesPerRequest uint64

	connectionString string
}

// An event to be processed by the server's event loop.
type event struct {
	target      interface{}
	returnValue interface{}
	c           chan error
}

//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// Creates a new server with a log at the given path.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (*Server, error) {
	if name == "" {
		return nil, errors.New("raft.Server: Name cannot be blank")
	}
	if transporter == nil {
		panic("raft: Transporter required")
	}

	s := &Server{
		name:                    name,
		path:                    path,
		transporter:             transporter,
		stateMachine:            stateMachine,
		context:                 context,
		state:                   Stopped,
		peers:                   make(map[string]*Peer),
		log:                     newLog(),
		c:                       make(chan *event, 256),
		electionTimeout:         DefaultElectionTimeout,
		heartbeatTimeout:        DefaultHeartbeatTimeout,
		maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
		connectionString:        connectionString,
	}

	// Setup the apply function.
	s.log.ApplyFunc = func(c Command) (interface{}, error) {
		result, err := c.Apply(s)
		return result, err
	}

	return s, nil
}
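// Example (illustrative sketch only, not part of the package): wiring up
// and starting a server. It assumes a Transporter implementation named
// "transporter" exists, and that a nil state machine and context are
// acceptable, as the constructor above permits:
//
//	s, err := raft.NewServer("node1", "/tmp/raft/node1", transporter, nil, nil, "http://localhost:4001")
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := s.Start(); err != nil {
//		log.Fatal(err)
//	}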
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

//--------------------------------------
// General
//--------------------------------------

// Retrieves the name of the server.
func (s *Server) Name() string {
	return s.name
}

// Retrieves the storage path for the server.
func (s *Server) Path() string {
	return s.path
}

// The name of the current leader.
func (s *Server) Leader() string {
	return s.leader
}

// Retrieves a copy of the peer data.
func (s *Server) Peers() map[string]*Peer {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	peers := make(map[string]*Peer)
	for name, peer := range s.peers {
		peers[name] = peer.clone()
	}
	return peers
}

// Retrieves the object that transports requests.
func (s *Server) Transporter() Transporter {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.transporter
}

// Sets the object that transports requests.
func (s *Server) SetTransporter(t Transporter) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.transporter = t
}

// Retrieves the context passed into the constructor.
func (s *Server) Context() interface{} {
	return s.context
}

// Retrieves the log path for the server.
func (s *Server) LogPath() string {
	return path.Join(s.path, "log")
}

// Retrieves the current state of the server.
func (s *Server) State() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state
}

// Sets the state of the server.
func (s *Server) setState(state string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.state = state
	if state == Leader {
		s.leader = s.Name()
	}
}

// Retrieves the current term of the server.
func (s *Server) Term() uint64 {
	return s.currentTerm
}

// Retrieves the current commit index of the server.
func (s *Server) CommitIndex() uint64 {
	return s.log.commitIndex
}

// Retrieves the name of the candidate this server voted for in this term.
func (s *Server) VotedFor() string {
	return s.votedFor
}

// Retrieves whether the server's log has no entries.
func (s *Server) IsLogEmpty() bool {
	return s.log.isEmpty()
}

// A list of all the log entries. This should only be used for debugging purposes.
func (s *Server) LogEntries() []*LogEntry {
	return s.log.entries
}

// A reference to the command name of the last entry.
func (s *Server) LastCommandName() string {
	return s.log.lastCommandName()
}

// Retrieves a summary of the server state for debugging.
func (s *Server) GetState() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommittedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
}

// Checks whether the server is promotable to candidate: it must have at
// least one log entry.
func (s *Server) promotable() bool {
	return s.log.currentIndex() > 0
}

//--------------------------------------
// Membership
//--------------------------------------

// Retrieves the number of member servers in the consensus.
func (s *Server) MemberCount() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.peers) + 1
}

// Retrieves the number of servers required to make a quorum.
func (s *Server) QuorumSize() int {
	return (s.MemberCount() / 2) + 1
}
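// Worked example: a server with four peers has MemberCount() == 5, so
// QuorumSize() == (5 / 2) + 1 == 3. Any three servers form a majority,
// so the cluster tolerates two failures.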
//--------------------------------------
// Election timeout
//--------------------------------------

// Retrieves the election timeout.
func (s *Server) ElectionTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.electionTimeout
}

// Sets the election timeout.
func (s *Server) SetElectionTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.electionTimeout = duration
}

//--------------------------------------
// Heartbeat timeout
//--------------------------------------

// Retrieves the heartbeat timeout.
func (s *Server) HeartbeatTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.heartbeatTimeout
}

// Sets the heartbeat timeout.
func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.heartbeatTimeout = duration
	for _, peer := range s.peers {
		peer.setHeartbeatTimeout(duration)
	}
}

//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Initialization
//--------------------------------------

// Register the commands used internally by the server.
func init() {
	RegisterCommand(&NOPCommand{})
	RegisterCommand(&DefaultJoinCommand{})
	RegisterCommand(&DefaultLeaveCommand{})
}

// Starts the server as a follower.
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit the entry.
func (s *Server) Start() error {
	// Exit if the server is already running.
	if s.state != Stopped {
		return errors.New("raft.Server: Server already running")
	}

	// Create the snapshot directory if it does not exist.
	os.Mkdir(path.Join(s.path, "snapshot"), 0700)

	if err := s.readConf(); err != nil {
		s.debugln("raft: Conf file error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Initialize the log and load it up.
	if err := s.log.open(s.LogPath()); err != nil {
		s.debugln("raft: Log error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Update the term to the last term in the log.
	_, s.currentTerm = s.log.lastInfo()

	s.setState(Follower)

	// If no log entries exist then the server stays unpromotable until it
	// either receives AEs from another node or processes a self-join
	// command; otherwise it resumes from its previously saved state.
	if !s.promotable() {
		s.debugln("start as a new raft server")
	} else {
		s.debugln("start from previous saved state")
	}

	debugln(s.GetState())

	go s.loop()
	return nil
}

// Shuts down the server.
func (s *Server) Stop() {
	s.send(&stopValue)
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.log.close()
}

// Checks if the server is currently running.
func (s *Server) Running() bool {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state != Stopped
}

//--------------------------------------
// Term
//--------------------------------------

// Sets the current term for the server. This is only used when a newer
// term is discovered from another server.
func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Update the term and clear the vote.
	if term > s.currentTerm {
		s.state = Follower
		s.currentTerm = term
		s.leader = leaderName
		s.votedFor = ""
		return
	}

	// Discover the new leader when candidate; save the leader name when follower.
	if term == s.currentTerm && s.state != Leader && append {
		s.state = Follower
		s.leader = leaderName
	}
}

//--------------------------------------
// Event Loop
//--------------------------------------

//                            ________
//                         --|Snapshot|                 timeout
//                         |  --------                  ______
// recover                 |     ^                     |      |
// snapshot /              |     |snapshot             |      |
// higher                  |     |                     v      |  recv majority votes
// term                    |  --------    timeout    -----------                        --------
//                         |->|Follower| ----------> | Candidate |--------------------> | Leader |
//                            --------               -----------                        --------
//                               ^          higher term/ |                higher term      |
//                               |            new leader |                                 |
//                               |_______________________|_________________________________|

// The main event loop for the server.
func (s *Server) loop() {
	defer s.debugln("server.loop.end")

	for {
		state := s.State()
		s.debugln("server.loop.run ", state)
		switch state {
		case Follower:
			s.followerLoop()
		case Candidate:
			s.candidateLoop()
		case Leader:
			s.leaderLoop()
		case Snapshotting:
			s.snapshotLoop()
		case Stopped:
			return
		}
	}
}

// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *Server) send(value interface{}) (interface{}, error) {
	event := s.sendAsync(value)
	err := <-event.c
	return event.returnValue, err
}

// Queues an event for the event loop without waiting for it to be processed.
func (s *Server) sendAsync(value interface{}) *event {
	event := &event{target: value, c: make(chan error, 1)}
	s.c <- event
	return event
}
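// Sketch of how the two entry points differ (the RPC wrappers below, such
// as AppendEntries and RequestVote, use the blocking form):
//
//	ret, err := s.send(req) // enqueue and block until the loop handles req
//
//	ev := s.sendAsync(req)  // enqueue only...
//	err = <-ev.c            // ...and collect the result later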
// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if the election timeout elapses without either:
//   1. Receiving a valid AppendEntries RPC, or
//   2. Granting a vote to a candidate.
func (s *Server) followerLoop() {
	s.setState(Follower)
	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

	for {
		var err error
		update := false
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case JoinCommand:
					// If no log entries exist and a self-join command is
					// issued then immediately become leader and commit the entry.
					if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
						s.debugln("selfjoin and promote to leader")
						s.setState(Leader)
						s.processCommand(req, e)
					} else {
						err = NotLeaderError
					}
				case *AppendEntriesRequest:
					e.returnValue, update = s.processAppendEntriesRequest(req)
				case *RequestVoteRequest:
					e.returnValue, update = s.processRequestVoteRequest(req)
				case *SnapshotRequest:
					e.returnValue = s.processSnapshotRequest(req)
				default:
					err = NotLeaderError
				}
			}
			// Callback to event.
			e.c <- err

		case <-timeoutChan:
			// Only allow a synced follower to promote to candidate.
			if s.promotable() {
				s.setState(Candidate)
			} else {
				update = true
			}
		}

		// Reset the election timeout whenever a valid RPC was processed.
		if update {
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		}

		// Exit the loop on state change.
		if s.State() != Follower {
			break
		}
	}
}
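// Note on the timeout above: afterBetween picks a random duration between
// ElectionTimeout and 2*ElectionTimeout, so with the 150ms default a
// follower waits somewhere between 150ms and 300ms before promoting.
// The randomness staggers candidates so that split votes are rare.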
// The event loop that is run when the server is in a Candidate state.
func (s *Server) candidateLoop() {
	lastLogIndex, lastLogTerm := s.log.lastInfo()
	s.leader = ""

	for {
		// Increment the current term and vote for self.
		s.currentTerm++
		s.votedFor = s.name

		// Send RequestVote RPCs to all other servers.
		respChan := make(chan *RequestVoteResponse, len(s.peers))
		for _, peer := range s.peers {
			go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
		}

		// Wait for either:
		//   * Votes received from a majority of servers: become leader.
		//   * AppendEntries RPC received from a new leader: step down.
		//   * Election timeout elapses without resolution: increment term, start a new election.
		//   * Discovery of a higher term: step down (§5.1).
		votesGranted := 1
		timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		timeout := false

		for {
			// If we received enough votes then stop waiting for more votes.
			s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
			if votesGranted >= s.QuorumSize() {
				s.setState(Leader)
				break
			}

			// Collect votes from peers.
			select {
			case resp := <-respChan:
				if resp.VoteGranted {
					s.debugln("server.candidate.vote.granted: ", votesGranted)
					votesGranted++
				} else if resp.Term > s.currentTerm {
					s.debugln("server.candidate.vote.failed")
					s.setCurrentTerm(resp.Term, "", false)
				} else {
					s.debugln("server.candidate.vote: denied")
				}

			case e := <-s.c:
				var err error
				if e.target == &stopValue {
					s.setState(Stopped)
				} else {
					switch req := e.target.(type) {
					case Command:
						err = NotLeaderError
					case *AppendEntriesRequest:
						e.returnValue, _ = s.processAppendEntriesRequest(req)
					case *RequestVoteRequest:
						e.returnValue, _ = s.processRequestVoteRequest(req)
					}
				}
				// Callback to event.
				e.c <- err

			case <-timeoutChan:
				timeout = true
			}

			// Processing either an AppendEntriesRequest or a RequestVoteRequest
			// can revert the server to follower; also break when the election
			// times out.
			if s.State() != Candidate || timeout {
				break
			}
		}

		// Break when we are no longer a candidate;
		// continue to the next election when the timeout fired.
		if s.State() != Candidate {
			break
		}
	}
}

// The event loop that is run when the server is in a Leader state.
func (s *Server) leaderLoop() {
	s.setState(Leader)
	s.syncedPeer = make(map[string]bool)
	logIndex, _ := s.log.lastInfo()

	// Update the peers' prevLogIndex to the leader's lastLogIndex and start heartbeats.
	s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
	for _, peer := range s.peers {
		peer.setPrevLogIndex(logIndex)
		peer.startHeartbeat()
	}

	// Commit a NOP so that an entry from the current term gets committed.
	go s.Do(NOPCommand{})

	// Begin to collect responses from followers.
	for {
		var err error
		select {
		case e := <-s.c:
			if e.target == &stopValue {
				s.setState(Stopped)
			} else {
				switch req := e.target.(type) {
				case Command:
					s.processCommand(req, e)
					continue
				case *AppendEntriesRequest:
					e.returnValue, _ = s.processAppendEntriesRequest(req)
				case *AppendEntriesResponse:
					s.processAppendEntriesResponse(req)
				case *RequestVoteRequest:
					e.returnValue, _ = s.processRequestVoteRequest(req)
				}
			}

			// Callback to event.
			e.c <- err
		}

		// Exit the loop on state change.
		if s.State() != Leader {
			break
		}
	}

	// Stop all peer heartbeats.
	for _, peer := range s.peers {
		peer.stopHeartbeat(false)
	}
	s.syncedPeer = nil
}

// The event loop that is run when the server is in a Snapshotting state.
func (s *Server) snapshotLoop() {
	s.setState(Snapshotting)

	for {
		var err error
		e := <-s.c

		if e.target == &stopValue {
			s.setState(Stopped)
		} else {
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			case *SnapshotRecoveryRequest:
				e.returnValue = s.processSnapshotRecoveryRequest(req)
			}
		}
		// Callback to event.
		e.c <- err

		// Exit the loop on state change.
		if s.State() != Snapshotting {
			break
		}
	}
}

//--------------------------------------
// Commands
//--------------------------------------

// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *Server) Do(command Command) (interface{}, error) {
	return s.send(command)
}
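// Example (illustrative sketch; WriteCommand is a hypothetical type that
// implements the Command interface and has been registered with
// RegisterCommand):
//
//	result, err := s.Do(&WriteCommand{Key: "foo", Value: "bar"})
//	if err == NotLeaderError {
//		// Redirect the client to s.Leader().
//	}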
// Processes a command.
func (s *Server) processCommand(command Command, e *event) {
	s.debugln("server.command.process")

	// Create an entry for the command in the log.
	entry, err := s.log.createEntry(s.currentTerm, command)
	if err != nil {
		s.debugln("server.command.log.entry.error:", err)
		e.c <- err
		return
	}

	if err := s.log.appendEntry(entry); err != nil {
		s.debugln("server.command.log.error:", err)
		e.c <- err
		return
	}

	// Issue a callback for the entry once it's committed.
	go func() {
		// Wait for the entry to be committed.
		select {
		case <-entry.commit:
			var err error
			s.debugln("server.command.commit")
			e.returnValue, err = s.log.getEntryResult(entry, true)
			e.c <- err
		case <-time.After(time.Second):
			s.debugln("server.command.timeout")
			e.c <- CommandTimeoutError
		}
	}()

	// Issue an append entries response for the server itself.
	resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
	resp.append = true
	resp.peer = s.Name()

	// This must be async: sendAsync is not always truly asynchronous.
	// When commands arrive faster than the server can process them, the
	// buffered channel fills up, sendAsync blocks, and sending from this
	// goroutine would deadlock the event loop. A separate goroutine
	// avoids the deadlock.
	go s.sendAsync(resp)
}

//--------------------------------------
// Append Entries
//--------------------------------------

// Appends zero or more log entries from the leader to this server.
func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*AppendEntriesResponse)
	return resp
}

// Processes the "append entries" request.
func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
	s.traceln("server.ae.process")

	if req.Term < s.currentTerm {
		s.debugln("server.ae.error: stale term")
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
	}

	// Update the term and leader.
	s.setCurrentTerm(req.Term, req.LeaderName, true)

	// Reject if the log doesn't contain a matching previous entry.
	if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
		s.debugln("server.ae.truncate.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Append the entries to the log.
	if err := s.log.appendEntries(req.Entries); err != nil {
		s.debugln("server.ae.append.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Commit up to the commit index.
	if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
		s.debugln("server.ae.commit.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// The server has appended and committed all log entries from the leader.
	return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}

// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
	// If we find a higher term then change to a follower and exit.
	if resp.Term > s.currentTerm {
		s.setCurrentTerm(resp.Term, "", false)
		return
	}

	// Ignore the response if it's not successful.
	if !resp.Success {
		return
	}

	// If the peer successfully appended entries from the leader's term,
	// add it to the synced list.
	if resp.append {
		s.syncedPeer[resp.peer] = true
	}

	// Make sure we have a quorum of synced peers before committing.
	if len(s.syncedPeer) < s.QuorumSize() {
		return
	}

	// Determine the highest index that a majority of the members has appended.
	var indices []uint64
	indices = append(indices, s.log.currentIndex())
	for _, peer := range s.peers {
		indices = append(indices, peer.getPrevLogIndex())
	}
	sort.Sort(uint64Slice(indices))

	// We can commit up to the index which the majority of the members have appended.
	commitIndex := indices[s.QuorumSize()-1]
	committedIndex := s.log.commitIndex

	if commitIndex > committedIndex {
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
		for i := committedIndex; i < commitIndex; i++ {
			if entry := s.log.getEntry(i + 1); entry != nil {
				// If the leader is new and the entry came from the old
				// leader, the commit channel is nil and no goroutine is
				// waiting on it; sending to it would block the new leader.
				if entry.commit != nil {
					select {
					case entry.commit <- true:
					default:
						panic("server unable to send signal to commit channel")
					}
				}
			}
		}
	}
}
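// Worked example of the commit calculation above, assuming uint64Slice
// sorts ascending as its name suggests: a leader at index 9 with four
// peers whose prevLogIndex values are 3, 5, 5 and 8 collects indices
// [3 5 5 8 9]. QuorumSize() is 3, so commitIndex = indices[2] = 5: the
// leader and at least two peers hold entry 5, so the log may be
// committed through index 5.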
//--------------------------------------
// Request Vote
//--------------------------------------

// Requests a vote from a server. A vote can be obtained if the vote's term is
// at the server's current term and the server has not made a vote yet. A vote
// can also be obtained if the term is greater than the server's current term.
func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*RequestVoteResponse)
	return resp
}

// Processes a "request vote" request.
func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
	// If the request is coming from an old term then reject it.
	if req.Term < s.currentTerm {
		s.debugln("server.rv.error: stale term")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	s.setCurrentTerm(req.Term, "", false)

	// If we've already voted for a different candidate then don't vote for this candidate.
	if s.votedFor != "" && s.votedFor != req.CandidateName {
		s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
			" already voted for ", s.votedFor)
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the candidate's log is not at least as up-to-date as our last log then don't vote.
	lastIndex, lastTerm := s.log.lastInfo()
	if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
		s.debugln("server.rv.error: out of date log: ", req.CandidateName,
			"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
			"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If we made it this far then cast a vote and reset our election timeout.
	s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
	s.votedFor = req.CandidateName
	return newRequestVoteResponse(s.currentTerm, true), true
}

//--------------------------------------
// Membership
//--------------------------------------

// Adds a peer to the server.
func (s *Server) AddPeer(name string, connectionString string) error {
	s.debugln("server.peer.add: ", name, len(s.peers))

	// Do not allow peers to be added twice.
	if s.peers[name] != nil {
		return nil
	}

	// Skip the peer if it has the same name as the server.
	if s.name != name {
		peer := newPeer(s, name, connectionString, s.heartbeatTimeout)
		if s.State() == Leader {
			peer.startHeartbeat()
		}
		s.peers[peer.Name] = peer
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}
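// Sketch of a typical membership change (illustrative only): on a running
// cluster the leader commits a join command rather than having AddPeer
// called directly, so that the change is replicated; this assumes the
// Name/ConnectionString fields of the DefaultJoinCommand registered in
// init above:
//
//	_, err := leader.Do(&DefaultJoinCommand{
//		Name:             "node2",
//		ConnectionString: "http://localhost:4002",
//	})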
// Removes a peer from the server.
func (s *Server) RemovePeer(name string) error {
	s.debugln("server.peer.remove: ", name, len(s.peers))

	// Skip the peer if it has the same name as the server.
	if name != s.Name() {
		// Return an error if the peer doesn't exist.
		peer := s.peers[name]
		if peer == nil {
			return fmt.Errorf("raft: Peer not found: %s", name)
		}

		// Stop the peer's heartbeat and remove it.
		if s.State() == Leader {
			peer.stopHeartbeat(true)
		}
		delete(s.peers, name)
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}

//--------------------------------------
// Log compaction
//--------------------------------------

// Takes a snapshot of the state machine and compacts the log.
func (s *Server) TakeSnapshot() error {
	// TODO: put a snapshot mutex
	s.debugln("take Snapshot")
	if s.currentSnapshot != nil {
		return errors.New("handling snapshot")
	}

	lastIndex, lastTerm := s.log.commitInfo()
	if lastIndex == 0 {
		return errors.New("No logs")
	}

	path := s.SnapshotPath(lastIndex, lastTerm)

	var state []byte
	var err error
	if s.stateMachine != nil {
		state, err = s.stateMachine.Save()
		if err != nil {
			return err
		}
	} else {
		state = []byte{0}
	}

	// Clone the peers and include the local server in the list.
	peers := make([]*Peer, len(s.peers)+1)
	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}
	peers[i] = &Peer{
		Name:             s.Name(),
		ConnectionString: s.connectionString,
	}

	s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
	s.saveSnapshot()

	// Keep some log entries after the snapshot so that slightly slow
	// followers can catch up without receiving the whole snapshot.
	if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
		compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
		compactTerm := s.log.getEntry(compactIndex).Term
		s.log.compact(compactIndex, compactTerm)
	}

	return nil
}
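// Worked example of the compaction guard above: with a committed index of
// 1000 and a log start index of 0, 1000-0 > 200, so the log is compacted
// at index 800 and the newest 200 entries stay available for followers
// that are only slightly behind.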
// Saves the current snapshot to disk and removes the previous one.
func (s *Server) saveSnapshot() error {
	if s.currentSnapshot == nil {
		return errors.New("no snapshot to save")
	}

	err := s.currentSnapshot.save()
	if err != nil {
		return err
	}

	tmp := s.lastSnapshot
	s.lastSnapshot = s.currentSnapshot

	// Delete the previous snapshot if there is any change.
	if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
		tmp.remove()
	}
	s.currentSnapshot = nil

	return nil
}

// Retrieves the path of the snapshot file for the given index and term.
func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}
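// Example: for a server rooted at /tmp/raft/node1, SnapshotPath(1000, 2)
// returns "/tmp/raft/node1/snapshot/2_1000.ss" (term first, then index).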
// Sends a snapshot request through the event loop.
func (s *Server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotResponse)
	return resp
}

// Processes a "snapshot" request.
func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
	// If the follower's log contains an entry at the snapshot's last index
	// with a term that matches the snapshot's last term, then the follower
	// already has all the information found in the snapshot and can reply
	// false.
	entry := s.log.getEntry(req.LastIndex)
	if entry != nil && entry.Term == req.LastTerm {
		return newSnapshotResponse(false)
	}

	s.setState(Snapshotting)
	return newSnapshotResponse(true)
}

// Sends a snapshot recovery request through the event loop.
func (s *Server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotRecoveryResponse)
	return resp
}

// Processes a "snapshot recovery" request.
func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	// Restore the state machine from the snapshot.
	s.stateMachine.Recovery(req.State)

	// Clear the peer map and recover the cluster configuration.
	s.peers = make(map[string]*Peer)
	for _, peer := range req.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update the term and index.
	s.currentTerm = req.LastTerm
	s.log.updateCommitIndex(req.LastIndex)

	// Save the snapshot to disk.
	snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
	s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
	s.saveSnapshot()

	// Clear the previous log entries.
	s.log.compact(req.LastIndex, req.LastTerm)

	return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}

// Loads the most recent snapshot at restart.
func (s *Server) LoadSnapshot() error {
	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
	if err != nil {
		return err
	}

	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		dir.Close()
		panic(err)
	}
	dir.Close()

	if len(filenames) == 0 {
		return errors.New("no snapshot")
	}

	// Not sure how many snapshots we should keep; for now load the newest one.
	sort.Strings(filenames)
	snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

	// Should not fail.
	file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	// Verify the checksum before decoding the snapshot.
	var snapshotBytes []byte
	var checksum uint32

	n, err := fmt.Fscanf(file, "%08x\n", &checksum)
	if err != nil {
		return err
	}
	if n != 1 {
		return errors.New("Bad snapshot file")
	}

	snapshotBytes, _ = ioutil.ReadAll(file)
	s.debugln(string(snapshotBytes))

	// Generate the checksum and compare it to the stored one.
	byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
	if checksum != byteChecksum {
		s.debugln(checksum, " ", byteChecksum)
		return errors.New("bad snapshot file")
	}

	err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
	if err != nil {
		s.debugln("unmarshal error: ", err)
		return err
	}

	err = s.stateMachine.Recovery(s.lastSnapshot.State)
	if err != nil {
		s.debugln("recovery error: ", err)
		return err
	}

	for _, peer := range s.lastSnapshot.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	s.log.startTerm = s.lastSnapshot.LastTerm
	s.log.startIndex = s.lastSnapshot.LastIndex
	s.log.updateCommitIndex(s.lastSnapshot.LastIndex)

	return err
}

//--------------------------------------
// Config File
//--------------------------------------

// Writes the server configuration (commit index and peers) atomically
// via a temporary file.
func (s *Server) writeConf() {
	peers := make([]*Peer, len(s.peers))
	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}

	r := &Config{
		CommitIndex: s.log.commitIndex,
		Peers:       peers,
	}

	b, _ := json.Marshal(r)

	confPath := path.Join(s.path, "conf")
	tmpConfPath := path.Join(s.path, "conf.tmp")

	err := ioutil.WriteFile(tmpConfPath, b, 0600)
	if err != nil {
		panic(err)
	}

	os.Rename(tmpConfPath, confPath)
}

// Reads the configuration for the server.
func (s *Server) readConf() error {
	confPath := path.Join(s.path, "conf")
	s.debugln("readConf.open ", confPath)

	// Open the conf file; a missing file is not an error.
	b, err := ioutil.ReadFile(confPath)
	if err != nil {
		return nil
	}

	conf := &Config{}
	if err = json.Unmarshal(b, conf); err != nil {
		return err
	}

	s.log.updateCommitIndex(conf.CommitIndex)

	return nil
}
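// Sketch of a conf file as written by writeConf above (the exact field
// names depend on the JSON tags of Config and Peer, which live outside
// this file; lowerCamelCase is assumed here):
//
//	{"commitIndex":12,"peers":[{"name":"node2","connectionString":"http://localhost:4002"}]}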
//--------------------------------------
// Debugging
//--------------------------------------

func (s *Server) debugln(v ...interface{}) {
	debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
}

func (s *Server) traceln(v ...interface{}) {
	tracef("[%s] %s", s.name, fmt.Sprintln(v...))
}