server.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260
  1. package raft
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "hash/crc32"
  7. "io"
  8. "io/ioutil"
  9. "os"
  10. "path"
  11. "sort"
  12. "sync"
  13. "time"
  14. )
  //------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------

// Server states, as reported by Server.State.
const (
	Stopped      = "stopped"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)

// Log limits.
const (
	// MaxLogEntriesPerRequest is the value NewServer stores in
	// maxLogEntriesPerRequest.
	MaxLogEntriesPerRequest = 2000

	// NumberOfLogEntriesAfterSnapshot is a snapshot-related log limit; it is
	// consumed outside this part of the file.
	NumberOfLogEntriesAfterSnapshot = 200
)

// Default timeouts installed by NewServer.
const (
	DefaultHeartbeatTimeout = 50 * time.Millisecond
	DefaultElectionTimeout  = 150 * time.Millisecond
)

// stopValue is never read; its address (&stopValue) is the sentinel event
// target that tells the event loop to stop.
var stopValue interface{}
  //------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------

var (
	// NotLeaderError is returned when a request that only the leader can
	// serve (such as a command) reaches a server that is not the leader.
	NotLeaderError = errors.New("raft.Server: Not current leader")

	// DuplicatePeerError signals a duplicate peer. Note that AddPeer itself
	// treats adding an existing peer as a no-op.
	DuplicatePeerError = errors.New("raft.Server: Duplicate peer")

	// CommandTimeoutError is reported to Do callers whose command was not
	// committed in time.
	CommandTimeoutError = errors.New("raft: Command timeout")
)
  44. //------------------------------------------------------------------------------
  45. //
  46. // Typedefs
  47. //
  48. //------------------------------------------------------------------------------
  49. // A server is involved in the consensus protocol and can act as a follower,
  50. // candidate or a leader.
  51. type Server struct {
  52. name string
  53. path string
  54. state string
  55. transporter Transporter
  56. context interface{}
  57. currentTerm uint64
  58. votedFor string
  59. log *Log
  60. leader string
  61. peers map[string]*Peer
  62. mutex sync.RWMutex
  63. syncedPeer map[string]bool
  64. c chan *event
  65. electionTimeout time.Duration
  66. heartbeatTimeout time.Duration
  67. currentSnapshot *Snapshot
  68. lastSnapshot *Snapshot
  69. stateMachine StateMachine
  70. maxLogEntriesPerRequest uint64
  71. confFile *os.File
  72. }
  // event is a unit of work handed to the server's event loop through its
