| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422 |
- package raft
- import (
- "encoding/json"
- "errors"
- "fmt"
- "hash/crc32"
- "io/ioutil"
- "os"
- "path"
- "sort"
- "sync"
- "time"
- )
- //------------------------------------------------------------------------------
- //
- // Constants
- //
- //------------------------------------------------------------------------------
// Server states, as reported by Server.State().
const (
	Stopped      = "stopped"
	Initialized  = "initialized"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)

// Log size limits.
const (
	// MaxLogEntriesPerRequest is the default value NewServer stores in
	// server.maxLogEntriesPerRequest (presumably the cap on entries sent in a
	// single AppendEntries request — confirm at the use site).
	MaxLogEntriesPerRequest = 2000

	// NumberOfLogEntriesAfterSnapshot is not used in this part of the file;
	// presumably the number of entries retained after taking a snapshot.
	NumberOfLogEntriesAfterSnapshot = 200
)

// Timing defaults applied by NewServer.
const (
	// DefaultHeartbeatInterval is the interval at which the leader sends
	// AppendEntries requests to followers to maintain its leadership.
	DefaultHeartbeatInterval = 50 * time.Millisecond

	// DefaultElectionTimeout is the initial election timeout. Followers and
	// candidates pass it to afterBetween as the lower bound, with twice the
	// value as the upper bound.
	DefaultElectionTimeout = 150 * time.Millisecond
)

