package raft

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io/ioutil"
	"os"
	"path"
	"sort"
	"sync"
	"time"
)

//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------

const (
	Stopped      = "stopped"
	Initialized  = "initialized"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)

const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)

const (
	// DefaultHeartbeatInterval is the interval that the leader will send
	// AppendEntriesRequests to followers to maintain leadership.
	DefaultHeartbeatInterval = 50 * time.Millisecond

	DefaultElectionTimeout = 150 * time.Millisecond
)

// ElectionTimeoutThresholdPercent specifies the threshold at which the server
// will dispatch warning events that the heartbeat RTT is too close to the
// election timeout.
const ElectionTimeoutThresholdPercent = 0.8

//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------

var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")

//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server interface {
	Name() string
	Context() interface{}
	StateMachine() StateMachine
	Leader() string
	State() string
	Path() string
	LogPath() string
	SnapshotPath(lastIndex uint64, lastTerm uint64) string
	Term() uint64
	CommitIndex() uint64
	VotedFor() string
	MemberCount() int
	QuorumSize() int
	IsLogEmpty() bool
	LogEntries() []*LogEntry
	LastCommandName() string
	GetState() string
	ElectionTimeout() time.Duration
	SetElectionTimeout(duration time.Duration)
	HeartbeatInterval() time.Duration
	SetHeartbeatInterval(duration time.Duration)
	Transporter() Transporter
	SetTransporter(t Transporter)
	AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
	RequestVote(req *RequestVoteRequest) *RequestVoteResponse
	RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
	SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
	AddPeer(name string, connectionString string) error
	RemovePeer(name string) error
	Peers() map[string]*Peer
	Init() error
	Start() error
	Stop()
	Running() bool
	Do(command Command) (interface{}, error)
	TakeSnapshot() error
	LoadSnapshot() error
	AddEventListener(string, EventListener)
	FlushCommitIndex()
}
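
// Callers can observe server transitions through AddEventListener. A minimal
// sketch (it assumes the Event value exposes Value and PrevValue accessors,
// as the newEvent calls below suggest):
//
//	s.AddEventListener(LeaderChangeEventType, func(e Event) {
//		fmt.Printf("leader changed: %v -> %v\n", e.PrevValue(), e.Value())
//	})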

type server struct {
	*eventDispatcher

	name        string
	path        string
	state       string
	transporter Transporter
	context     interface{}
	currentTerm uint64
	votedFor    string
	log         *Log
	leader      string
	peers       map[string]*Peer
	mutex       sync.RWMutex
	syncedPeer  map[string]bool

	stopped           chan chan bool
	c                 chan *ev
	electionTimeout   time.Duration
	heartbeatInterval time.Duration

	snapshot *Snapshot

	// pendingSnapshot is an unfinished snapshot.
	// Once the pending snapshot has been saved to disk, it is promoted to
	// snapshot and pendingSnapshot is reset to nil.
	pendingSnapshot *Snapshot

	stateMachine            StateMachine
	maxLogEntriesPerRequest uint64

	connectionString string
}

// An internal event to be processed by the server's event loop.
type ev struct {
	target      interface{}
	returnValue interface{}
	c           chan error
}

//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// Creates a new server with a log at the given path. transporter must
// not be nil. stateMachine can be nil if snapshotting and log
// compaction are to be disabled. context can be anything (including nil)
// and is not used by the raft package except returned by
// Server.Context(). connectionString can be anything.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
	if name == "" {
		return nil, errors.New("raft.Server: Name cannot be blank")
	}
	if transporter == nil {
		panic("raft: Transporter required")
	}

	s := &server{
		name:                    name,
		path:                    path,
		transporter:             transporter,
		stateMachine:            stateMachine,
		context:                 ctx,
		state:                   Stopped,
		peers:                   make(map[string]*Peer),
		log:                     newLog(),
		stopped:                 make(chan chan bool),
		c:                       make(chan *ev, 256),
		electionTimeout:         DefaultElectionTimeout,
		heartbeatInterval:       DefaultHeartbeatInterval,
		maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
		connectionString:        connectionString,
	}
	s.eventDispatcher = newEventDispatcher(s)

	// Setup the apply function.
	s.log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
		// Dispatch commit event.
		s.DispatchEvent(newEvent(CommitEventType, e, nil))

		// Apply command to the state machine.
		switch c := c.(type) {
		case CommandApply:
			return c.Apply(&context{
				server:       s,
				currentTerm:  s.currentTerm,
				currentIndex: s.log.internalCurrentIndex(),
				commitIndex:  s.log.commitIndex,
			})
		case deprecatedCommandApply:
			return c.Apply(s)
		default:
			return nil, fmt.Errorf("Command does not implement Apply()")
		}
	}

	return s, nil
}
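
// A typical construction sequence, as a sketch (the transporter and
// stateMachine values are assumed to be supplied by the caller; all other
// arguments are illustrative):
//
//	s, err := NewServer("node1", "/tmp/node1", transporter, stateMachine, nil, "http://localhost:4001")
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := s.Start(); err != nil {
//		log.Fatal(err)
//	}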

//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

//--------------------------------------
// General
//--------------------------------------

// Retrieves the name of the server.
func (s *server) Name() string {
	return s.name
}

// Retrieves the storage path for the server.
func (s *server) Path() string {
	return s.path
}

// The name of the current leader.
func (s *server) Leader() string {
	return s.leader
}

// Retrieves a copy of the peer data.
func (s *server) Peers() map[string]*Peer {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	peers := make(map[string]*Peer)
	for name, peer := range s.peers {
		peers[name] = peer.clone()
	}
	return peers
}

// Retrieves the object that transports requests.
func (s *server) Transporter() Transporter {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.transporter
}

func (s *server) SetTransporter(t Transporter) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.transporter = t
}

// Retrieves the context passed into the constructor.
func (s *server) Context() interface{} {
	return s.context
}

// Retrieves the state machine passed into the constructor.
func (s *server) StateMachine() StateMachine {
	return s.stateMachine
}

// Retrieves the log path for the server.
func (s *server) LogPath() string {
	return path.Join(s.path, "log")
}

// Retrieves the current state of the server.
func (s *server) State() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.state
}

// Sets the state of the server.
func (s *server) setState(state string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Temporarily store previous values.
	prevState := s.state
	prevLeader := s.leader

	// Update state and leader.
	s.state = state
	if state == Leader {
		s.leader = s.Name()
		s.syncedPeer = make(map[string]bool)
	}

	// Dispatch state and leader change events.
	s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}

// Retrieves the current term of the server.
func (s *server) Term() uint64 {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.currentTerm
}

// Retrieves the current commit index of the server.
func (s *server) CommitIndex() uint64 {
	s.log.mutex.RLock()
	defer s.log.mutex.RUnlock()
	return s.log.commitIndex
}

// Retrieves the name of the candidate this server voted for in this term.
func (s *server) VotedFor() string {
	return s.votedFor
}

// Retrieves whether the server's log has no entries.
func (s *server) IsLogEmpty() bool {
	return s.log.isEmpty()
}

// A list of all the log entries. This should only be used for debugging purposes.
func (s *server) LogEntries() []*LogEntry {
	return s.log.entries
}

// A reference to the command name of the last entry.
func (s *server) LastCommandName() string {
	return s.log.lastCommandName()
}

// Get the state of the server for debugging.
func (s *server) GetState() string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommittedIndex: %v", s.name, s.state, s.currentTerm, s.log.commitIndex)
}

// Check if the server is promotable: a server may only promote itself to
// candidate once it holds at least one log entry.
func (s *server) promotable() bool {
	return s.log.currentIndex() > 0
}

//--------------------------------------
// Membership
//--------------------------------------

// Retrieves the number of member servers in the consensus.
func (s *server) MemberCount() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.peers) + 1
}

// Retrieves the number of servers required to make a quorum.
func (s *server) QuorumSize() int {
	return (s.MemberCount() / 2) + 1
}
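
// For example, a cluster with four peers plus the local server has a
// MemberCount of 5 and a QuorumSize of (5/2)+1 = 3, so any three members
// form a majority; a four-member cluster also needs three.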

//--------------------------------------
// Election timeout
//--------------------------------------

// Retrieves the election timeout.
func (s *server) ElectionTimeout() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.electionTimeout
}

// Sets the election timeout.
func (s *server) SetElectionTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.electionTimeout = duration
}

//--------------------------------------
// Heartbeat interval
//--------------------------------------

// Retrieves the heartbeat interval.
func (s *server) HeartbeatInterval() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.heartbeatInterval
}

// Sets the heartbeat interval, propagating it to all existing peers.
func (s *server) SetHeartbeatInterval(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.heartbeatInterval = duration
	for _, peer := range s.peers {
		peer.setHeartbeatInterval(duration)
	}
}

//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Initialization
//--------------------------------------

// Register the built-in commands.
func init() {
	RegisterCommand(&NOPCommand{})
	RegisterCommand(&DefaultJoinCommand{})
	RegisterCommand(&DefaultLeaveCommand{})
}
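
// Application-defined commands are registered the same way before the server
// starts. A minimal sketch (WriteCommand is a hypothetical command type; it
// would also need an Apply method so it can mutate the state machine):
//
//	type WriteCommand struct {
//		Key   string `json:"key"`
//		Value string `json:"value"`
//	}
//
//	func (c *WriteCommand) CommandName() string { return "write" }
//
//	func init() {
//		RegisterCommand(&WriteCommand{})
//	}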

// Start the raft server.
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit the entry.
func (s *server) Start() error {
	// Exit if the server is already running.
	if s.Running() {
		return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
	}

	if err := s.Init(); err != nil {
		return err
	}

	s.setState(Follower)

	// If no log entries exist, the server waits for either an AE from
	// another node or a self-join command before it becomes promotable.
	// If log entries exist, it may promote to candidate when no AEs arrive.
	if !s.promotable() {
		s.debugln("start as a new raft server")
	} else {
		s.debugln("start from previous saved state")
	}

	debugln(s.GetState())

	go s.loop()

	return nil
}

// Init initializes the raft server.
func (s *server) Init() error {
	if s.Running() {
		return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
	}

	// The server has already been initialized, or it was stopped after
	// being initialized (its log is non-empty).
	if s.state == Initialized || !s.log.isEmpty() {
		s.state = Initialized
		return nil
	}

	// Create the snapshot directory if it does not exist.
	err := os.Mkdir(path.Join(s.path, "snapshot"), 0700)
	if err != nil && !os.IsExist(err) {
		s.debugln("raft: Snapshot dir error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	if err := s.readConf(); err != nil {
		s.debugln("raft: Conf file error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Initialize the log and load it up.
	if err := s.log.open(s.LogPath()); err != nil {
		s.debugln("raft: Log error: ", err)
		return fmt.Errorf("raft: Initialization error: %s", err)
	}

	// Update the term to the last term in the log.
	_, s.currentTerm = s.log.lastInfo()

	s.state = Initialized
	return nil
}

// Shuts down the server.
func (s *server) Stop() {
	stop := make(chan bool)
	s.stopped <- stop
	s.state = Stopped

	// Make sure the server has stopped before we close the log.
	<-stop
	s.log.close()
}

// Checks if the server is currently running.
func (s *server) Running() bool {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return (s.state != Stopped && s.state != Initialized)
}

//--------------------------------------
// Term
//--------------------------------------

// Updates the current term for the server. This is only used when a larger
// external term is found.
func (s *server) updateCurrentTerm(term uint64, leaderName string) {
	_assert(term > s.currentTerm,
		"updateCurrentTerm: update is called when term is not larger than currentTerm")

	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Store previous values temporarily.
	prevTerm := s.currentTerm
	prevLeader := s.leader

	// Set currentTerm = T, convert to follower (§5.1).
	// Stop heartbeats before stepping down.
	if s.state == Leader {
		s.mutex.Unlock()
		for _, peer := range s.peers {
			peer.stopHeartbeat(false)
		}
		s.mutex.Lock()
	}

	// Update the term and clear the vote.
	if s.state != Follower {
		s.mutex.Unlock()
		s.setState(Follower)
		s.mutex.Lock()
	}
	s.currentTerm = term
	s.leader = leaderName
	s.votedFor = ""

	// Dispatch change events.
	s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}
}

//--------------------------------------
// Event Loop
//--------------------------------------

//               ________
//            --|Snapshot|                 timeout
//            |  --------                  ______
// recover    |       ^                   |      |
// snapshot / |       |snapshot           |      |
// higher     |       |                   v      |     recv majority votes
// term       |    --------    timeout    -----------                        -----------
//            |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
//                 --------               -----------                        -----------
//                    ^          higher term/ |                         higher term |
//                    |            new leader |                                     |
//                    |_______________________|____________________________________ |

// The main event loop for the server.
func (s *server) loop() {
	defer s.debugln("server.loop.end")

	for s.state != Stopped {
		state := s.State()

		s.debugln("server.loop.run ", state)
		switch state {
		case Follower:
			s.followerLoop()
		case Candidate:
			s.candidateLoop()
		case Leader:
			s.leaderLoop()
		case Snapshotting:
			s.snapshotLoop()
		}
	}
}

// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
	event := &ev{target: value, c: make(chan error, 1)}
	s.c <- event
	err := <-event.c
	return event.returnValue, err
}

func (s *server) sendAsync(value interface{}) {
	event := &ev{target: value, c: make(chan error, 1)}

	// Try a non-blocking send first.
	// In most cases this should not block,
	// which avoids creating unnecessary goroutines.
	select {
	case s.c <- event:
		return
	default:
	}

	go func() {
		s.c <- event
	}()
}
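
// send and sendAsync are the two entry points into the event loop: send
// blocks until the active state loop has processed the event and is used for
// RPCs and commands that need a reply, while sendAsync is fire-and-forget
// and falls back to a goroutine rather than block the caller on a full
// channel.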

// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if election timeout elapses without either:
//  1. Receiving valid AppendEntries RPC, or
//  2. Granting vote to candidate
func (s *server) followerLoop() {
	since := time.Now()
	electionTimeout := s.ElectionTimeout()
	// The election timeout is randomized between [timeout, 2*timeout) to
	// reduce the chance of split votes.
	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

	for s.State() == Follower {
		var err error
		update := false
		select {
		case stop := <-s.stopped:
			s.setState(Stopped)
			stop <- true
			return

		case e := <-s.c:
			switch req := e.target.(type) {
			case JoinCommand:
				// If no log entries exist and a self-join command is issued
				// then immediately become leader and commit the entry.
				if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
					s.debugln("selfjoin and promote to leader")
					s.setState(Leader)
					s.processCommand(req, e)
				} else {
					err = NotLeaderError
				}
			case *AppendEntriesRequest:
				// If heartbeats get too close to the election timeout then send an event.
				elapsedTime := time.Since(since)
				if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
					s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
				}
				e.returnValue, update = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, update = s.processRequestVoteRequest(req)
			case *SnapshotRequest:
				e.returnValue = s.processSnapshotRequest(req)
			default:
				err = NotLeaderError
			}
			// Callback to event.
			e.c <- err

		case <-timeoutChan:
			// Only allow a synced follower to promote to candidate.
			if s.promotable() {
				s.setState(Candidate)
			} else {
				update = true
			}
		}

		// A valid AppendEntries RPC or a granted vote resets the election
		// timeout; otherwise the timeout above fires and the server converts
		// to candidate.
		if update {
			since = time.Now()
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
		}
	}
}

// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
	// Clear the leader value.
	prevLeader := s.leader
	s.leader = ""
	if prevLeader != s.leader {
		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
	}

	lastLogIndex, lastLogTerm := s.log.lastInfo()
	doVote := true
	votesGranted := 0
	var timeoutChan <-chan time.Time
	var respChan chan *RequestVoteResponse

	for s.State() == Candidate {
		if doVote {
			// Increment current term, vote for self.
			s.currentTerm++
			s.votedFor = s.name

			// Send RequestVote RPCs to all other servers.
			respChan = make(chan *RequestVoteResponse, len(s.peers))
			for _, peer := range s.peers {
				go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
			}

			// Wait for either:
			//   * Votes received from majority of servers: become leader
			//   * AppendEntries RPC received from new leader: step down.
			//   * Election timeout elapses without election resolution: increment term, start new election
			//   * Discover higher term: step down (§5.1)
			votesGranted = 1
			timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
			doVote = false
		}

		// If we received enough votes then stop waiting for more votes
		// and return from the candidate loop.
		if votesGranted == s.QuorumSize() {
			s.debugln("server.candidate.recv.enough.votes")
			s.setState(Leader)
			return
		}

		// Collect votes from peers.
		select {
		case stop := <-s.stopped:
			s.setState(Stopped)
			stop <- true
			return

		case resp := <-respChan:
			if success := s.processVoteResponse(resp); success {
				s.debugln("server.candidate.vote.granted: ", votesGranted)
				votesGranted++
			}

		case e := <-s.c:
			var err error
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			}

			// Callback to event.
			e.c <- err

		case <-timeoutChan:
			doVote = true
		}
	}
}

// The event loop that is run when the server is in a Leader state.
func (s *server) leaderLoop() {
	logIndex, _ := s.log.lastInfo()

	// Update each peer's prevLogIndex to the leader's lastLogIndex and start the heartbeat.
	s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
	for _, peer := range s.peers {
		peer.setPrevLogIndex(logIndex)
		peer.startHeartbeat()
	}

	// Commit a NOP after the server becomes leader. From the Raft paper:
	// "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
	// each server; repeat during idle periods to prevent election timeouts
	// (§5.2)". The heartbeats started above do the "idle" period work.
	go s.Do(NOPCommand{})

	// Begin to collect responses from followers.
	for s.State() == Leader {
		var err error
		select {
		case stop := <-s.stopped:
			// Stop all peers before stopping.
			for _, peer := range s.peers {
				peer.stopHeartbeat(false)
			}
			s.setState(Stopped)
			stop <- true
			return

		case e := <-s.c:
			switch req := e.target.(type) {
			case Command:
				s.processCommand(req, e)
				continue
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *AppendEntriesResponse:
				s.processAppendEntriesResponse(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			}

			// Callback to event.
			e.c <- err
		}
	}

	s.syncedPeer = nil
}

func (s *server) snapshotLoop() {
	for s.State() == Snapshotting {
		var err error
		select {
		case stop := <-s.stopped:
			s.setState(Stopped)
			stop <- true
			return

		case e := <-s.c:
			switch req := e.target.(type) {
			case Command:
				err = NotLeaderError
			case *AppendEntriesRequest:
				e.returnValue, _ = s.processAppendEntriesRequest(req)
			case *RequestVoteRequest:
				e.returnValue, _ = s.processRequestVoteRequest(req)
			case *SnapshotRecoveryRequest:
				e.returnValue = s.processSnapshotRecoveryRequest(req)
			}

			// Callback to event.
			e.c <- err
		}
	}
}

//--------------------------------------
// Commands
//--------------------------------------

// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *server) Do(command Command) (interface{}, error) {
	return s.send(command)
}
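
// Do only succeeds on the leader; followers and candidates answer commands
// with NotLeaderError. A typical caller therefore redirects on failure, as a
// sketch (WriteCommand is a hypothetical registered command):
//
//	result, err := s.Do(&WriteCommand{Key: "foo", Value: "bar"})
//	if err == NotLeaderError {
//		// forward the request to s.Leader() instead
//	}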

// Processes a command.
func (s *server) processCommand(command Command, e *ev) {
	s.debugln("server.command.process")

	// Create an entry for the command in the log.
	entry, err := s.log.createEntry(s.currentTerm, command, e)
	if err != nil {
		s.debugln("server.command.log.entry.error:", err)
		e.c <- err
		return
	}

	if err := s.log.appendEntry(entry); err != nil {
		s.debugln("server.command.log.error:", err)
		e.c <- err
		return
	}

	s.syncedPeer[s.Name()] = true
	// With no peers, the entry can be committed immediately.
	if len(s.peers) == 0 {
		commitIndex := s.log.currentIndex()
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
	}
}

//--------------------------------------
// Append Entries
//--------------------------------------

// Appends zero or more log entries from the leader to this server.
func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*AppendEntriesResponse)
	return resp
}

// Processes the "append entries" request.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
	s.traceln("server.ae.process")

	if req.Term < s.currentTerm {
		s.debugln("server.ae.error: stale term")
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
	}

	if req.Term == s.currentTerm {
		_assert(s.state != Leader, "leader.elected.at.same.term.%d\n", s.currentTerm)

		// Change state to follower: a candidate discovers the new leader,
		// a follower saves the leader name.
		s.state = Follower
		s.leader = req.LeaderName
	} else {
		// Update term and leader.
		s.updateCurrentTerm(req.Term, req.LeaderName)
	}

	// Reject if log doesn't contain a matching previous entry.
	if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
		s.debugln("server.ae.truncate.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Append entries to the log.
	if err := s.log.appendEntries(req.Entries); err != nil {
		s.debugln("server.ae.append.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// Commit up to the commit index.
	if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
		s.debugln("server.ae.commit.error: ", err)
		return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
	}

	// The server has appended and committed all the log entries from the
	// leader, so reply success.
	return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}

// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
	// If we find a higher term then change to a follower and exit.
	if resp.Term() > s.Term() {
		s.updateCurrentTerm(resp.Term(), "")
		return
	}

	// Ignore the response if it was not successful.
	if !resp.Success() {
		return
	}

	// If the peer successfully appended entries from the leader's term,
	// add it to the synced list.
	if resp.append {
		s.syncedPeer[resp.peer] = true
	}

	// Make sure we have a quorum before committing.
	if len(s.syncedPeer) < s.QuorumSize() {
		return
	}

	// Determine the committed index that a majority has.
	var indices []uint64
	indices = append(indices, s.log.currentIndex())
	for _, peer := range s.peers {
		indices = append(indices, peer.getPrevLogIndex())
	}
	sort.Sort(sort.Reverse(uint64Slice(indices)))

	// We can commit up to the index which the majority of the members have appended.
	commitIndex := indices[s.QuorumSize()-1]
	committedIndex := s.log.commitIndex

	if commitIndex > committedIndex {
		// The leader needs to fsync its log before committing entries.
		s.log.sync()
		s.log.setCommitIndex(commitIndex)
		s.debugln("commit index ", commitIndex)
	}
}
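
// For example, in a five-member cluster (leader plus four peers) where the
// collected indices sort to [10, 10, 9, 7, 5], QuorumSize is 3 and the new
// commit index becomes indices[2] = 9: at least three members hold entry 9.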

// processVoteResponse processes a vote response:
//  1. if the vote is granted for the current term of the candidate, return true
//  2. if the vote is denied because the responder has a larger term, update
//     this server's term, which will also cause the candidate to step down,
//     and return false
//  3. if the vote is for a smaller term, ignore it and return false
func (s *server) processVoteResponse(resp *RequestVoteResponse) bool {
	if resp.VoteGranted && resp.Term == s.currentTerm {
		return true
	}

	if resp.Term > s.currentTerm {
		s.debugln("server.candidate.vote.failed")
		s.updateCurrentTerm(resp.Term, "")
	} else {
		s.debugln("server.candidate.vote: denied")
	}
	return false
}

//--------------------------------------
// Request Vote
//--------------------------------------

// Requests a vote from a server. A vote can be obtained if the request's term
// matches the server's current term and the server has not yet voted for a
// different candidate. A vote can also be obtained if the term is greater
// than the server's current term.
func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*RequestVoteResponse)
	return resp
}

// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
	// If the request is coming from an old term then reject it.
	if req.Term < s.Term() {
		s.debugln("server.rv.deny.vote: cause stale term")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the term of the requesting peer is larger than this node's, update the term.
	// If the term is equal and we've already voted for a different candidate then
	// don't vote for this candidate.
	if req.Term > s.Term() {
		s.updateCurrentTerm(req.Term, "")
	} else if s.votedFor != "" && s.votedFor != req.CandidateName {
		s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
			" already vote for ", s.votedFor)
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If the candidate's log is not at least as up-to-date as our last log then don't vote.
	lastIndex, lastTerm := s.log.lastInfo()
	if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
		s.debugln("server.deny.vote: cause out of date log: ", req.CandidateName,
			"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
			"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
		return newRequestVoteResponse(s.currentTerm, false), false
	}

	// If we made it this far then cast a vote and reset our election timeout.
	s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
	s.votedFor = req.CandidateName

	return newRequestVoteResponse(s.currentTerm, true), true
}

//--------------------------------------
// Membership
//--------------------------------------

// Adds a peer to the server.
func (s *server) AddPeer(name string, connectionString string) error {
	s.debugln("server.peer.add: ", name, len(s.peers))

	// Do not allow peers to be added twice.
	if s.peers[name] != nil {
		return nil
	}

	// Skip the peer if it has the same name as the server.
	if s.name != name {
		peer := newPeer(s, name, connectionString, s.heartbeatInterval)

		if s.State() == Leader {
			peer.startHeartbeat()
		}

		s.peers[peer.Name] = peer

		s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}

// Removes a peer from the server.
func (s *server) RemovePeer(name string) error {
	s.debugln("server.peer.remove: ", name, len(s.peers))

	// Skip the peer if it has the same name as the server.
	if name != s.Name() {
		// Return an error if the peer doesn't exist.
		peer := s.peers[name]
		if peer == nil {
			return fmt.Errorf("raft: Peer not found: %s", name)
		}

		// Stop the peer and remove it.
		if s.State() == Leader {
			// We create a goroutine here to avoid a potential deadlock.
			// We hold the log write lock when we reach this line of code.
			// Without a goroutine, peer.stopHeartbeat can block if the
			// target goroutine (which we want to stop) is calling
			// log.getEntriesAfter and waiting for the log read lock.
			// We would then be holding the log lock while waiting for the
			// log lock, which leads to a deadlock.
			// TODO(xiangli) refactor log lock
			go peer.stopHeartbeat(true)
		}

		delete(s.peers, name)

		s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
	}

	// Write the configuration to file.
	s.writeConf()

	return nil
}

//--------------------------------------
// Log compaction
//--------------------------------------

func (s *server) TakeSnapshot() error {
	if s.stateMachine == nil {
		return errors.New("Snapshot: Cannot create snapshot. Missing state machine.")
	}

	// Shortcut without lock.
	// Exit if the server is currently creating a snapshot.
	if s.pendingSnapshot != nil {
		return errors.New("Snapshot: Last snapshot is not finished.")
	}

	// TODO: acquire the lock so no more commits are allowed.
	// This will be done after finishing the heartbeat refactoring.
	s.debugln("take.snapshot")

	lastIndex, lastTerm := s.log.commitInfo()

	// Check whether any log entries have been committed since the last snapshot.
	if lastIndex == s.log.startIndex {
		return nil
	}

	path := s.SnapshotPath(lastIndex, lastTerm)
	// Create the pending snapshot.
	s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}

	state, err := s.stateMachine.Save()
	if err != nil {
		return err
	}

	// Clone the list of peers.
	peers := make([]*Peer, 0, len(s.peers)+1)
	for _, peer := range s.peers {
		peers = append(peers, peer.clone())
	}
	peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})

	// Attach the state and peers to the pending snapshot and save it to disk.
	s.pendingSnapshot.Peers = peers
	s.pendingSnapshot.State = state
	s.saveSnapshot()

	// We keep some log entries after the snapshot, so that slightly slow
	// followers can catch up from the log instead of receiving the whole snapshot.
	if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
		compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
		compactTerm := s.log.getEntry(compactIndex).Term()
		s.log.compact(compactIndex, compactTerm)
	}

	return nil
}

// Saves the pending snapshot to disk and promotes it to the current snapshot.
func (s *server) saveSnapshot() error {
	if s.pendingSnapshot == nil {
		return errors.New("pendingSnapshot.is.nil")
	}

	// Write snapshot to disk.
	if err := s.pendingSnapshot.save(); err != nil {
		return err
	}

	// Swap the current and last snapshots.
	tmp := s.snapshot
	s.snapshot = s.pendingSnapshot

	// Delete the previous snapshot if there was any change.
	if tmp != nil && !(tmp.LastIndex == s.snapshot.LastIndex && tmp.LastTerm == s.snapshot.LastTerm) {
		tmp.remove()
	}
	s.pendingSnapshot = nil

	return nil
}

// Retrieves the path of the snapshot with the given last index and term.
func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}
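
// For example, a snapshot whose last committed entry is index 12000 at
// term 3 is written to <path>/snapshot/3_12000.ss; sorting these filenames
// is how LoadSnapshot below picks the latest snapshot.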

func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotResponse)
	return resp
}

func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
	// If the follower's log contains an entry at the snapshot's last index with a term
	// that matches the snapshot's last term, then the follower already has all the
	// information found in the snapshot and can reply false.
	entry := s.log.getEntry(req.LastIndex)
	if entry != nil && entry.Term() == req.LastTerm {
		return newSnapshotResponse(false)
	}

	// Update state.
	s.setState(Snapshotting)

	return newSnapshotResponse(true)
}

func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	ret, _ := s.send(req)
	resp, _ := ret.(*SnapshotRecoveryResponse)
	return resp
}

func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	// Recover state sent from the request.
	if err := s.stateMachine.Recovery(req.State); err != nil {
		panic("cannot recover from previous state")
	}

	// Recover the cluster configuration.
	s.peers = make(map[string]*Peer)
	for _, peer := range req.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update log state.
	s.currentTerm = req.LastTerm
	s.log.updateCommitIndex(req.LastIndex)

	// Create local snapshot.
	s.pendingSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
	s.saveSnapshot()

	// Clear the previous log entries.
	s.log.compact(req.LastIndex, req.LastTerm)

	return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}

// Load a snapshot at restart.
func (s *server) LoadSnapshot() error {
	// Open the snapshot/ directory.
	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
	if err != nil {
		s.debugln("cannot.open.snapshot: ", err)
		return err
	}

	// Retrieve a list of all snapshots.
	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		dir.Close()
		panic(err)
	}
	dir.Close()

	if len(filenames) == 0 {
		s.debugln("no.snapshot.to.load")
		return nil
	}

	// Grab the latest snapshot.
	sort.Strings(filenames)
	snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

	// Read snapshot data.
	file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
	if err != nil {
		return err
	}
	defer file.Close()

	// Read the stored checksum.
	var checksum uint32
	n, err := fmt.Fscanf(file, "%08x\n", &checksum)
	if err != nil {
		return err
	} else if n != 1 {
		return errors.New("checksum.err: bad.snapshot.file")
	}

	// Load remaining snapshot contents.
	b, err := ioutil.ReadAll(file)
	if err != nil {
		return err
	}

	// Generate the checksum and compare.
	byteChecksum := crc32.ChecksumIEEE(b)
	if checksum != byteChecksum {
		s.debugln(checksum, " ", byteChecksum)
		return errors.New("bad snapshot file")
	}

	// Decode snapshot.
	if err = json.Unmarshal(b, &s.snapshot); err != nil {
		s.debugln("unmarshal.snapshot.error: ", err)
		return err
	}

	// Recover snapshot into the state machine.
	if err = s.stateMachine.Recovery(s.snapshot.State); err != nil {
		s.debugln("recovery.snapshot.error: ", err)
		return err
	}

	// Recover cluster configuration.
	for _, peer := range s.snapshot.Peers {
		s.AddPeer(peer.Name, peer.ConnectionString)
	}

	// Update log state.
	s.log.startTerm = s.snapshot.LastTerm
	s.log.startIndex = s.snapshot.LastIndex
	s.log.updateCommitIndex(s.snapshot.LastIndex)

	return nil
}

//--------------------------------------
// Config File
//--------------------------------------

// Flushes the commit index to disk, so that when the raft server restarts,
// it will commit up to the flushed commitIndex.
func (s *server) FlushCommitIndex() {
	s.debugln("server.conf.update")
	// Write the configuration to file.
	s.writeConf()
}

func (s *server) writeConf() {
	peers := make([]*Peer, len(s.peers))

	i := 0
	for _, peer := range s.peers {
		peers[i] = peer.clone()
		i++
	}

	r := &Config{
		CommitIndex: s.log.commitIndex,
		Peers:       peers,
	}

	b, _ := json.Marshal(r)

	confPath := path.Join(s.path, "conf")
	tmpConfPath := path.Join(s.path, "conf.tmp")

	err := writeFileSynced(tmpConfPath, b, 0600)
	if err != nil {
		panic(err)
	}

	os.Rename(tmpConfPath, confPath)
}
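
// Writing to conf.tmp and then renaming keeps the on-disk configuration
// atomic: a crash mid-write leaves the previous conf file intact rather
// than a truncated one.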

// Read the configuration for the server.
func (s *server) readConf() error {
	confPath := path.Join(s.path, "conf")
	s.debugln("readConf.open ", confPath)

	// Open the conf file. If it cannot be read, treat the server as
	// freshly started rather than failing.
	b, err := ioutil.ReadFile(confPath)
	if err != nil {
		return nil
	}

	conf := &Config{}
	if err = json.Unmarshal(b, conf); err != nil {
		return err
	}

	s.log.updateCommitIndex(conf.CommitIndex)

	return nil
}

//--------------------------------------
// Debugging
//--------------------------------------

func (s *server) debugln(v ...interface{}) {
	if logLevel > Debug {
		debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
	}
}

func (s *server) traceln(v ...interface{}) {
	if logLevel > Trace {
		tracef("[%s] %s", s.name, fmt.Sprintln(v...))
	}
}