// channel. The loop processes target, may store a result in returnValue, and
// signals completion (with a possibly nil error) on c.
type event struct {
	target      interface{} // request, response, command or stop sentinel
	returnValue interface{} // result set by the loop; read after c delivers
	c           chan error  // completion signal
}
  79. //------------------------------------------------------------------------------
  80. //
  81. // Constructor
  82. //
  83. //------------------------------------------------------------------------------
  84. // Creates a new server with a log at the given path.
  85. func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}) (*Server, error) {
  86. if name == "" {
  87. return nil, errors.New("raft.Server: Name cannot be blank")
  88. }
  89. if transporter == nil {
  90. panic("raft: Transporter required")
  91. }
  92. s := &Server{
  93. name: name,
  94. path: path,
  95. transporter: transporter,
  96. stateMachine: stateMachine,
  97. context: context,
  98. state: Stopped,
  99. peers: make(map[string]*Peer),
  100. log: newLog(),
  101. c: make(chan *event, 256),
  102. electionTimeout: DefaultElectionTimeout,
  103. heartbeatTimeout: DefaultHeartbeatTimeout,
  104. maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
  105. }
  106. // Setup apply function.
  107. s.log.ApplyFunc = func(c Command) (interface{}, error) {
  108. result, err := c.Apply(s)
  109. return result, err
  110. }
  111. return s, nil
  112. }
  113. //------------------------------------------------------------------------------
  114. //
  115. // Accessors
  116. //
  117. //------------------------------------------------------------------------------
  118. //--------------------------------------
  119. // General
  120. //--------------------------------------
  121. // Retrieves the name of the server.
  122. func (s *Server) Name() string {
  123. return s.name
  124. }
  125. // Retrieves the storage path for the server.
  126. func (s *Server) Path() string {
  127. return s.path
  128. }
  129. // The name of the current leader.
  130. func (s *Server) Leader() string {
  131. return s.leader
  132. }
  133. // Retrieves a copy of the peer data.
  134. func (s *Server) Peers() map[string]*Peer {
  135. s.mutex.Lock()
  136. defer s.mutex.Unlock()
  137. peers := make(map[string]*Peer)
  138. for name, peer := range s.peers {
  139. peers[name] = peer.clone()
  140. }
  141. return peers
  142. }
  143. // Retrieves the object that transports requests.
  144. func (s *Server) Transporter() Transporter {
  145. s.mutex.RLock()
  146. defer s.mutex.RUnlock()
  147. return s.transporter
  148. }
  149. func (s *Server) SetTransporter(t Transporter) {
  150. s.mutex.Lock()
  151. defer s.mutex.Unlock()
  152. s.transporter = t
  153. }
  154. // Retrieves the context passed into the constructor.
  155. func (s *Server) Context() interface{} {
  156. return s.context
  157. }
  158. // Retrieves the log path for the server.
  159. func (s *Server) LogPath() string {
  160. return path.Join(s.path, "log")
  161. }
  162. // Retrieves the current state of the server.
  163. func (s *Server) State() string {
  164. s.mutex.RLock()
  165. defer s.mutex.RUnlock()
  166. return s.state
  167. }
  168. // Sets the state of the server.
  169. func (s *Server) setState(state string) {
  170. s.mutex.Lock()
  171. defer s.mutex.Unlock()
  172. s.state = state
  173. if state == Leader {
  174. s.leader = s.Name()
  175. }
  176. }
  177. // Retrieves the current term of the server.
  178. func (s *Server) Term() uint64 {
  179. return s.currentTerm
  180. }
  181. // Retrieves the current commit index of the server.
  182. func (s *Server) CommitIndex() uint64 {
  183. return s.log.commitIndex
  184. }
  185. // Retrieves the name of the candidate this server voted for in this term.
  186. func (s *Server) VotedFor() string {
  187. return s.votedFor
  188. }
  189. // Retrieves whether the server's log has no entries.
  190. func (s *Server) IsLogEmpty() bool {
  191. return s.log.isEmpty()
  192. }
  193. // A list of all the log entries. This should only be used for debugging purposes.
  194. func (s *Server) LogEntries() []*LogEntry {
  195. return s.log.entries
  196. }
  197. // A reference to the command name of the last entry.
  198. func (s *Server) LastCommandName() string {
  199. return s.log.lastCommandName()
  200. }
  201. // Get the state of the server for debugging
  202. func (s *Server) GetState() string {
  203. s.mutex.RLock()
  204. defer s.mutex.RUnlock()
  205. return fmt.Sprintf("Name: %s, State: %s, Term: %v, Index: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
  206. }
  207. // Check if the server is promotable
  208. func (s *Server) promotable() bool {
  209. return s.log.currentIndex() > 0
  210. }
  211. //--------------------------------------
  212. // Membership
  213. //--------------------------------------
  214. // Retrieves the number of member servers in the consensus.
  215. func (s *Server) MemberCount() int {
  216. s.mutex.Lock()
  217. defer s.mutex.Unlock()
  218. return len(s.peers) + 1
  219. }
  220. // Retrieves the number of servers required to make a quorum.
  221. func (s *Server) QuorumSize() int {
  222. return (s.MemberCount() / 2) + 1
  223. }
  224. //--------------------------------------
  225. // Election timeout
  226. //--------------------------------------
  227. // Retrieves the election timeout.
  228. func (s *Server) ElectionTimeout() time.Duration {
  229. return s.electionTimeout
  230. }
  231. // Sets the election timeout.
  232. func (s *Server) SetElectionTimeout(duration time.Duration) {
  233. s.electionTimeout = duration
  234. }
  235. //--------------------------------------
  236. // Heartbeat timeout
  237. //--------------------------------------
  238. // Retrieves the heartbeat timeout.
  239. func (s *Server) HeartbeatTimeout() time.Duration {
  240. return s.heartbeatTimeout
  241. }
  242. // Sets the heartbeat timeout.
  243. func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
  244. s.mutex.Lock()
  245. defer s.mutex.Unlock()
  246. s.heartbeatTimeout = duration
  247. for _, peer := range s.peers {
  248. peer.setHeartbeatTimeout(duration)
  249. }
  250. }
  251. //------------------------------------------------------------------------------
  252. //
  253. // Methods
  254. //
  255. //------------------------------------------------------------------------------
  256. //--------------------------------------
  257. // Initialization
  258. //--------------------------------------
  259. // Reg the NOPCommand
  260. func init() {
  261. RegisterCommand(&NOPCommand{})
  262. RegisterCommand(&DefaultJoinCommand{})
  263. RegisterCommand(&DefaultLeaveCommand{})
  264. }
  265. // Start as follow
  266. // If log entries exist then allow promotion to candidate if no AEs received.
  267. // If no log entries exist then wait for AEs from another node.
  268. // If no log entries exist and a self-join command is issued then
  269. // immediately become leader and commit entry.
  270. func (s *Server) Start() error {
  271. // Exit if the server is already running.
  272. if s.state != Stopped {
  273. return errors.New("raft.Server: Server already running")
  274. }
  275. // Create snapshot directory if not exist
  276. os.Mkdir(path.Join(s.path, "snapshot"), 0700)
  277. // Initialize the log and load it up.
  278. if err := s.log.open(s.LogPath()); err != nil {
  279. s.debugln("raft: Log error: ", err)
  280. return fmt.Errorf("raft: Initialization error: %s", err)
  281. }
  282. if err := s.readConf(); err != nil {
  283. s.debugln("raft: Conf file error: ", err)
  284. return fmt.Errorf("raft: Initialization error: %s", err)
  285. }
  286. // Update the term to the last term in the log.
  287. _, s.currentTerm = s.log.lastInfo()
  288. s.setState(Follower)
  289. // If no log entries exist then
  290. // 1. wait for AEs from another node
  291. // 2. wait for self-join command
  292. // to set itself promotable
  293. if !s.promotable() {
  294. s.debugln("start as a new raft server")
  295. // If log entries exist then allow promotion to candidate
  296. // if no AEs received.
  297. } else {
  298. s.debugln("start from previous saved state")
  299. }
  300. go s.loop()
  301. return nil
  302. }
  303. // Read the configuration for the server.
  304. func (s *Server) readConf() error {
  305. var err error
  306. confPath := path.Join(s.path, "conf")
  307. s.debugln("readConf.open ", confPath)
  308. // open conf file
  309. s.confFile, err = os.OpenFile(confPath, os.O_RDWR, 0600)
  310. if err != nil {
  311. if os.IsNotExist(err) {
  312. s.confFile, err = os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE, 0600)
  313. debugln("readConf.create ", confPath)
  314. if err != nil {
  315. return err
  316. }
  317. }
  318. return err
  319. }
  320. for {
  321. var peerName string
  322. _, err = fmt.Fscanf(s.confFile, "%s\n", &peerName)
  323. if err != nil {
  324. if err == io.EOF {
  325. s.debugln("server.peer.conf: finish")
  326. return nil
  327. }
  328. return err
  329. }
  330. s.debugln("server.peer.conf.read: ", peerName)
  331. peer := newPeer(s, peerName, s.heartbeatTimeout)
  332. s.peers[peer.name] = peer
  333. }
  334. return nil
  335. }
  336. // Shuts down the server.
  337. func (s *Server) Stop() {
  338. s.send(&stopValue)
  339. s.mutex.Lock()
  340. s.log.close()
  341. s.mutex.Unlock()
  342. }
  343. // Checks if the server is currently running.
  344. func (s *Server) Running() bool {
  345. s.mutex.RLock()
  346. defer s.mutex.RUnlock()
  347. return s.state != Stopped
  348. }
  349. //--------------------------------------
  350. // Term
  351. //--------------------------------------
  352. // Sets the current term for the server. This is only used when an external
  353. // current term is found.
  354. func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) {
  355. s.mutex.Lock()
  356. defer s.mutex.Unlock()
  357. // update the term and clear vote for
  358. if term > s.currentTerm {
  359. s.state = Follower
  360. s.currentTerm = term
  361. s.leader = leaderName
  362. s.votedFor = ""
  363. return
  364. }
  365. // discover new leader when candidate
  366. // save leader name when follower
  367. if term == s.currentTerm && s.state != Leader && append {
  368. s.state = Follower
  369. s.leader = leaderName
  370. }
  371. }
  372. //--------------------------------------
  373. // Event Loop
  374. //--------------------------------------
  375. // ________
  376. // --|Snapshot| timeout
  377. // | -------- ______
  378. // recover | ^ | |
  379. // snapshot / | |snapshot | |
  380. // higher | | v | recv majority votes
  381. // term | -------- timeout ----------- -----------
  382. // |-> |Follower| ----------> | Candidate |--------------------> | Leader |
  383. // -------- ----------- -----------
  384. // ^ higher term/ | higher term |
  385. // | new leader | |
  386. // |_______________________|____________________________________ |
  387. // The main event loop for the server
  388. func (s *Server) loop() {
  389. defer s.debugln("server.loop.end")
  390. for {
  391. state := s.State()
  392. s.debugln("server.loop.run ", state)
  393. switch state {
  394. case Follower:
  395. s.followerLoop()
  396. case Candidate:
  397. s.candidateLoop()
  398. case Leader:
  399. s.leaderLoop()
  400. case Snapshotting:
  401. s.snapshotLoop()
  402. case Stopped:
  403. return
  404. }
  405. }
  406. }
  407. // Sends an event to the event loop to be processed. The function will wait
  408. // until the event is actually processed before returning.
  409. func (s *Server) send(value interface{}) (interface{}, error) {
  410. event := s.sendAsync(value)
  411. err := <-event.c
  412. return event.returnValue, err
  413. }
  414. func (s *Server) sendAsync(value interface{}) *event {
  415. event := &event{target: value, c: make(chan error, 1)}
  416. s.c <- event
  417. return event
  418. }
  419. // The event loop that is run when the server is in a Follower state.
  420. // Responds to RPCs from candidates and leaders.
  421. // Converts to candidate if election timeout elapses without either:
  422. // 1.Receiving valid AppendEntries RPC, or
  423. // 2.Granting vote to candidate
  424. func (s *Server) followerLoop() {
  425. s.setState(Follower)
  426. timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  427. for {
  428. var err error
  429. update := false
  430. select {
  431. case e := <-s.c:
  432. if e.target == &stopValue {
  433. s.setState(Stopped)
  434. } else if command, ok := e.target.(JoinCommand); ok {
  435. //If no log entries exist and a self-join command is issued
  436. //then immediately become leader and commit entry.
  437. if s.log.currentIndex() == 0 && command.NodeName() == s.Name() {
  438. s.debugln("selfjoin and promote to leader")
  439. s.setState(Leader)
  440. s.processCommand(command, e)
  441. } else {
  442. err = NotLeaderError
  443. }
  444. } else if req, ok := e.target.(*AppendEntriesRequest); ok {
  445. e.returnValue, update = s.processAppendEntriesRequest(req)
  446. } else if req, ok := e.target.(*RequestVoteRequest); ok {
  447. e.returnValue, update = s.processRequestVoteRequest(req)
  448. } else if req, ok := e.target.(*SnapshotRequest); ok {
  449. e.returnValue = s.processSnapshotRequest(req)
  450. } else {
  451. err = NotLeaderError
  452. }
  453. // Callback to event.
  454. e.c <- err
  455. case <-timeoutChan:
  456. // only allow synced follower to promote to candidate
  457. if s.promotable() {
  458. s.setState(Candidate)
  459. } else {
  460. update = true
  461. }
  462. }
  463. // Converts to candidate if election timeout elapses without either:
  464. // 1.Receiving valid AppendEntries RPC, or
  465. // 2.Granting vote to candidate
  466. if update {
  467. timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  468. }
  469. // Exit loop on state change.
  470. if s.State() != Follower {
  471. break
  472. }
  473. }
  474. }
  475. // The event loop that is run when the server is in a Candidate state.
  476. func (s *Server) candidateLoop() {
  477. lastLogIndex, lastLogTerm := s.log.lastInfo()
  478. s.leader = ""
  479. for {
  480. // Increment current term, vote for self.
  481. s.currentTerm++
  482. s.votedFor = s.name
  483. // Send RequestVote RPCs to all other servers.
  484. respChan := make(chan *RequestVoteResponse, len(s.peers))
  485. for _, peer := range s.peers {
  486. go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
  487. }
  488. // Wait for either:
  489. // * Votes received from majority of servers: become leader
  490. // * AppendEntries RPC received from new leader: step down.
  491. // * Election timeout elapses without election resolution: increment term, start new election
  492. // * Discover higher term: step down (§5.1)
  493. votesGranted := 1
  494. timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
  495. timeout := false
  496. for {
  497. // If we received enough votes then stop waiting for more votes.
  498. s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
  499. if votesGranted >= s.QuorumSize() {
  500. s.setState(Leader)
  501. break
  502. }
  503. // Collect votes from peers.
  504. select {
  505. case resp := <-respChan:
  506. if resp.VoteGranted {
  507. s.debugln("server.candidate.vote.granted: ", votesGranted)
  508. votesGranted++
  509. } else if resp.Term > s.currentTerm {
  510. s.debugln("server.candidate.vote.failed")
  511. s.setCurrentTerm(resp.Term, "", false)
  512. } else {
  513. s.debugln("server.candidate.vote: denied")
  514. }
  515. case e := <-s.c:
  516. var err error
  517. if e.target == &stopValue {
  518. s.setState(Stopped)
  519. } else if _, ok := e.target.(Command); ok {
  520. err = NotLeaderError
  521. } else if req, ok := e.target.(*AppendEntriesRequest); ok {
  522. e.returnValue, _ = s.processAppendEntriesRequest(req)
  523. } else if req, ok := e.target.(*RequestVoteRequest); ok {
  524. e.returnValue, _ = s.processRequestVoteRequest(req)
  525. }
  526. // Callback to event.
  527. e.c <- err
  528. case <-timeoutChan:
  529. timeout = true
  530. }
  531. // both process AER and RVR can make the server to follower
  532. // also break when timeout happens
  533. if s.State() != Candidate || timeout {
  534. break
  535. }
  536. }
  537. // break when we are not candidate
  538. if s.State() != Candidate {
  539. break
  540. }
  541. // continue when timeout happened
  542. }
  543. }
  544. // The event loop that is run when the server is in a Candidate state.
  545. func (s *Server) leaderLoop() {
  546. s.setState(Leader)
  547. s.syncedPeer = make(map[string]bool)
  548. logIndex, _ := s.log.lastInfo()
  549. // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
  550. s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
  551. for _, peer := range s.peers {
  552. peer.setPrevLogIndex(logIndex)
  553. peer.startHeartbeat()
  554. }
  555. go s.Do(NOPCommand{})
  556. // Begin to collect response from followers
  557. for {
  558. var err error
  559. select {
  560. case e := <-s.c:
  561. if e.target == &stopValue {
  562. s.setState(Stopped)
  563. } else if command, ok := e.target.(Command); ok {
  564. s.processCommand(command, e)
  565. continue
  566. } else if req, ok := e.target.(*AppendEntriesRequest); ok {
  567. e.returnValue, _ = s.processAppendEntriesRequest(req)
  568. } else if resp, ok := e.target.(*AppendEntriesResponse); ok {
  569. s.processAppendEntriesResponse(resp)
  570. } else if req, ok := e.target.(*RequestVoteRequest); ok {
  571. e.returnValue, _ = s.processRequestVoteRequest(req)
  572. }
  573. // Callback to event.
  574. e.c <- err
  575. }
  576. // Exit loop on state change.
  577. if s.State() != Leader {
  578. break
  579. }
  580. }
  581. // Stop all peers.
  582. for _, peer := range s.peers {
  583. peer.stopHeartbeat()
  584. }
  585. s.syncedPeer = nil
  586. }
  587. func (s *Server) snapshotLoop() {
  588. s.setState(Snapshotting)
  589. for {
  590. var err error
  591. e := <-s.c
  592. if e.target == &stopValue {
  593. s.setState(Stopped)
  594. } else if _, ok := e.target.(Command); ok {
  595. err = NotLeaderError
  596. } else if req, ok := e.target.(*AppendEntriesRequest); ok {
  597. e.returnValue, _ = s.processAppendEntriesRequest(req)
  598. } else if req, ok := e.target.(*RequestVoteRequest); ok {
  599. e.returnValue, _ = s.processRequestVoteRequest(req)
  600. } else if req, ok := e.target.(*SnapshotRecoveryRequest); ok {
  601. e.returnValue = s.processSnapshotRecoveryRequest(req)
  602. }
  603. // Callback to event.
  604. e.c <- err
  605. // Exit loop on state change.
  606. if s.State() != Snapshotting {
  607. break
  608. }
  609. }
  610. }
  611. //--------------------------------------
  612. // Commands
  613. //--------------------------------------
  614. // Attempts to execute a command and replicate it. The function will return
  615. // when the command has been successfully committed or an error has occurred.
  616. func (s *Server) Do(command Command) (interface{}, error) {
  617. return s.send(command)
  618. }
  619. // Processes a command.
  620. func (s *Server) processCommand(command Command, e *event) {
  621. s.debugln("server.command.process")
  622. // Create an entry for the command in the log.
  623. entry, err := s.log.createEntry(s.currentTerm, command)
  624. if err != nil {
  625. s.debugln("server.command.log.entry.error:", err)
  626. e.c <- err
  627. return
  628. }
  629. if err := s.log.appendEntry(entry); err != nil {
  630. s.debugln("server.command.log.error:", err)
  631. e.c <- err
  632. return
  633. }
  634. // Issue a callback for the entry once it's committed.
  635. go func() {
  636. // Wait for the entry to be committed.
  637. select {
  638. case <-entry.commit:
  639. var err error
  640. s.debugln("server.command.commit")
  641. e.returnValue, err = s.log.getEntryResult(entry, true)
  642. e.c <- err
  643. case <-time.After(time.Second):
  644. s.debugln("server.command.timeout")
  645. e.c <- CommandTimeoutError
  646. }
  647. }()
  648. // Issue an append entries response for the server.
  649. resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
  650. resp.append = true
  651. resp.peer = s.Name()
  652. // this must be async
  653. // sendAsync is not really async every time
  654. // when the sending speed of the user is larger than
  655. // the processing speed of the server, the buffered channel
  656. // will be full. Then sendAsync will become sync, which will
  657. // cause deadlock here.
  658. // so we use a goroutine to avoid the deadlock
  659. go s.sendAsync(resp)
  660. }
  661. //--------------------------------------
  662. // Append Entries
  663. //--------------------------------------
  664. // Appends zero or more log entry from the leader to this server.
  665. func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
  666. ret, _ := s.send(req)
  667. resp, _ := ret.(*AppendEntriesResponse)
  668. return resp
  669. }
  670. // Processes the "append entries" request.
  671. func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
  672. s.traceln("server.ae.process")
  673. if req.Term < s.currentTerm {
  674. s.debugln("server.ae.error: stale term")
  675. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
  676. }
  677. // Update term and leader.
  678. s.setCurrentTerm(req.Term, req.LeaderName, true)
  679. // Reject if log doesn't contain a matching previous entry.
  680. if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
  681. s.debugln("server.ae.truncate.error: ", err)
  682. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  683. }
  684. // Append entries to the log.
  685. if err := s.log.appendEntries(req.Entries); err != nil {
  686. s.debugln("server.ae.append.error: ", err)
  687. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  688. }
  689. // Commit up to the commit index.
  690. if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
  691. s.debugln("server.ae.commit.error: ", err)
  692. return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
  693. }
  694. // once the server appended and commited all the log entries from the leader
  695. return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
  696. }
  697. // Processes the "append entries" response from the peer. This is only
  698. // processed when the server is a leader. Responses received during other
  699. // states are dropped.
  700. func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
  701. // If we find a higher term then change to a follower and exit.
  702. if resp.Term > s.currentTerm {
  703. s.setCurrentTerm(resp.Term, "", false)
  704. return
  705. }
  706. // panic response if it's not successful.
  707. if !resp.Success {
  708. return
  709. }
  710. // if one peer successfully append a log from the leader term,
  711. // we add it to the synced list
  712. if resp.append == true {
  713. s.syncedPeer[resp.peer] = true
  714. }
  715. // Increment the commit count to make sure we have a quorum before committing.
  716. if len(s.syncedPeer) < s.QuorumSize() {
  717. return
  718. }
  719. // Determine the committed index that a majority has.
  720. var indices []uint64
  721. indices = append(indices, s.log.currentIndex())
  722. for _, peer := range s.peers {
  723. indices = append(indices, peer.getPrevLogIndex())
  724. }
  725. sort.Sort(uint64Slice(indices))
  726. // We can commit up to the index which the majority of the members have appended.
  727. commitIndex := indices[s.QuorumSize()-1]
  728. committedIndex := s.log.commitIndex
  729. if commitIndex > committedIndex {
  730. s.log.setCommitIndex(commitIndex)
  731. s.debugln("commit index ", commitIndex)
  732. for i := committedIndex; i < commitIndex; i++ {
  733. if entry := s.log.getEntry(i + 1); entry != nil {
  734. // if the leader is a new one and the entry came from the
  735. // old leader, the commit channel will be nil and no go routine
  736. // is waiting from this channel
  737. // if we try to send to it, the new leader will get stuck
  738. if entry.commit != nil {
  739. select {
  740. case entry.commit <- true:
  741. default:
  742. panic("server unable to send signal to commit channel")
  743. }
  744. }
  745. }
  746. }
  747. }
  748. }
  749. //--------------------------------------
  750. // Request Vote
  751. //--------------------------------------
  752. // Requests a vote from a server. A vote can be obtained if the vote's term is
  753. // at the server's current term and the server has not made a vote yet. A vote
  754. // can also be obtained if the term is greater than the server's current term.
  755. func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
  756. ret, _ := s.send(req)
  757. resp, _ := ret.(*RequestVoteResponse)
  758. return resp
  759. }
  760. // Processes a "request vote" request.
  761. func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
  762. // If the request is coming from an old term then reject it.
  763. if req.Term < s.currentTerm {
  764. s.debugln("server.rv.error: stale term")
  765. return newRequestVoteResponse(s.currentTerm, false), false
  766. }
  767. s.setCurrentTerm(req.Term, "", false)
  768. // If we've already voted for a different candidate then don't vote for this candidate.
  769. if s.votedFor != "" && s.votedFor != req.CandidateName {
  770. s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
  771. " already vote for ", s.votedFor)
  772. return newRequestVoteResponse(s.currentTerm, false), false
  773. }
  774. // If the candidate's log is not at least as up-to-date as our last log then don't vote.
  775. lastIndex, lastTerm := s.log.lastInfo()
  776. if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
  777. s.debugln("server.rv.error: out of date log: ", req.CandidateName,
  778. "Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
  779. "Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
  780. return newRequestVoteResponse(s.currentTerm, false), false
  781. }
  782. // If we made it this far then cast a vote and reset our election time out.
  783. s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
  784. s.votedFor = req.CandidateName
  785. return newRequestVoteResponse(s.currentTerm, true), true
  786. }
  787. //--------------------------------------
  788. // Membership
  789. //--------------------------------------
  790. // Adds a peer to the server.
  791. func (s *Server) AddPeer(name string) error {
  792. s.debugln("server.peer.add: ", name, len(s.peers))
  793. // Do not allow peers to be added twice.
  794. if s.peers[name] != nil {
  795. return nil
  796. }
  797. // Only add the peer if it doesn't have the same name.
  798. if s.name != name {
  799. _, err := fmt.Fprintln(s.confFile, name)
  800. s.debugln("server.peer.conf.write: ", name)
  801. if err != nil {
  802. return err
  803. }
  804. peer := newPeer(s, name, s.heartbeatTimeout)
  805. if s.State() == Leader {
  806. peer.startHeartbeat()
  807. }
  808. s.peers[peer.name] = peer
  809. }
  810. return nil
  811. }
  812. // Removes a peer from the server.
  813. func (s *Server) RemovePeer(name string) error {
  814. s.debugln("server.peer.remove: ", name, len(s.peers))
  815. // Ignore removal of the server itself.
  816. if s.name == name {
  817. return nil
  818. }
  819. // Return error if peer doesn't exist.
  820. peer := s.peers[name]
  821. if peer == nil {
  822. return fmt.Errorf("raft: Peer not found: %s", name)
  823. }
  824. // TODO: Flush entries to the peer first.
  825. // Stop peer and remove it.
  826. peer.stopHeartbeat()
  827. delete(s.peers, name)
  828. s.confFile.Truncate(0)
  829. s.confFile.Seek(0, os.SEEK_SET)
  830. for peer := range s.peers {
  831. _, err := fmt.Fprintln(s.confFile, peer)
  832. if err != nil {
  833. return err
  834. }
  835. }
  836. return nil
  837. }
  838. //--------------------------------------
  839. // Log compaction
  840. //--------------------------------------
  841. // The background snapshot function
  842. func (s *Server) Snapshot() {
  843. for {
  844. // TODO: change this... to something reasonable
  845. time.Sleep(1 * time.Second)
  846. s.takeSnapshot()
  847. }
  848. }
  849. func (s *Server) takeSnapshot() error {
  850. //TODO put a snapshot mutex
  851. s.debugln("take Snapshot")
  852. if s.currentSnapshot != nil {
  853. return errors.New("handling snapshot")
  854. }
  855. lastIndex, lastTerm := s.log.commitInfo()
  856. if lastIndex == 0 || lastTerm == 0 {
  857. return errors.New("No logs")
  858. }
  859. path := s.SnapshotPath(lastIndex, lastTerm)
  860. var state []byte
  861. var err error
  862. if s.stateMachine != nil {
  863. state, err = s.stateMachine.Save()
  864. if err != nil {
  865. return err
  866. }
  867. } else {
  868. state = []byte{0}
  869. }
  870. var peerNames []string
  871. for _, peer := range s.peers {
  872. peerNames = append(peerNames, peer.Name())
  873. }
  874. peerNames = append(peerNames, s.Name())
  875. s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path}
  876. s.saveSnapshot()
  877. // We keep some log entries after the snapshot
  878. // We do not want to send the whole snapshot
  879. // to the slightly slow machines
  880. if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
  881. compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
  882. compactTerm := s.log.getEntry(compactIndex).Term
  883. s.log.compact(compactIndex, compactTerm)
  884. }
  885. return nil
  886. }
  887. // Retrieves the log path for the server.
  888. func (s *Server) saveSnapshot() error {
  889. if s.currentSnapshot == nil {
  890. return errors.New("no snapshot to save")
  891. }
  892. err := s.currentSnapshot.save()
  893. if err != nil {
  894. return err
  895. }
  896. tmp := s.lastSnapshot
  897. s.lastSnapshot = s.currentSnapshot
  898. // delete the previous snapshot if there is any change
  899. if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
  900. tmp.remove()
  901. }
  902. s.currentSnapshot = nil
  903. return nil
  904. }
  905. // Retrieves the log path for the server.
  906. func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
  907. return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
  908. }
  909. func (s *Server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
  910. ret, _ := s.send(req)
  911. resp, _ := ret.(*SnapshotResponse)
  912. return resp
  913. }
  914. func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
  915. // If the follower’s log contains an entry at the snapshot’s last index with a term
  916. // that matches the snapshot’s last term
  917. // Then the follower already has all the information found in the snapshot
  918. // and can reply false
  919. entry := s.log.getEntry(req.LastIndex)
  920. if entry != nil && entry.Term == req.LastTerm {
  921. return newSnapshotResponse(false)
  922. }
  923. s.setState(Snapshotting)
  924. return newSnapshotResponse(true)
  925. }
  926. func (s *Server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  927. ret, _ := s.send(req)
  928. resp, _ := ret.(*SnapshotRecoveryResponse)
  929. return resp
  930. }
  931. func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  932. s.stateMachine.Recovery(req.State)
  933. // clear the peer map
  934. s.peers = make(map[string]*Peer)
  935. // recovery the cluster configuration
  936. for _, peerName := range req.Peers {
  937. s.AddPeer(peerName)
  938. }
  939. //update term and index
  940. s.currentTerm = req.LastTerm
  941. s.log.updateCommitIndex(req.LastIndex)
  942. snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
  943. s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
  944. s.saveSnapshot()
  945. // clear the previous log entries
  946. s.log.compact(req.LastIndex, req.LastTerm)
  947. return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
  948. }
  949. // Load a snapshot at restart
  950. func (s *Server) LoadSnapshot() error {
  951. dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
  952. if err != nil {
  953. return err
  954. }
  955. filenames, err := dir.Readdirnames(-1)
  956. if err != nil {
  957. dir.Close()
  958. panic(err)
  959. }
  960. dir.Close()
  961. if len(filenames) == 0 {
  962. return errors.New("no snapshot")
  963. }
  964. // not sure how many snapshot we should keep
  965. sort.Strings(filenames)
  966. snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])
  967. // should not fail
  968. file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
  969. defer file.Close()
  970. if err != nil {
  971. panic(err)
  972. }
  973. // TODO check checksum first
  974. var snapshotBytes []byte
  975. var checksum uint32
  976. n, err := fmt.Fscanf(file, "%08x\n", &checksum)
  977. if err != nil {
  978. return err
  979. }
  980. if n != 1 {
  981. return errors.New("Bad snapshot file")
  982. }
  983. snapshotBytes, _ = ioutil.ReadAll(file)
  984. s.debugln(string(snapshotBytes))
  985. // Generate checksum.
  986. byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
  987. if uint32(checksum) != byteChecksum {
  988. s.debugln(checksum, " ", byteChecksum)
  989. return errors.New("bad snapshot file")
  990. }
  991. err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
  992. if err != nil {
  993. s.debugln("unmarshal error: ", err)
  994. return err
  995. }
  996. err = s.stateMachine.Recovery(s.lastSnapshot.State)
  997. if err != nil {
  998. s.debugln("recovery error: ", err)
  999. return err
  1000. }
  1001. for _, peerName := range s.lastSnapshot.Peers {
  1002. s.AddPeer(peerName)
  1003. }
  1004. s.log.startTerm = s.lastSnapshot.LastTerm
  1005. s.log.startIndex = s.lastSnapshot.LastIndex
  1006. s.log.updateCommitIndex(s.lastSnapshot.LastIndex)
  1007. return err
  1008. }
  1009. //--------------------------------------
  1010. // Debugging
  1011. //--------------------------------------
  1012. func (s *Server) debugln(v ...interface{}) {
  1013. debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
  1014. }
  1015. func (s *Server) traceln(v ...interface{}) {
  1016. tracef("[%s] %s", s.name, fmt.Sprintln(v...))
  1017. }