// ElectionTimeoutThresholdPercent is the fraction of the election timeout
// beyond which a follower receiving an AppendEntries request dispatches an
// ElectionTimeoutThresholdEventType event, warning that heartbeats are
// arriving dangerously close to the election timeout.
const ElectionTimeoutThresholdPercent = 0.8
- //------------------------------------------------------------------------------
- //
- // Errors
- //
- //------------------------------------------------------------------------------
// Errors returned by the server.
var (
	// NotLeaderError is returned when a request that only the leader can
	// serve, such as a command, reaches a server in another state.
	NotLeaderError = errors.New("raft.Server: Not current leader")

	// DuplicatePeerError is not used in this part of the file; presumably
	// returned when adding a peer whose name is already known.
	DuplicatePeerError = errors.New("raft.Server: Duplicate peer")

	// CommandTimeoutError is not used in this part of the file; presumably
	// returned when a command does not complete in time.
	CommandTimeoutError = errors.New("raft: Command timeout")
)
- //------------------------------------------------------------------------------
- //
- // Typedefs
- //
- //------------------------------------------------------------------------------
- // A server is involved in the consensus protocol and can act as a follower,
- // candidate or a leader.
- type Server interface {
- Name() string
- Context() interface{}
- StateMachine() StateMachine
- Leader() string
- State() string
- Path() string
- LogPath() string
- SnapshotPath(lastIndex uint64, lastTerm uint64) string
- Term() uint64
- CommitIndex() uint64
- VotedFor() string
- MemberCount() int
- QuorumSize() int
- IsLogEmpty() bool
- LogEntries() []*LogEntry
- LastCommandName() string
- GetState() string
- ElectionTimeout() time.Duration
- SetElectionTimeout(duration time.Duration)
- HeartbeatInterval() time.Duration
- SetHeartbeatInterval(duration time.Duration)
- Transporter() Transporter
- SetTransporter(t Transporter)
- AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
- RequestVote(req *RequestVoteRequest) *RequestVoteResponse
- RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
- SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
- AddPeer(name string, connectiongString string) error
- RemovePeer(name string) error
- Peers() map[string]*Peer
- Init() error
- Start() error
- Stop()
- Running() bool
- Do(command Command) (interface{}, error)
- TakeSnapshot() error
- LoadSnapshot() error
- AddEventListener(string, EventListener)
- FlushCommitIndex()
- }
- type server struct {
- *eventDispatcher
- name string
- path string
- state string
- transporter Transporter
- context interface{}
- currentTerm uint64
- votedFor string
- log *Log
- leader string
- peers map[string]*Peer
- mutex sync.RWMutex
- syncedPeer map[string]bool
- stopped chan chan bool
- c chan *ev
- electionTimeout time.Duration
- heartbeatInterval time.Duration
- snapshot *Snapshot
- // PendingSnapshot is an unfinished snapshot.
- // After the pendingSnapshot is saved to disk,
- // it will be set to snapshot and also will be
- // set to nil.
- pendingSnapshot *Snapshot
- stateMachine StateMachine
- maxLogEntriesPerRequest uint64
- connectionString string
- }
// ev is a request queued for the server's event loop. target carries the
// payload (a command or an RPC request). The loop stores any result in
// returnValue and signals completion by sending an error (nil on success)
// on c.
type ev struct {
	target      interface{}
	returnValue interface{}
	c           chan error
}
- //------------------------------------------------------------------------------
- //
- // Constructor
- //
- //------------------------------------------------------------------------------
- // Creates a new server with a log at the given path. transporter must
- // not be nil. stateMachine can be nil if snapshotting and log
- // compaction is to be disabled. context can be anything (including nil)
- // and is not used by the raft package except returned by
- // Server.Context(). connectionString can be anything.
- func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error) {
- if name == "" {
- return nil, errors.New("raft.Server: Name cannot be blank")
- }
- if transporter == nil {
- panic("raft: Transporter required")
- }
- s := &server{
- name: name,
- path: path,
- transporter: transporter,
- stateMachine: stateMachine,
- context: ctx,
- state: Stopped,
- peers: make(map[string]*Peer),
- log: newLog(),
- stopped: make(chan chan bool),
- c: make(chan *ev, 256),
- electionTimeout: DefaultElectionTimeout,
- heartbeatInterval: DefaultHeartbeatInterval,
- maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
- connectionString: connectionString,
- }
- s.eventDispatcher = newEventDispatcher(s)
- // Setup apply function.
- s.log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
- // Dispatch commit event.
- s.DispatchEvent(newEvent(CommitEventType, e, nil))
- // Apply command to the state machine.
- switch c := c.(type) {
- case CommandApply:
- return c.Apply(&context{
- server: s,
- currentTerm: s.currentTerm,
- currentIndex: s.log.internalCurrentIndex(),
- commitIndex: s.log.commitIndex,
- })
- case deprecatedCommandApply:
- return c.Apply(s)
- default:
- return nil, fmt.Errorf("Command does not implement Apply()")
- }
- }
- return s, nil
- }
- //------------------------------------------------------------------------------
- //
- // Accessors
- //
- //------------------------------------------------------------------------------
- //--------------------------------------
- // General
- //--------------------------------------
- // Retrieves the name of the server.
- func (s *server) Name() string {
- return s.name
- }
- // Retrieves the storage path for the server.
- func (s *server) Path() string {
- return s.path
- }
- // The name of the current leader.
- func (s *server) Leader() string {
- return s.leader
- }
- // Retrieves a copy of the peer data.
- func (s *server) Peers() map[string]*Peer {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- peers := make(map[string]*Peer)
- for name, peer := range s.peers {
- peers[name] = peer.clone()
- }
- return peers
- }
- // Retrieves the object that transports requests.
- func (s *server) Transporter() Transporter {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- return s.transporter
- }
- func (s *server) SetTransporter(t Transporter) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- s.transporter = t
- }
- // Retrieves the context passed into the constructor.
- func (s *server) Context() interface{} {
- return s.context
- }
- // Retrieves the state machine passed into the constructor.
- func (s *server) StateMachine() StateMachine {
- return s.stateMachine
- }
- // Retrieves the log path for the server.
- func (s *server) LogPath() string {
- return path.Join(s.path, "log")
- }
- // Retrieves the current state of the server.
- func (s *server) State() string {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- return s.state
- }
- // Sets the state of the server.
- func (s *server) setState(state string) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- // Temporarily store previous values.
- prevState := s.state
- prevLeader := s.leader
- // Update state and leader.
- s.state = state
- if state == Leader {
- s.leader = s.Name()
- s.syncedPeer = make(map[string]bool)
- }
- // Dispatch state and leader change events.
- s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
- if prevLeader != s.leader {
- s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
- }
- }
- // Retrieves the current term of the server.
- func (s *server) Term() uint64 {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- return s.currentTerm
- }
- // Retrieves the current commit index of the server.
- func (s *server) CommitIndex() uint64 {
- s.log.mutex.RLock()
- defer s.log.mutex.RUnlock()
- return s.log.commitIndex
- }
- // Retrieves the name of the candidate this server voted for in this term.
- func (s *server) VotedFor() string {
- return s.votedFor
- }
- // Retrieves whether the server's log has no entries.
- func (s *server) IsLogEmpty() bool {
- return s.log.isEmpty()
- }
- // A list of all the log entries. This should only be used for debugging purposes.
- func (s *server) LogEntries() []*LogEntry {
- return s.log.entries
- }
- // A reference to the command name of the last entry.
- func (s *server) LastCommandName() string {
- return s.log.lastCommandName()
- }
- // Get the state of the server for debugging
- func (s *server) GetState() string {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
- }
- // Check if the server is promotable
- func (s *server) promotable() bool {
- return s.log.currentIndex() > 0
- }
- //--------------------------------------
- // Membership
- //--------------------------------------
- // Retrieves the number of member servers in the consensus.
- func (s *server) MemberCount() int {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- return len(s.peers) + 1
- }
- // Retrieves the number of servers required to make a quorum.
- func (s *server) QuorumSize() int {
- return (s.MemberCount() / 2) + 1
- }
- //--------------------------------------
- // Election timeout
- //--------------------------------------
- // Retrieves the election timeout.
- func (s *server) ElectionTimeout() time.Duration {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- return s.electionTimeout
- }
- // Sets the election timeout.
- func (s *server) SetElectionTimeout(duration time.Duration) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- s.electionTimeout = duration
- }
- //--------------------------------------
- // Heartbeat timeout
- //--------------------------------------
- // Retrieves the heartbeat timeout.
- func (s *server) HeartbeatInterval() time.Duration {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- return s.heartbeatInterval
- }
- // Sets the heartbeat timeout.
- func (s *server) SetHeartbeatInterval(duration time.Duration) {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- s.heartbeatInterval = duration
- for _, peer := range s.peers {
- peer.setHeartbeatInterval(duration)
- }
- }
- //------------------------------------------------------------------------------
- //
- // Methods
- //
- //------------------------------------------------------------------------------
- //--------------------------------------
- // Initialization
- //--------------------------------------
- // Reg the NOPCommand
- func init() {
- RegisterCommand(&NOPCommand{})
- RegisterCommand(&DefaultJoinCommand{})
- RegisterCommand(&DefaultLeaveCommand{})
- }
- // Start the raft server
- // If log entries exist then allow promotion to candidate if no AEs received.
- // If no log entries exist then wait for AEs from another node.
- // If no log entries exist and a self-join command is issued then
- // immediately become leader and commit entry.
- func (s *server) Start() error {
- // Exit if the server is already running.
- if s.Running() {
- return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
- }
- if err := s.Init(); err != nil {
- return err
- }
- s.setState(Follower)
- // If no log entries exist then
- // 1. wait for AEs from another node
- // 2. wait for self-join command
- // to set itself promotable
- if !s.promotable() {
- s.debugln("start as a new raft server")
- // If log entries exist then allow promotion to candidate
- // if no AEs received.
- } else {
- s.debugln("start from previous saved state")
- }
- debugln(s.GetState())
- go s.loop()
- return nil
- }
- // Init initializes the raft server
- func (s *server) Init() error {
- if s.Running() {
- return fmt.Errorf("raft.Server: Server already running[%v]", s.state)
- }
- // server has been initialized or server was stopped after initialized
- if s.state == Initialized || !s.log.isEmpty() {
- s.state = Initialized
- return nil
- }
- // Create snapshot directory if it does not exist
- err := os.Mkdir(path.Join(s.path, "snapshot"), 0700)
- if err != nil && !os.IsExist(err) {
- s.debugln("raft: Snapshot dir error: ", err)
- return fmt.Errorf("raft: Initialization error: %s", err)
- }
- if err := s.readConf(); err != nil {
- s.debugln("raft: Conf file error: ", err)
- return fmt.Errorf("raft: Initialization error: %s", err)
- }
- // Initialize the log and load it up.
- if err := s.log.open(s.LogPath()); err != nil {
- s.debugln("raft: Log error: ", err)
- return fmt.Errorf("raft: Initialization error: %s", err)
- }
- // Update the term to the last term in the log.
- _, s.currentTerm = s.log.lastInfo()
- s.state = Initialized
- return nil
- }
- // Shuts down the server.
- func (s *server) Stop() {
- stop := make(chan bool)
- s.stopped <- stop
- s.state = Stopped
- // make sure the server has stopped before we close the log
- <-stop
- s.log.close()
- }
- // Checks if the server is currently running.
- func (s *server) Running() bool {
- s.mutex.RLock()
- defer s.mutex.RUnlock()
- return (s.state != Stopped && s.state != Initialized)
- }
- //--------------------------------------
- // Term
- //--------------------------------------
- // updates the current term for the server. This is only used when a larger
- // external term is found.
- func (s *server) updateCurrentTerm(term uint64, leaderName string) {
- _assert(term > s.currentTerm,
- "upadteCurrentTerm: update is called when term is not larger than currentTerm")
- s.mutex.Lock()
- defer s.mutex.Unlock()
- // Store previous values temporarily.
- prevTerm := s.currentTerm
- prevLeader := s.leader
- // set currentTerm = T, convert to follower (§5.1)
- // stop heartbeats before step-down
- if s.state == Leader {
- s.mutex.Unlock()
- for _, peer := range s.peers {
- peer.stopHeartbeat(false)
- }
- s.mutex.Lock()
- }
- // update the term and clear vote for
- if s.state != Follower {
- s.mutex.Unlock()
- s.setState(Follower)
- s.mutex.Lock()
- }
- s.currentTerm = term
- s.leader = leaderName
- s.votedFor = ""
- // Dispatch change events.
- s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
- if prevLeader != s.leader {
- s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
- }
- }
- //--------------------------------------
- // Event Loop
- //--------------------------------------
- // ________
- // --|Snapshot| timeout
- // | -------- ______
- // recover | ^ | |
- // snapshot / | |snapshot | |
- // higher | | v | recv majority votes
- // term | -------- timeout ----------- -----------
- // |-> |Follower| ----------> | Candidate |--------------------> | Leader |
- // -------- ----------- -----------
- // ^ higher term/ | higher term |
- // | new leader | |
- // |_______________________|____________________________________ |
- // The main event loop for the server
- func (s *server) loop() {
- defer s.debugln("server.loop.end")
- for s.state != Stopped {
- state := s.State()
- s.debugln("server.loop.run ", state)
- switch state {
- case Follower:
- s.followerLoop()
- case Candidate:
- s.candidateLoop()
- case Leader:
- s.leaderLoop()
- case Snapshotting:
- s.snapshotLoop()
- }
- }
- }
- // Sends an event to the event loop to be processed. The function will wait
- // until the event is actually processed before returning.
- func (s *server) send(value interface{}) (interface{}, error) {
- event := &ev{target: value, c: make(chan error, 1)}
- s.c <- event
- err := <-event.c
- return event.returnValue, err
- }
- func (s *server) sendAsync(value interface{}) {
- event := &ev{target: value, c: make(chan error, 1)}
- // try a non-blocking send first
- // in most cases, this should not be blocking
- // avoid create unnecessary go routines
- select {
- case s.c <- event:
- return
- default:
- }
- go func() {
- s.c <- event
- }()
- }
- // The event loop that is run when the server is in a Follower state.
- // Responds to RPCs from candidates and leaders.
- // Converts to candidate if election timeout elapses without either:
- // 1.Receiving valid AppendEntries RPC, or
- // 2.Granting vote to candidate
- func (s *server) followerLoop() {
- since := time.Now()
- electionTimeout := s.ElectionTimeout()
- timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
- for s.State() == Follower {
- var err error
- update := false
- select {
- case stop := <-s.stopped:
- s.setState(Stopped)
- stop <- true
- return
- case e := <-s.c:
- switch req := e.target.(type) {
- case JoinCommand:
- //If no log entries exist and a self-join command is issued
- //then immediately become leader and commit entry.
- if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
- s.debugln("selfjoin and promote to leader")
- s.setState(Leader)
- s.processCommand(req, e)
- } else {
- err = NotLeaderError
- }
- case *AppendEntriesRequest:
- // If heartbeats get too close to the election timeout then send an event.
- elapsedTime := time.Now().Sub(since)
- if elapsedTime > time.Duration(float64(electionTimeout)*ElectionTimeoutThresholdPercent) {
- s.DispatchEvent(newEvent(ElectionTimeoutThresholdEventType, elapsedTime, nil))
- }
- e.returnValue, update = s.processAppendEntriesRequest(req)
- case *RequestVoteRequest:
- e.returnValue, update = s.processRequestVoteRequest(req)
- case *SnapshotRequest:
- e.returnValue = s.processSnapshotRequest(req)
- default:
- err = NotLeaderError
- }
- // Callback to event.
- e.c <- err
- case <-timeoutChan:
- // only allow synced follower to promote to candidate
- if s.promotable() {
- s.setState(Candidate)
- } else {
- update = true
- }
- }
- // Converts to candidate if election timeout elapses without either:
- // 1.Receiving valid AppendEntries RPC, or
- // 2.Granting vote to candidate
- if update {
- since = time.Now()
- timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
- }
- }
- }
- // The event loop that is run when the server is in a Candidate state.
- func (s *server) candidateLoop() {
- // Clear leader value.
- prevLeader := s.leader
- s.leader = ""
- if prevLeader != s.leader {
- s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
- }
- lastLogIndex, lastLogTerm := s.log.lastInfo()
- doVote := true
- votesGranted := 0
- var timeoutChan <-chan time.Time
- var respChan chan *RequestVoteResponse
- for s.State() == Candidate {
- if doVote {
- // Increment current term, vote for self.
- s.currentTerm++
- s.votedFor = s.name
- // Send RequestVote RPCs to all other servers.
- respChan = make(chan *RequestVoteResponse, len(s.peers))
- for _, peer := range s.peers {
- go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
- }
- // Wait for either:
- // * Votes received from majority of servers: become leader
- // * AppendEntries RPC received from new leader: step down.
- // * Election timeout elapses without election resolution: increment term, start new election
- // * Discover higher term: step down (§5.1)
- votesGranted = 1
- timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
- doVote = false
- }
- // If we received enough votes then stop waiting for more votes.
- // And return from the candidate loop
- if votesGranted == s.QuorumSize() {
- s.debugln("server.candidate.recv.enough.votes")
- s.setState(Leader)
- return
- }
- // Collect votes from peers.
- select {
- case stop := <-s.stopped:
- s.setState(Stopped)
- stop <- true
- return
- case resp := <-respChan:
- if success := s.processVoteResponse(resp); success {
- s.debugln("server.candidate.vote.granted: ", votesGranted)
- votesGranted++
- }
- case e := <-s.c:
- var err error
- switch req := e.target.(type) {
- case Command:
- err = NotLeaderError
- case *AppendEntriesRequest:
- e.returnValue, _ = s.processAppendEntriesRequest(req)
- case *RequestVoteRequest:
- e.returnValue, _ = s.processRequestVoteRequest(req)
- }
- // Callback to event.
- e.c <- err
- case <-timeoutChan:
- doVote = true
- }
- }
- }
- // The event loop that is run when the server is in a Leader state.
- func (s *server) leaderLoop() {
- logIndex, _ := s.log.lastInfo()
- // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
- s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
- for _, peer := range s.peers {
- peer.setPrevLogIndex(logIndex)
- peer.startHeartbeat()
- }
- // Commit a NOP after the server becomes leader. From the Raft paper:
- // "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
- // each server; repeat during idle periods to prevent election timeouts
- // (§5.2)". The heartbeats started above do the "idle" period work.
- go s.Do(NOPCommand{})
- // Begin to collect response from followers
- for s.State() == Leader {
- var err error
- select {
- case stop := <-s.stopped:
- // Stop all peers before stop
- for _, peer := range s.peers {
- peer.stopHeartbeat(false)
- }
- s.setState(Stopped)
- stop <- true
- return
- case e := <-s.c:
- switch req := e.target.(type) {
- case Command:
- s.processCommand(req, e)
- continue
- case *AppendEntriesRequest:
- e.returnValue, _ = s.processAppendEntriesRequest(req)
- case *AppendEntriesResponse:
- s.processAppendEntriesResponse(req)
- case *RequestVoteRequest:
- e.returnValue, _ = s.processRequestVoteRequest(req)
- }
- // Callback to event.
- e.c <- err
- }
- }
- s.syncedPeer = nil
- }
- func (s *server) snapshotLoop() {
- for s.State() == Snapshotting {
- var err error
- select {
- case stop := <-s.stopped:
- s.setState(Stopped)
- stop <- true
- return
- case e := <-s.c:
- switch req := e.target.(type) {
- case Command:
- err = NotLeaderError
- case *AppendEntriesRequest:
- e.returnValue, _ = s.processAppendEntriesRequest(req)
- case *RequestVoteRequest:
- e.returnValue, _ = s.processRequestVoteRequest(req)
- case *SnapshotRecoveryRequest:
- e.returnValue = s.processSnapshotRecoveryRequest(req)
- }
- // Callback to event.
- e.c <- err
- }
- }
- }
- //--------------------------------------
- // Commands
- //--------------------------------------
- // Attempts to execute a command and replicate it. The function will return
- // when the command has been successfully committed or an error has occurred.
- func (s *server) Do(command Command) (interface{}, error) {
- return s.send(command)
- }
- // Processes a command.
- func (s *server) processCommand(command Command, e *ev) {
- s.debugln("server.command.process")
- // Create an entry for the command in the log.
- entry, err := s.log.createEntry(s.currentTerm, command, e)
- if err != nil {
- s.debugln("server.command.log.entry.error:", err)
- e.c <- err
- return
- }
- if err := s.log.appendEntry(entry); err != nil {
- s.debugln("server.command.log.error:", err)
- e.c <- err
- return
- }
- s.syncedPeer[s.Name()] = true
- if len(s.peers) == 0 {
- commitIndex := s.log.currentIndex()
- s.log.setCommitIndex(commitIndex)
- s.debugln("commit index ", commitIndex)
- }
- }
- //--------------------------------------
- // Append Entries
- //--------------------------------------
- // Appends zero or more log entry from the leader to this server.
- func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
- ret, _ := s.send(req)
- resp, _ := ret.(*AppendEntriesResponse)
- return resp
- }
- // Processes the "append entries" request.
- func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
- s.traceln("server.ae.process")
- if req.Term < s.currentTerm {
- s.debugln("server.ae.error: stale term")
- return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
- }
- if req.Term == s.currentTerm {
- _assert(s.state != Leader, "leader.elected.at.same.term.%d\n", s.currentTerm)
- // change state to follower
- s.state = Follower
- // discover new leader when candidate
- // save leader name when follower
- s.leader = req.LeaderName
- } else {
- // Update term and leader.
- s.updateCurrentTerm(req.Term, req.LeaderName)
- }
- // Reject if log doesn't contain a matching previous entry.
- if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
- s.debugln("server.ae.truncate.error: ", err)
- return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
- }
- // Append entries to the log.
- if err := s.log.appendEntries(req.Entries); err != nil {
- s.debugln("server.ae.append.error: ", err)
- return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
- }
- // Commit up to the commit index.
- if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
- s.debugln("server.ae.commit.error: ", err)
- return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
- }
- // once the server appended and committed all the log entries from the leader
- return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
- }
- // Processes the "append entries" response from the peer. This is only
- // processed when the server is a leader. Responses received during other
- // states are dropped.
- func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
- // If we find a higher term then change to a follower and exit.
- if resp.Term() > s.Term() {
- s.updateCurrentTerm(resp.Term(), "")
- return
- }
- // panic response if it's not successful.
- if !resp.Success() {
- return
- }
- // if one peer successfully append a log from the leader term,
- // we add it to the synced list
- if resp.append == true {
- s.syncedPeer[resp.peer] = true
- }
- // Increment the commit count to make sure we have a quorum before committing.
- if len(s.syncedPeer) < s.QuorumSize() {
- return
- }
- // Determine the committed index that a majority has.
- var indices []uint64
- indices = append(indices, s.log.currentIndex())
- for _, peer := range s.peers {
- indices = append(indices, peer.getPrevLogIndex())
- }
- sort.Sort(sort.Reverse(uint64Slice(indices)))
- // We can commit up to the index which the majority of the members have appended.
- commitIndex := indices[s.QuorumSize()-1]
- committedIndex := s.log.commitIndex
- if commitIndex > committedIndex {
- // leader needs to do a fsync before committing log entries
- s.log.sync()
- s.log.setCommitIndex(commitIndex)
- s.debugln("commit index ", commitIndex)
- }
- }
- // processVoteReponse processes a vote request:
- // 1. if the vote is granted for the current term of the candidate, return true
- // 2. if the vote is denied due to smaller term, update the term of this server
- // which will also cause the candidate to step-down, and return false.
- // 3. if the vote is for a smaller term, ignore it and return false.
- func (s *server) processVoteResponse(resp *RequestVoteResponse) bool {
- if resp.VoteGranted && resp.Term == s.currentTerm {
- return true
- }
- if resp.Term > s.currentTerm {
- s.debugln("server.candidate.vote.failed")
- s.updateCurrentTerm(resp.Term, "")
- } else {
- s.debugln("server.candidate.vote: denied")
- }
- return false
- }
- //--------------------------------------
- // Request Vote
- //--------------------------------------
- // Requests a vote from a server. A vote can be obtained if the vote's term is
- // at the server's current term and the server has not made a vote yet. A vote
- // can also be obtained if the term is greater than the server's current term.
- func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
- ret, _ := s.send(req)
- resp, _ := ret.(*RequestVoteResponse)
- return resp
- }
- // Processes a "request vote" request.
- func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
- // If the request is coming from an old term then reject it.
- if req.Term < s.Term() {
- s.debugln("server.rv.deny.vote: cause stale term")
- return newRequestVoteResponse(s.currentTerm, false), false
- }
- // If the term of the request peer is larger than this node, update the term
- // If the term is equal and we've already voted for a different candidate then
- // don't vote for this candidate.
- if req.Term > s.Term() {
- s.updateCurrentTerm(req.Term, "")
- } else if s.votedFor != "" && s.votedFor != req.CandidateName {
- s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
- " already vote for ", s.votedFor)
- return newRequestVoteResponse(s.currentTerm, false), false
- }
- // If the candidate's log is not at least as up-to-date as our last log then don't vote.
- lastIndex, lastTerm := s.log.lastInfo()
- if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
- s.debugln("server.deny.vote: cause out of date log: ", req.CandidateName,
- "Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
- "Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
- return newRequestVoteResponse(s.currentTerm, false), false
- }
- // If we made it this far then cast a vote and reset our election time out.
- s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
- s.votedFor = req.CandidateName
- return newRequestVoteResponse(s.currentTerm, true), true
- }
- //--------------------------------------
- // Membership
- //--------------------------------------
- // Adds a peer to the server.
- func (s *server) AddPeer(name string, connectiongString string) error {
- s.debugln("server.peer.add: ", name, len(s.peers))
- // Do not allow peers to be added twice.
- if s.peers[name] != nil {
- return nil
- }
- // Skip the Peer if it has the same name as the Server
- if s.name != name {
- peer := newPeer(s, name, connectiongString, s.heartbeatInterval)
- if s.State() == Leader {
- peer.startHeartbeat()
- }
- s.peers[peer.Name] = peer
- s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
- }
- // Write the configuration to file.
- s.writeConf()
- return nil
- }
- // Removes a peer from the server.
- func (s *server) RemovePeer(name string) error {
- s.debugln("server.peer.remove: ", name, len(s.peers))
- // Skip the Peer if it has the same name as the Server
- if name != s.Name() {
- // Return error if peer doesn't exist.
- peer := s.peers[name]
- if peer == nil {
- return fmt.Errorf("raft: Peer not found: %s", name)
- }
- // Stop peer and remove it.
- if s.State() == Leader {
- // We create a go routine here to avoid potential deadlock.
- // We are holding log write lock when reach this line of code.
- // Peer.stopHeartbeat can be blocked without go routine, if the
- // target go routine (which we want to stop) is calling
- // log.getEntriesAfter and waiting for log read lock.
- // So we might be holding log lock and waiting for log lock,
- // which lead to a deadlock.
- // TODO(xiangli) refactor log lock
- go peer.stopHeartbeat(true)
- }
- delete(s.peers, name)
- s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
- }
- // Write the configuration to file.
- s.writeConf()
- return nil
- }
- //--------------------------------------
- // Log compaction
- //--------------------------------------
- func (s *server) TakeSnapshot() error {
- if s.stateMachine == nil {
- return errors.New("Snapshot: Cannot create snapshot. Missing state machine.")
- }
- // Shortcut without lock
- // Exit if the server is currently creating a snapshot.
- if s.pendingSnapshot != nil {
- return errors.New("Snapshot: Last snapshot is not finished.")
- }
- // TODO: acquire the lock and no more committed is allowed
- // This will be done after finishing refactoring heartbeat
- s.debugln("take.snapshot")
- lastIndex, lastTerm := s.log.commitInfo()
- // check if there is log has been committed since the
- // last snapshot.
- if lastIndex == s.log.startIndex {
- return nil
- }
- path := s.SnapshotPath(lastIndex, lastTerm)
- // Attach snapshot to pending snapshot and save it to disk.
- s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}
- state, err := s.stateMachine.Save()
- if err != nil {
- return err
- }
- // Clone the list of peers.
- peers := make([]*Peer, 0, len(s.peers)+1)
- for _, peer := range s.peers {
- peers = append(peers, peer.clone())
- }
- peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})
- // Attach snapshot to pending snapshot and save it to disk.
- s.pendingSnapshot.Peers = peers
- s.pendingSnapshot.State = state
- s.saveSnapshot()
- // We keep some log entries after the snapshot.
- // We do not want to send the whole snapshot to the slightly slow machines
- if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
- compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
- compactTerm := s.log.getEntry(compactIndex).Term()
- s.log.compact(compactIndex, compactTerm)
- }
- return nil
- }
- // Retrieves the log path for the server.
- func (s *server) saveSnapshot() error {
- if s.pendingSnapshot == nil {
- return errors.New("pendingSnapshot.is.nil")
- }
- // Write snapshot to disk.
- if err := s.pendingSnapshot.save(); err != nil {
- return err
- }
- // Swap the current and last snapshots.
- tmp := s.snapshot
- s.snapshot = s.pendingSnapshot
- // Delete the previous snapshot if there is any change
- if tmp != nil && !(tmp.LastIndex == s.snapshot.LastIndex && tmp.LastTerm == s.snapshot.LastTerm) {
- tmp.remove()
- }
- s.pendingSnapshot = nil
- return nil
- }
- // Retrieves the log path for the server.
- func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
- return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
- }
- func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
- ret, _ := s.send(req)
- resp, _ := ret.(*SnapshotResponse)
- return resp
- }
- func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
- // If the follower’s log contains an entry at the snapshot’s last index with a term
- // that matches the snapshot’s last term, then the follower already has all the
- // information found in the snapshot and can reply false.
- entry := s.log.getEntry(req.LastIndex)
- if entry != nil && entry.Term() == req.LastTerm {
- return newSnapshotResponse(false)
- }
- // Update state.
- s.setState(Snapshotting)
- return newSnapshotResponse(true)
- }
- func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
- ret, _ := s.send(req)
- resp, _ := ret.(*SnapshotRecoveryResponse)
- return resp
- }
- func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
- // Recover state sent from request.
- if err := s.stateMachine.Recovery(req.State); err != nil {
- panic("cannot recover from previous state")
- }
- // Recover the cluster configuration.
- s.peers = make(map[string]*Peer)
- for _, peer := range req.Peers {
- s.AddPeer(peer.Name, peer.ConnectionString)
- }
- // Update log state.
- s.currentTerm = req.LastTerm
- s.log.updateCommitIndex(req.LastIndex)
- // Create local snapshot.
- s.pendingSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, s.SnapshotPath(req.LastIndex, req.LastTerm)}
- s.saveSnapshot()
- // Clear the previous log entries.
- s.log.compact(req.LastIndex, req.LastTerm)
- return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
- }
- // Load a snapshot at restart
- func (s *server) LoadSnapshot() error {
- // Open snapshot/ directory.
- dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
- if err != nil {
- s.debugln("cannot.open.snapshot: ", err)
- return err
- }
- // Retrieve a list of all snapshots.
- filenames, err := dir.Readdirnames(-1)
- if err != nil {
- dir.Close()
- panic(err)
- }
- dir.Close()
- if len(filenames) == 0 {
- s.debugln("no.snapshot.to.load")
- return nil
- }
- // Grab the latest snapshot.
- sort.Strings(filenames)
- snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])
- // Read snapshot data.
- file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
- if err != nil {
- return err
- }
- defer file.Close()
- // Check checksum.
- var checksum uint32
- n, err := fmt.Fscanf(file, "%08x\n", &checksum)
- if err != nil {
- return err
- } else if n != 1 {
- return errors.New("checksum.err: bad.snapshot.file")
- }
- // Load remaining snapshot contents.
- b, err := ioutil.ReadAll(file)
- if err != nil {
- return err
- }
- // Generate checksum.
- byteChecksum := crc32.ChecksumIEEE(b)
- if uint32(checksum) != byteChecksum {
- s.debugln(checksum, " ", byteChecksum)
- return errors.New("bad snapshot file")
- }
- // Decode snapshot.
- if err = json.Unmarshal(b, &s.snapshot); err != nil {
- s.debugln("unmarshal.snapshot.error: ", err)
- return err
- }
- // Recover snapshot into state machine.
- if err = s.stateMachine.Recovery(s.snapshot.State); err != nil {
- s.debugln("recovery.snapshot.error: ", err)
- return err
- }
- // Recover cluster configuration.
- for _, peer := range s.snapshot.Peers {
- s.AddPeer(peer.Name, peer.ConnectionString)
- }
- // Update log state.
- s.log.startTerm = s.snapshot.LastTerm
- s.log.startIndex = s.snapshot.LastIndex
- s.log.updateCommitIndex(s.snapshot.LastIndex)
- return err
- }
- //--------------------------------------
- // Config File
- //--------------------------------------
- // Flushes commit index to the disk.
- // So when the raft server restarts, it will commit upto the flushed commitIndex.
- func (s *server) FlushCommitIndex() {
- s.debugln("server.conf.update")
- // Write the configuration to file.
- s.writeConf()
- }
- func (s *server) writeConf() {
- peers := make([]*Peer, len(s.peers))
- i := 0
- for _, peer := range s.peers {
- peers[i] = peer.clone()
- i++
- }
- r := &Config{
- CommitIndex: s.log.commitIndex,
- Peers: peers,
- }
- b, _ := json.Marshal(r)
- confPath := path.Join(s.path, "conf")
- tmpConfPath := path.Join(s.path, "conf.tmp")
- err := writeFileSynced(tmpConfPath, b, 0600)
- if err != nil {
- panic(err)
- }
- os.Rename(tmpConfPath, confPath)
- }
- // Read the configuration for the server.
- func (s *server) readConf() error {
- confPath := path.Join(s.path, "conf")
- s.debugln("readConf.open ", confPath)
- // open conf file
- b, err := ioutil.ReadFile(confPath)
- if err != nil {
- return nil
- }
- conf := &Config{}
- if err = json.Unmarshal(b, conf); err != nil {
- return err
- }
- s.log.updateCommitIndex(conf.CommitIndex)
- return nil
- }
- //--------------------------------------
- // Debugging
- //--------------------------------------
- func (s *server) debugln(v ...interface{}) {
- if logLevel > Debug {
- debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
- }
- }
- func (s *server) traceln(v ...interface{}) {
- if logLevel > Trace {
- tracef("[%s] %s", s.name, fmt.Sprintln(v...))
- }
- }
|