server.go

package raft

import (
    "encoding/json"
    "errors"
    "fmt"
    "hash/crc32"
    "io/ioutil"
    "os"
    "path"
    "sort"
    "sync"
    "time"
)

//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------

const (
    Stopped      = "stopped"
    Initialized  = "initialized"
    Follower     = "follower"
    Candidate    = "candidate"
    Leader       = "leader"
    Snapshotting = "snapshotting"
)

const (
    MaxLogEntriesPerRequest         = 2000
    NumberOfLogEntriesAfterSnapshot = 200
)

const (
    // DefaultHeartbeatInterval is the interval that the leader will send
    // AppendEntriesRequests to followers to maintain leadership.
    DefaultHeartbeatInterval = 50 * time.Millisecond

    DefaultElectionTimeout = 150 * time.Millisecond
)

// ElectionTimeoutThresholdPercent specifies the threshold at which the server
// will dispatch warning events that the heartbeat RTT is too close to the
// election timeout.
const ElectionTimeoutThresholdPercent = 0.8

//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------

var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")

//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// A server is involved in the consensus protocol and can act as a follower,
// candidate, or leader.
type Server interface {
    Name() string
    Context() interface{}
    StateMachine() StateMachine
    Leader() string
    State() string
    Path() string
    LogPath() string
    SnapshotPath(lastIndex uint64, lastTerm uint64) string
    Term() uint64
    CommitIndex() uint64
    VotedFor() string
    MemberCount() int
    QuorumSize() int
    IsLogEmpty() bool
    LogEntries() []*LogEntry
    LastCommandName() string
    GetState() string
    ElectionTimeout() time.Duration
    SetElectionTimeout(duration time.Duration)
    HeartbeatInterval() time.Duration
    SetHeartbeatInterval(duration time.Duration)
    Transporter() Transporter
    SetTransporter(t Transporter)
    AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
    RequestVote(req *RequestVoteRequest) *RequestVoteResponse
    RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
    SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
    AddPeer(name string, connectionString string) error
    RemovePeer(name string) error
    Peers() map[string]*Peer
    Init() error
    Start() error
    Stop()
    Running() bool
    Do(command Command) (interface{}, error)
    TakeSnapshot() error
    LoadSnapshot() error
    AddEventListener(string, EventListener)
    FlushCommitIndex()
}

type server struct {
    *eventDispatcher
    name              string
    path              string
    state             string
    transporter       Transporter
    context           interface{}
    currentTerm       uint64
    votedFor          string
    log               *Log
    leader            string
    peers             map[string]*Peer
    mutex             sync.RWMutex
    syncedPeer        map[string]bool
    stopped           chan chan bool
    c                 chan *ev
    electionTimeout   time.Duration
    heartbeatInterval time.Duration
    snapshot          *Snapshot
    // pendingSnapshot is an unfinished snapshot. After it is saved to disk,
    // it is promoted to snapshot and pendingSnapshot is reset to nil.
    pendingSnapshot         *Snapshot
    stateMachine            StateMachine
    maxLogEntriesPerRequest uint64
    connectionString        string
}

// An internal event to be processed by the server's event loop.
type ev struct {
    target      interface{}
    returnValue interface{}
    c           chan error
}

//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// Creates a new server with a log at the given path. transporter must
// not be nil. stateMachine can be nil if snapshotting and log
// compaction are to be disabled. context can be anything (including nil)
// and is not used by the raft package except to be returned by
// Server.Context(). connectionString can be anything.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
    if name == "" {
        return nil, errors.New("raft.Server: Name cannot be blank")
    }
    if transporter == nil {
        panic("raft: Transporter required")
    }

    s := &server{
        name:                    name,
        path:                    path,
        transporter:             transporter,
        stateMachine:            stateMachine,
        context:                 ctx,
        state:                   Stopped,
        peers:                   make(map[string]*Peer),
        log:                     newLog(),
        stopped:                 make(chan chan bool),
        c:                       make(chan *ev, 256),
        electionTimeout:         DefaultElectionTimeout,
        heartbeatInterval:       DefaultHeartbeatInterval,
        maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
        connectionString:        connectionString,
    }
    s.eventDispatcher = newEventDispatcher(s)

    // Setup apply function.
    s.log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
        // Dispatch commit event.
        s.DispatchEvent(newEvent(CommitEventType, e, nil))

        // Apply command to the state machine.
        switch c := c.(type) {
        case CommandApply:
            return c.Apply(&context{
                server:       s,
                currentTerm:  s.currentTerm,
                currentIndex: s.log.internalCurrentIndex(),
                commitIndex:  s.log.commitIndex,
            })
        case deprecatedCommandApply:
            return c.Apply(s)
        default:
            return nil, fmt.Errorf("Command does not implement Apply()")
        }
    }

    return s, nil
}
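
// The following is an illustrative sketch, not part of the original file:
// typical construction and startup of a server. The transporter and
// stateMachine values are assumed to be supplied by the caller, and the
// name, path, and connection string below are hypothetical.
func exampleStartServer(transporter Transporter, stateMachine StateMachine) (Server, error) {
    // A nil stateMachine would disable snapshotting and log compaction.
    s, err := NewServer("node1", "/tmp/raft/node1", transporter, stateMachine, nil, "http://localhost:4001")
    if err != nil {
        return nil, err
    }
    // Start initializes the on-disk state and runs the event loop in its
    // own goroutine; the server begins as a follower.
    if err := s.Start(); err != nil {
        return nil, err
    }
    return s, nil
}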

//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

//--------------------------------------
// General
//--------------------------------------

// Retrieves the name of the server.
func (s *server) Name() string {
    return s.name
}

// Retrieves the storage path for the server.
func (s *server) Path() string {
    return s.path
}

// Retrieves the name of the current leader.
func (s *server) Leader() string {
    return s.leader
}

// Retrieves a copy of the peer data.
func (s *server) Peers() map[string]*Peer {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    peers := make(map[string]*Peer)
    for name, peer := range s.peers {
        peers[name] = peer.clone()
    }
    return peers
}

// Retrieves the object that transports requests.
func (s *server) Transporter() Transporter {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return s.transporter
}

func (s *server) SetTransporter(t Transporter) {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    s.transporter = t
}

// Retrieves the context passed into the constructor.
func (s *server) Context() interface{} {
    return s.context
}

// Retrieves the state machine passed into the constructor.
func (s *server) StateMachine() StateMachine {
    return s.stateMachine
}

// Retrieves the log path for the server.
func (s *server) LogPath() string {
    return path.Join(s.path, "log")
}

// Retrieves the current state of the server.
func (s *server) State() string {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return s.state
}

// Sets the state of the server.
func (s *server) setState(state string) {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    // Temporarily store previous values.
    prevState := s.state
    prevLeader := s.leader

    // Update state and leader.
    s.state = state
    if state == Leader {
        s.leader = s.Name()
        s.syncedPeer = make(map[string]bool)
    }

    // Dispatch state and leader change events.
    s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
    if prevLeader != s.leader {
        s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
    }
}

// Retrieves the current term of the server.
func (s *server) Term() uint64 {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return s.currentTerm
}

// Retrieves the current commit index of the server.
func (s *server) CommitIndex() uint64 {
    s.log.mutex.RLock()
    defer s.log.mutex.RUnlock()
    return s.log.commitIndex
}

// Retrieves the name of the candidate this server voted for in this term.
func (s *server) VotedFor() string {
    return s.votedFor
}

// Retrieves whether the server's log has no entries.
func (s *server) IsLogEmpty() bool {
    return s.log.isEmpty()
}

// A list of all the log entries. This should only be used for debugging purposes.
func (s *server) LogEntries() []*LogEntry {
    return s.log.entries
}

// A reference to the command name of the last entry.
func (s *server) LastCommandName() string {
    return s.log.lastCommandName()
}

// Get the state of the server for debugging.
func (s *server) GetState() string {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
}

// Checks if the server is promotable, i.e. its log has at least one entry.
func (s *server) promotable() bool {
    return s.log.currentIndex() > 0
}

//--------------------------------------
// Membership
//--------------------------------------

// Retrieves the number of member servers in the consensus.
func (s *server) MemberCount() int {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return len(s.peers) + 1
}

// Retrieves the number of servers required to make a quorum.
func (s *server) QuorumSize() int {
    return (s.MemberCount() / 2) + 1
}
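
// For example, a 5-member cluster (4 peers plus the local server) has a
// quorum of (5/2)+1 = 3 and therefore tolerates 2 failed members; a
// 4-member cluster also needs 3 votes and tolerates only 1 failure.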

//--------------------------------------
// Election timeout
//--------------------------------------

// Retrieves the election timeout.
func (s *server) ElectionTimeout() time.Duration {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return s.electionTimeout
}

// Sets the election timeout.
func (s *server) SetElectionTimeout(duration time.Duration) {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    s.electionTimeout = duration
}

//--------------------------------------
// Heartbeat interval
//--------------------------------------

// Retrieves the heartbeat interval.
func (s *server) HeartbeatInterval() time.Duration {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return s.heartbeatInterval
}

// Sets the heartbeat interval.
func (s *server) SetHeartbeatInterval(duration time.Duration) {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    s.heartbeatInterval = duration
    for _, peer := range s.peers {
        peer.setHeartbeatInterval(duration)
    }
}

//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Initialization
//--------------------------------------

// Register the commands used internally by the raft package.
func init() {
    RegisterCommand(&NOPCommand{})
    RegisterCommand(&DefaultJoinCommand{})
    RegisterCommand(&DefaultLeaveCommand{})
}

// Start the raft server:
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit the entry.
func (s *server) Start() error {
    // Exit if the server is already running.
    if s.Running() {
        return fmt.Errorf("raft.Server: Server already running [%v]", s.state)
    }

    if err := s.Init(); err != nil {
        return err
    }

    s.setState(Follower)

    // If no log entries exist then
    //   1. wait for AEs from another node
    //   2. wait for a self-join command
    // to set itself promotable.
    if !s.promotable() {
        s.debugln("start as a new raft server")
    } else {
        // If log entries exist then allow promotion to candidate
        // if no AEs received.
        s.debugln("start from previous saved state")
    }

    debugln(s.GetState())

    go s.loop()
    return nil
}

// Init initializes the raft server.
func (s *server) Init() error {
    if s.Running() {
        return fmt.Errorf("raft.Server: Server already running [%v]", s.state)
    }

    // The server has already been initialized, or it was stopped after being
    // initialized. If the log has been initialized, we know the server was
    // stopped after running.
    if s.state == Initialized || s.log.initialized {
        s.state = Initialized
        return nil
    }

    // Create the snapshot directory if it does not exist.
    err := os.Mkdir(path.Join(s.path, "snapshot"), 0700)
    if err != nil && !os.IsExist(err) {
        s.debugln("raft: Snapshot dir error: ", err)
        return fmt.Errorf("raft: Initialization error: %s", err)
    }

    if err := s.readConf(); err != nil {
        s.debugln("raft: Conf file error: ", err)
        return fmt.Errorf("raft: Initialization error: %s", err)
    }

    // Initialize the log and load it up.
    if err := s.log.open(s.LogPath()); err != nil {
        s.debugln("raft: Log error: ", err)
        return fmt.Errorf("raft: Initialization error: %s", err)
    }

    // Update the term to the last term in the log.
    _, s.currentTerm = s.log.lastInfo()

    s.state = Initialized
    return nil
}

// Shuts down the server.
func (s *server) Stop() {
    if s.State() == Stopped {
        return
    }

    stop := make(chan bool)
    s.stopped <- stop

    // Make sure the server has stopped before we close the log.
    <-stop
    s.log.close()
    s.setState(Stopped)
}

// Checks if the server is currently running.
func (s *server) Running() bool {
    s.mutex.RLock()
    defer s.mutex.RUnlock()
    return (s.state != Stopped && s.state != Initialized)
}

//--------------------------------------
// Term
//--------------------------------------

// Updates the current term for the server. This is only used when a larger
// external term is found.
func (s *server) updateCurrentTerm(term uint64, leaderName string) {
    _assert(term > s.currentTerm,
        "updateCurrentTerm: update is called when term is not larger than currentTerm")

    // Store previous values temporarily.
    prevTerm := s.currentTerm
    prevLeader := s.leader

    // Set currentTerm = T, convert to follower (§5.1).
    // Stop heartbeats before stepping down.
    if s.state == Leader {
        for _, peer := range s.peers {
            peer.stopHeartbeat(false)
        }
    }
    // Update the term and clear the vote.
    if s.state != Follower {
        s.setState(Follower)
    }

    s.mutex.Lock()
    s.currentTerm = term
    s.leader = leaderName
    s.votedFor = ""
    s.mutex.Unlock()

    // Dispatch change events.
    s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
    if prevLeader != s.leader {
        s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
    }
}

//--------------------------------------
// Event Loop
//--------------------------------------

//                ________
//             --|Snapshot|                 timeout
//             |  --------                  ______
// recover     |       ^                   |      |
// snapshot /  |       |snapshot           |      |
// higher      |       |                   v      |  recv majority votes
// term        |    --------    timeout    -----------                        -----------
//             |-> |Follower| ----------> | Candidate |--------------------> |  Leader   |
//                  --------               -----------                        -----------
//                     ^          higher term/ |                         higher term |
//                     |            new leader |                                     |
//                     |_______________________|_____________________________________|

// The main event loop for the server.
func (s *server) loop() {
    defer s.debugln("server.loop.end")

    state := s.State()
    for state != Stopped {
        s.debugln("server.loop.run ", state)
        switch state {
        case Follower:
            s.followerLoop()
        case Candidate:
            s.candidateLoop()
        case Leader:
            s.leaderLoop()
        case Snapshotting:
            s.snapshotLoop()
        }
        state = s.State()
    }
}

// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
    event := &ev{target: value, c: make(chan error, 1)}
    s.c <- event
    err := <-event.c
    return event.returnValue, err
}

func (s *server) sendAsync(value interface{}) {
    event := &ev{target: value, c: make(chan error, 1)}

    // Try a non-blocking send first. In most cases this succeeds, which
    // avoids creating unnecessary goroutines.
    select {
    case s.c <- event:
        return
    default:
    }

    go func() {
        s.c <- event
    }()
}

// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if election timeout elapses without either:
//   1. Receiving valid AppendEntries RPC, or
//   2. Granting vote to candidate
func (s *server) followerLoop() {
    since := time.Now()
    electionTimeout := s.ElectionTimeout()
    timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

    for s.State() == Follower {
        var err error
        update := false
        select {
        case stop := <-s.stopped:
            s.setState(Stopped)
            stop <- true
            return
        case e := <-s.c:
            switch req := e.target.(type) {
            case JoinCommand:
                // If no log entries exist and a self-join command is issued
                // then immediately become leader and commit the entry.
                if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
                    s.debugln("selfjoin and promote to leader")
                    s.setState(Leader)
                    s.processCommand(req, e)
                } else {
                    err = NotLeaderError
                }
            case *AppendEntriesRequest:
                // If heartbeats get too close to the election timeout then send an event.
                elapsedTime := time.Now().Sub(since)
                if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
                    s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
                }
                e.returnValue, update = s.processAppendEntriesRequest(req)
            case *RequestVoteRequest:
                e.returnValue, update = s.processRequestVoteRequest(req)
            case *SnapshotRequest:
                e.returnValue = s.processSnapshotRequest(req)
            default:
                err = NotLeaderError
            }
            // Callback to event.
            e.c <- err
        case <-timeoutChan:
            // Only allow a synced follower to promote to candidate.
            if s.promotable() {
                s.setState(Candidate)
            } else {
                update = true
            }
        }

        // A valid AppendEntries RPC or a granted vote resets the election timeout.
        if update {
            since = time.Now()
            timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
        }
    }
}
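
// Both followerLoop (above) and candidateLoop (below) draw each election
// timeout uniformly at random from [ElectionTimeout, 2*ElectionTimeout)
// via afterBetween, which is defined elsewhere in this package. The
// randomization is what breaks ties between simultaneous candidates
// (Raft §5.2). A minimal sketch of the idea, assuming "math/rand" is
// imported (the helper name here is hypothetical):
//
//	func randomTimeoutChan(min, max time.Duration) <-chan time.Time {
//		d := min
//		if delta := max - min; delta > 0 {
//			d += time.Duration(rand.Int63n(int64(delta)))
//		}
//		return time.After(d)
//	}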

// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
    // Clear the leader value.
    prevLeader := s.leader
    s.leader = ""
    if prevLeader != s.leader {
        s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
    }

    lastLogIndex, lastLogTerm := s.log.lastInfo()
    doVote := true
    votesGranted := 0
    var timeoutChan <-chan time.Time
    var respChan chan *RequestVoteResponse

    for s.State() == Candidate {
        if doVote {
            // Increment current term, vote for self.
            s.currentTerm++
            s.votedFor = s.name

            // Send RequestVote RPCs to all other servers.
            respChan = make(chan *RequestVoteResponse, len(s.peers))
            for _, peer := range s.peers {
                go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
            }

            // Wait for either:
            //   * Votes received from majority of servers: become leader
            //   * AppendEntries RPC received from new leader: step down
            //   * Election timeout elapses without election resolution: increment term, start new election
            //   * Discover higher term: step down (§5.1)
            votesGranted = 1
            timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
            doVote = false
        }

        // If we received enough votes then stop waiting for more votes
        // and return from the candidate loop.
        if votesGranted == s.QuorumSize() {
            s.debugln("server.candidate.recv.enough.votes")
            s.setState(Leader)
            return
        }

        // Collect votes from peers.
        select {
        case stop := <-s.stopped:
            s.setState(Stopped)
            stop <- true
            return
        case resp := <-respChan:
            if success := s.processVoteResponse(resp); success {
                s.debugln("server.candidate.vote.granted: ", votesGranted)
                votesGranted++
            }
        case e := <-s.c:
            var err error
            switch req := e.target.(type) {
            case Command:
                err = NotLeaderError
            case *AppendEntriesRequest:
                e.returnValue, _ = s.processAppendEntriesRequest(req)
            case *RequestVoteRequest:
                e.returnValue, _ = s.processRequestVoteRequest(req)
            }
            // Callback to event.
            e.c <- err
        case <-timeoutChan:
            doVote = true
        }
    }
}

// The event loop that is run when the server is in a Leader state.
func (s *server) leaderLoop() {
    logIndex, _ := s.log.lastInfo()

    // Update the peers' prevLogIndex to the leader's lastLogIndex and start heartbeats.
    s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
    for _, peer := range s.peers {
        peer.setPrevLogIndex(logIndex)
        peer.startHeartbeat()
    }

    // Commit a NOP after the server becomes leader. From the Raft paper:
    // "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
    // each server; repeat during idle periods to prevent election timeouts
    // (§5.2)". The heartbeats started above do the "idle" period work.
    go s.Do(NOPCommand{})

    // Begin to collect responses from followers.
    for s.State() == Leader {
        var err error
        select {
        case stop := <-s.stopped:
            // Stop all peer heartbeats before stopping.
            for _, peer := range s.peers {
                peer.stopHeartbeat(false)
            }
            s.setState(Stopped)
            stop <- true
            return
        case e := <-s.c:
            switch req := e.target.(type) {
            case Command:
                s.processCommand(req, e)
                continue
            case *AppendEntriesRequest:
                e.returnValue, _ = s.processAppendEntriesRequest(req)
            case *AppendEntriesResponse:
                s.processAppendEntriesResponse(req)
            case *RequestVoteRequest:
                e.returnValue, _ = s.processRequestVoteRequest(req)
            }
            // Callback to event.
            e.c <- err
        }
    }

    s.syncedPeer = nil
}

func (s *server) snapshotLoop() {
    for s.State() == Snapshotting {
        var err error
        select {
        case stop := <-s.stopped:
            s.setState(Stopped)
            stop <- true
            return
        case e := <-s.c:
            switch req := e.target.(type) {
            case Command:
                err = NotLeaderError
            case *AppendEntriesRequest:
                e.returnValue, _ = s.processAppendEntriesRequest(req)
            case *RequestVoteRequest:
                e.returnValue, _ = s.processRequestVoteRequest(req)
            case *SnapshotRecoveryRequest:
                e.returnValue = s.processSnapshotRecoveryRequest(req)
            }
            // Callback to event.
            e.c <- err
        }
    }
}

//--------------------------------------
// Commands
//--------------------------------------

// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *server) Do(command Command) (interface{}, error) {
    return s.send(command)
}

// Processes a command.
func (s *server) processCommand(command Command, e *ev) {
    s.debugln("server.command.process")

    // Create an entry for the command in the log.
    entry, err := s.log.createEntry(s.currentTerm, command, e)
    if err != nil {
        s.debugln("server.command.log.entry.error:", err)
        e.c <- err
        return
    }

    if err := s.log.appendEntry(entry); err != nil {
        s.debugln("server.command.log.error:", err)
        e.c <- err
        return
    }

    s.syncedPeer[s.Name()] = true
    if len(s.peers) == 0 {
        // A single-node cluster can commit immediately.
        commitIndex := s.log.currentIndex()
        s.log.setCommitIndex(commitIndex)
        s.debugln("commit index ", commitIndex)
    }
}

//--------------------------------------
// Append Entries
//--------------------------------------

// Appends zero or more log entries from the leader to this server.
func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
    ret, _ := s.send(req)
    resp, _ := ret.(*AppendEntriesResponse)
    return resp
}

// Processes the "append entries" request.
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
    s.traceln("server.ae.process")

    if req.Term < s.currentTerm {
        s.debugln("server.ae.error: stale term")
        return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
    }

    if req.Term == s.currentTerm {
        _assert(s.State() != Leader, "leader.elected.at.same.term.%d\n", s.currentTerm)

        // Change state to follower.
        s.setState(Follower)
        // Discover the new leader when candidate; save the leader name when follower.
        s.leader = req.LeaderName
    } else {
        // Update term and leader.
        s.updateCurrentTerm(req.Term, req.LeaderName)
    }

    // Reject if the log doesn't contain a matching previous entry.
    if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
        s.debugln("server.ae.truncate.error: ", err)
        return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
    }

    // Append entries to the log.
    if err := s.log.appendEntries(req.Entries); err != nil {
        s.debugln("server.ae.append.error: ", err)
        return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
    }

    // Commit up to the commit index.
    if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
        s.debugln("server.ae.commit.error: ", err)
        return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
    }

    // Once the server has appended and committed all the log entries from
    // the leader, reply success.
    return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}

// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
    // If we find a higher term then change to a follower and exit.
    if resp.Term() > s.Term() {
        s.updateCurrentTerm(resp.Term(), "")
        return
    }

    // Ignore the response if it was not successful.
    if !resp.Success() {
        return
    }

    // If the peer successfully appended a log entry from the leader's term,
    // add it to the synced list.
    if resp.append {
        s.syncedPeer[resp.peer] = true
    }

    // Make sure we have a quorum of synced peers before committing.
    if len(s.syncedPeer) < s.QuorumSize() {
        return
    }

    // Determine the committed index that a majority has.
    var indices []uint64
    indices = append(indices, s.log.currentIndex())
    for _, peer := range s.peers {
        indices = append(indices, peer.getPrevLogIndex())
    }
    sort.Sort(sort.Reverse(uint64Slice(indices)))

    // We can commit up to the index which the majority of the members have appended.
    commitIndex := indices[s.QuorumSize()-1]
    committedIndex := s.log.commitIndex

    if commitIndex > committedIndex {
        // The leader needs to fsync before committing log entries.
        s.log.sync()
        s.log.setCommitIndex(commitIndex)
        s.debugln("commit index ", commitIndex)
    }
}
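
// Worked example (illustrative numbers): in a 5-member cluster where the
// leader's log index is 9 and the peers' prevLogIndexes are 8, 8, 7, and 5,
// the descending sort yields [9 8 8 7 5]. QuorumSize() is 3, so
// indices[2] == 8 is the highest index present on a majority and becomes
// the new commit index.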

// processVoteResponse processes a vote response:
//   1. If the vote is granted for the current term of the candidate, return true.
//   2. If the vote is denied because the responder has a larger term, update
//      this server's term, which will also cause the candidate to step down,
//      and return false.
//   3. If the vote is for a smaller term, ignore it and return false.
func (s *server) processVoteResponse(resp *RequestVoteResponse) bool {
    if resp.VoteGranted && resp.Term == s.currentTerm {
        return true
    }

    if resp.Term > s.currentTerm {
        s.debugln("server.candidate.vote.failed")
        s.updateCurrentTerm(resp.Term, "")
    } else {
        s.debugln("server.candidate.vote: denied")
    }
    return false
}

//--------------------------------------
// Request Vote
//--------------------------------------

// Requests a vote from a server. A vote can be obtained if the request's term
// is at the server's current term and the server has not yet voted. A vote
// can also be obtained if the term is greater than the server's current term.
func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
    ret, _ := s.send(req)
    resp, _ := ret.(*RequestVoteResponse)
    return resp
}

// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
    // If the request is coming from an old term then reject it.
    if req.Term < s.Term() {
        s.debugln("server.rv.deny.vote: cause stale term")
        return newRequestVoteResponse(s.currentTerm, false), false
    }

    // If the term of the requesting peer is larger than this node's, update the term.
    // If the term is equal and we've already voted for a different candidate then
    // don't vote for this candidate.
    if req.Term > s.Term() {
        s.updateCurrentTerm(req.Term, "")
    } else if s.votedFor != "" && s.votedFor != req.CandidateName {
        s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
            " already voted for ", s.votedFor)
        return newRequestVoteResponse(s.currentTerm, false), false
    }

    // If the candidate's log is not at least as up-to-date as our last log then don't vote.
    lastIndex, lastTerm := s.log.lastInfo()
    if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
        s.debugln("server.deny.vote: cause out of date log: ", req.CandidateName,
            "Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
            "Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
        return newRequestVoteResponse(s.currentTerm, false), false
    }

    // If we made it this far then cast a vote and reset our election timeout.
    s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
    s.votedFor = req.CandidateName

    return newRequestVoteResponse(s.currentTerm, true), true
}

//--------------------------------------
// Membership
//--------------------------------------

// Adds a peer to the server.
func (s *server) AddPeer(name string, connectionString string) error {
    s.debugln("server.peer.add: ", name, len(s.peers))

    // Do not allow peers to be added twice.
    if s.peers[name] != nil {
        return nil
    }

    // Skip the peer if it has the same name as the server.
    if s.name != name {
        peer := newPeer(s, name, connectionString, s.heartbeatInterval)
        if s.State() == Leader {
            peer.startHeartbeat()
        }
        s.peers[peer.Name] = peer
        s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
    }

    // Write the configuration to file.
    s.writeConf()
    return nil
}

// Removes a peer from the server.
func (s *server) RemovePeer(name string) error {
    s.debugln("server.peer.remove: ", name, len(s.peers))

    // Skip the peer if it has the same name as the server.
    if name != s.Name() {
        // Return an error if the peer doesn't exist.
        peer := s.peers[name]
        if peer == nil {
            return fmt.Errorf("raft: Peer not found: %s", name)
        }

        // Stop the peer and remove it.
        if s.State() == Leader {
            // We start a goroutine here to avoid a potential deadlock.
            // We are holding the log write lock when we reach this line of
            // code. Without the goroutine, Peer.stopHeartbeat can block if
            // the heartbeat goroutine we want to stop is calling
            // log.getEntriesAfter and waiting for the log read lock, so we
            // would be holding the log lock while waiting for the log lock,
            // which is a deadlock.
            // TODO(xiangli) refactor log lock
            go peer.stopHeartbeat(true)
        }

        delete(s.peers, name)
        s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
    }

    // Write the configuration to file.
    s.writeConf()
    return nil
}

//--------------------------------------
// Log compaction
//--------------------------------------

func (s *server) TakeSnapshot() error {
    if s.stateMachine == nil {
        return errors.New("Snapshot: Cannot create snapshot. Missing state machine.")
    }

    // Shortcut without lock:
    // exit if the server is currently creating a snapshot.
    if s.pendingSnapshot != nil {
        return errors.New("Snapshot: Last snapshot is not finished.")
    }

    // TODO: acquire the lock so that no more entries can be committed.
    // This will be done after the heartbeat refactoring is finished.

    s.debugln("take.snapshot")

    lastIndex, lastTerm := s.log.commitInfo()

    // Check whether any log entries have been committed since the last snapshot.
    if lastIndex == s.log.startIndex {
        return nil
    }

    path := s.SnapshotPath(lastIndex, lastTerm)
    // Create the pending snapshot.
    s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}

    state, err := s.stateMachine.Save()
    if err != nil {
        return err
    }

    // Clone the list of peers.
    peers := make([]*Peer, 0, len(s.peers)+1)
    for _, peer := range s.peers {
        peers = append(peers, peer.clone())
    }
    peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})

    // Attach the peers and state to the pending snapshot and save it to disk.
    s.pendingSnapshot.Peers = peers
    s.pendingSnapshot.State = state
    s.saveSnapshot()

    // We keep some log entries after the snapshot so we do not have to send
    // the whole snapshot to slightly slow machines.
    if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
        compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
        compactTerm := s.log.getEntry(compactIndex).Term()
        s.log.compact(compactIndex, compactTerm)
    }

    return nil
}

// Saves the pending snapshot to disk and promotes it to the current snapshot.
func (s *server) saveSnapshot() error {
    if s.pendingSnapshot == nil {
        return errors.New("pendingSnapshot.is.nil")
    }

    // Write the snapshot to disk.
    if err := s.pendingSnapshot.save(); err != nil {
        return err
    }

    // Swap the current and last snapshots.
    tmp := s.snapshot
    s.snapshot = s.pendingSnapshot

    // Delete the previous snapshot if it differs from the new one.
    if tmp != nil && !(tmp.LastIndex == s.snapshot.LastIndex && tmp.LastTerm == s.snapshot.LastTerm) {
        tmp.remove()
    }
    s.pendingSnapshot = nil

    return nil
}

// Retrieves the snapshot path for the given last index and term.
func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
    return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}
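
// For example, a snapshot committed through term 3, index 12000 is written
// to <path>/snapshot/3_12000.ss.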

func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
    ret, _ := s.send(req)
    resp, _ := ret.(*SnapshotResponse)
    return resp
}

func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
    // If the follower's log contains an entry at the snapshot's last index with a term
    // that matches the snapshot's last term, then the follower already has all the
    // information found in the snapshot and can reply false.
    entry := s.log.getEntry(req.LastIndex)
    if entry != nil && entry.Term() == req.LastTerm {
        return newSnapshotResponse(false)
    }

    // Update state.
    s.setState(Snapshotting)
    return newSnapshotResponse(true)
}

func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
    ret, _ := s.send(req)
    resp, _ := ret.(*SnapshotRecoveryResponse)
    return resp
}

func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
    // Recover the state sent from the request.
    if err := s.stateMachine.Recovery(req.State); err != nil {
        panic("cannot recover from previous state")
    }

    // Recover the cluster configuration.
    s.peers = make(map[string]*Peer)
    for _, peer := range req.Peers {
        s.AddPeer(peer.Name, peer.ConnectionString)
    }

    // Update the log state.
    s.currentTerm = req.LastTerm
    s.log.updateCommitIndex(req.LastIndex)

    // Create a local snapshot.
    s.pendingSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
    s.saveSnapshot()

    // Clear the previous log entries.
    s.log.compact(req.LastIndex, req.LastTerm)

    return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
}

// Loads a snapshot at restart.
func (s *server) LoadSnapshot() error {
    // Open the snapshot directory.
    dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
    if err != nil {
        s.debugln("cannot.open.snapshot: ", err)
        return err
    }

    // Retrieve a list of all snapshots.
    filenames, err := dir.Readdirnames(-1)
    if err != nil {
        dir.Close()
        panic(err)
    }
    dir.Close()

    if len(filenames) == 0 {
        s.debugln("no.snapshot.to.load")
        return nil
    }

    // Grab the latest snapshot.
    sort.Strings(filenames)
    snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

    // Read the snapshot data.
    file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
    if err != nil {
        return err
    }
    defer file.Close()

    // Check the checksum.
    var checksum uint32
    n, err := fmt.Fscanf(file, "%08x\n", &checksum)
    if err != nil {
        return err
    } else if n != 1 {
        return errors.New("checksum.err: bad.snapshot.file")
    }

    // Load the remaining snapshot contents.
    b, err := ioutil.ReadAll(file)
    if err != nil {
        return err
    }

    // Generate the checksum.
    byteChecksum := crc32.ChecksumIEEE(b)
    if checksum != byteChecksum {
        s.debugln(checksum, " ", byteChecksum)
        return errors.New("bad snapshot file")
    }

    // Decode the snapshot.
    if err = json.Unmarshal(b, &s.snapshot); err != nil {
        s.debugln("unmarshal.snapshot.error: ", err)
        return err
    }

    // Recover the snapshot into the state machine.
    if err = s.stateMachine.Recovery(s.snapshot.State); err != nil {
        s.debugln("recovery.snapshot.error: ", err)
        return err
    }

    // Recover the cluster configuration.
    for _, peer := range s.snapshot.Peers {
        s.AddPeer(peer.Name, peer.ConnectionString)
    }

    // Update the log state.
    s.log.startTerm = s.snapshot.LastTerm
    s.log.startIndex = s.snapshot.LastIndex
    s.log.updateCommitIndex(s.snapshot.LastIndex)

    return nil
}
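
// As read above, a snapshot file consists of an 8-hex-digit CRC32 (IEEE)
// checksum of the body, a newline, and then the JSON-encoded snapshot
// (its last index/term, peers, and state machine state).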

//--------------------------------------
// Config File
//--------------------------------------

// Flushes the commit index to disk, so that when the raft server restarts,
// it will commit up to the flushed commitIndex.
func (s *server) FlushCommitIndex() {
    s.debugln("server.conf.update")
    // Write the configuration to file.
    s.writeConf()
}

func (s *server) writeConf() {
    peers := make([]*Peer, len(s.peers))

    i := 0
    for _, peer := range s.peers {
        peers[i] = peer.clone()
        i++
    }

    r := &Config{
        CommitIndex: s.log.commitIndex,
        Peers:       peers,
    }

    b, _ := json.Marshal(r)

    confPath := path.Join(s.path, "conf")
    tmpConfPath := path.Join(s.path, "conf.tmp")

    err := writeFileSynced(tmpConfPath, b, 0600)
    if err != nil {
        panic(err)
    }

    os.Rename(tmpConfPath, confPath)
}
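
// Writing the marshaled config to conf.tmp (synced to disk) and then
// renaming it over conf makes the update effectively atomic: a crash
// mid-write leaves the previous configuration intact, because the rename
// replaces the old file in a single step.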

// Reads the configuration for the server.
func (s *server) readConf() error {
    confPath := path.Join(s.path, "conf")
    s.debugln("readConf.open ", confPath)

    // Open the conf file. A missing or unreadable file is treated as a
    // fresh server with nothing to restore.
    b, err := ioutil.ReadFile(confPath)
    if err != nil {
        return nil
    }

    conf := &Config{}
    if err = json.Unmarshal(b, conf); err != nil {
        return err
    }

    s.log.updateCommitIndex(conf.CommitIndex)

    return nil
}

//--------------------------------------
// Debugging
//--------------------------------------

func (s *server) debugln(v ...interface{}) {
    if logLevel > Debug {
        debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
    }
}

func (s *server) traceln(v ...interface{}) {
    if logLevel > Trace {
        tracef("[%s] %s", s.name, fmt.Sprintln(v...))
    }
}