Browse Source

bump(github.com/coreos/raft): ac7be58b1bec49dfcfc7216df4ae27173da1fa57

Ben Johnson 12 years ago
parent
commit
81a73dbc80

+ 3 - 1
third_party/github.com/coreos/raft/.travis.yml

@@ -1,8 +1,10 @@
 language: go
 language: go
 
 
 go:
 go:
-  - 1.1
+  - 1.1.2
+  - 1.2
 
 
 install:
 install:
+  - go get github.com/stretchr/testify/assert
   - make dependencies
   - make dependencies
 
 

+ 1 - 0
third_party/github.com/coreos/raft/Makefile

@@ -8,6 +8,7 @@ dependencies:
 	go get -d .
 	go get -d .
 
 
 test:
 test:
+	go test -i ./...
 	go test -v ./...
 	go test -v ./...
 
 
 .PHONY: coverage dependencies test
 .PHONY: coverage dependencies test

+ 50 - 2
third_party/github.com/coreos/raft/README.md

@@ -34,11 +34,27 @@ go-raft is under the MIT license.
 
 
 These projects are built on go-raft:
 These projects are built on go-raft:
 
 
-- [coreos/etcd](https://github.com/coreos/etcd) - A highly-available key value store for shared configuration and service discovery
-- [benbjohnson/raftd](https://github.com/benbjohnson/raftd) - A reference implementation for using the go-raft library for distributed consensus.
+- [coreos/etcd](https://github.com/coreos/etcd) - A highly-available key value store for shared configuration and service discovery.
+- [goraft/raftd](https://github.com/goraft/raftd) - A reference implementation for using the go-raft library for distributed consensus.
+- [skynetservices/skydns](https://github.com/skynetservices/skydns) - DNS for skynet or any other service discovery.
+- [influxdb/influxdb](https://github.com/influxdb/influxdb) - An open-source, distributed, time series, events, and metrics database.
 
 
 If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples.
 If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples.
 
 
+## Contact and Resources
+
+- [raft-dev][raft-dev] is a mailing list for discussion about best practices
+  and implementation of Raft. Not goraft specific but helpful if you have
+  questions.
+- [Slides from Ben's talk][bens-talk] which includes easy to understand
+  diagrams of leader election and replication
+- The [Raft Consensus homepage][raft-home] has links to additional raft
+  implementations, slides to talks on Raft and general information
+
+[raft-home]:  http://raftconsensus.github.io/
+[raft-dev]: https://groups.google.com/forum/#!forum/raft-dev
+[bens-talk]: https://speakerdeck.com/benbjohnson/raft-the-understandable-distributed-consensus-protocol
+
 ## The Raft Protocol
 ## The Raft Protocol
 
 
 This section provides a summary of the Raft protocol from a high level.
 This section provides a summary of the Raft protocol from a high level.
@@ -83,6 +99,38 @@ By ensuring that this log is replicated identically between all the nodes in the
 Replicating the log under normal conditions is done by sending an `AppendEntries` RPC from the leader to each of the other servers in the cluster (called Peers).
 Replicating the log under normal conditions is done by sending an `AppendEntries` RPC from the leader to each of the other servers in the cluster (called Peers).
 Each peer will append the entries from the leader through a 2-phase commit process which ensure that a majority of servers in the cluster have entries written to log.
 Each peer will append the entries from the leader through a 2-phase commit process which ensure that a majority of servers in the cluster have entries written to log.
 
 
+
+## Raft in Practice
+
+### Optimal Cluster Size
+
+The primary consideration when choosing the node count in your Raft cluster is the number of nodes that can simultaneously fail.
+Because Raft requires a majority of nodes to be available to make progress, the number of node failures the cluster can tolerate is `(n / 2) - 1`.
+
+This means that a 3-node cluster can tolerate 1 node failure.
+If 2 nodes fail then the cluster cannot commit entries or elect a new leader so progress stops.
+A 5-node cluster can tolerate 2 node failures. A 9-node cluster can tolerate 4 node failures.
+It is unlikely that 4 nodes will simultaneously fail so clusters larger than 9 nodes are not common.
+
+Another consideration is performance.
+The leader must replicate log entries for each follower node so CPU and networking resources can quickly be bottlenecked under stress in a large cluster.
+
+
+### Scaling Raft
+
+Once you grow beyond the maximum size of your cluster there are a few options for scaling Raft:
+
+1. *Core nodes with dumb replication.*
+   This option requires you to maintain a small cluster (e.g. 5 nodes) that is involved in the Raft process and then replicate only committed log entries to the remaining nodes in the cluster.
+   This works well if you have reads in your system that can be stale.
+
+2. *Sharding.*
+   This option requires that you segment your data into different clusters.
+   This option works well if you need very strong consistency and therefore need to read and write heavily from the leader.
+
+If you have a very large cluster that you need to replicate to using Option 1 then you may want to look at performing hierarchical replication so that nodes can better share the load.
+
+
 ## History
 ## History
 
 
 Ben Johnson started this library for use in his behavioral analytics database called [Sky](https://github.com/skydb/sky).
 Ben Johnson started this library for use in his behavioral analytics database called [Sky](https://github.com/skydb/sky).

+ 55 - 0
third_party/github.com/coreos/raft/event.go

@@ -0,0 +1,55 @@
+package raft
+
+const (
+	StateChangeEventType  = "stateChange"
+	LeaderChangeEventType = "leaderChange"
+	TermChangeEventType   = "termChange"
+	AddPeerEventType      = "addPeer"
+	RemovePeerEventType   = "removePeer"
+)
+
+// Event represents an action that occurred within the Raft library.
+// Listeners can subscribe to event types by using the Server.AddEventListener() function.
+type Event interface {
+	Type() string
+	Source() interface{}
+	Value() interface{}
+	PrevValue() interface{}
+}
+
+// event is the concrete implementation of the Event interface.
+type event struct {
+	typ       string
+	source    interface{}
+	value     interface{}
+	prevValue interface{}
+}
+
+// newEvent creates a new event.
+func newEvent(typ string, value interface{}, prevValue interface{}) *event {
+	return &event{
+		typ:       typ,
+		value:     value,
+		prevValue: prevValue,
+	}
+}
+
+// Type returns the type of event that occurred.
+func (e *event) Type() string {
+	return e.typ
+}
+
+// Source returns the object that dispatched the event.
+func (e *event) Source() interface{} {
+	return e.source
+}
+
+// Value returns the current value associated with the event, if applicable.
+func (e *event) Value() interface{} {
+	return e.value
+}
+
+// PrevValue returns the previous value associated with the event, if applicable.
+func (e *event) PrevValue() interface{} {
+	return e.prevValue
+}

+ 50 - 0
third_party/github.com/coreos/raft/event_dispatcher.go

@@ -0,0 +1,50 @@
+package raft
+
+import (
+	"sync"
+)
+
+// eventDispatcher is responsible for managing listeners for named events
+// and dispatching event notifications to those listeners.
+type eventDispatcher struct {
+	sync.RWMutex
+	source    interface{}
+	listeners map[string]eventListeners
+}
+
+// EventListener is a function that can receive event notifications.
+type EventListener func(Event)
+
+// EventListeners represents a collection of individual listeners.
+type eventListeners []EventListener
+
+// newEventDispatcher creates a new eventDispatcher instance.
+func newEventDispatcher(source interface{}) *eventDispatcher {
+	return &eventDispatcher{
+		source:    source,
+		listeners: make(map[string]eventListeners),
+	}
+}
+
+// AddEventListener adds a listener function for a given event type.
+func (d *eventDispatcher) AddEventListener(typ string, listener EventListener) {
+	d.Lock()
+	defer d.Unlock()
+	d.listeners[typ] = append(d.listeners[typ], listener)
+}
+
+// DispatchEvent dispatches an event.
+func (d *eventDispatcher) DispatchEvent(e Event) {
+	d.RLock()
+	defer d.RUnlock()
+
+	// Automatically set the event source.
+	if e, ok := e.(*event); ok {
+		e.source = d.source
+	}
+
+	// Dispatch the event to all listeners.
+	for _, l := range d.listeners[e.Type()] {
+		l(e)
+	}
+}

+ 45 - 0
third_party/github.com/coreos/raft/event_dispatcher_test.go

@@ -0,0 +1,45 @@
+package raft
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// Ensure that we can listen and dispatch events.
+func TestDispatchEvent(t *testing.T) {
+	var count int
+	dispatcher := newEventDispatcher(nil)
+	dispatcher.AddEventListener("foo", func(e Event) {
+		count += 1
+	})
+	dispatcher.AddEventListener("foo", func(e Event) {
+		count += 10
+	})
+	dispatcher.AddEventListener("bar", func(e Event) {
+		count += 100
+	})
+	dispatcher.DispatchEvent(&event{typ: "foo", value: nil, prevValue: nil})
+	assert.Equal(t, 11, count)
+}
+
+// Ensure that event is properly passed to listener.
+func TestEventListener(t *testing.T) {
+	dispatcher := newEventDispatcher("X")
+	dispatcher.AddEventListener("foo", func(e Event) {
+		assert.Equal(t, "foo", e.Type())
+		assert.Equal(t, "X", e.Source())
+		assert.Equal(t, 10, e.Value())
+		assert.Equal(t, 20, e.PrevValue())
+	})
+	dispatcher.DispatchEvent(&event{typ: "foo", value: 10, prevValue: 20})
+}
+
+// Benchmark the performance of event dispatch.
+func BenchmarkEventDispatch(b *testing.B) {
+	dispatcher := newEventDispatcher(nil)
+	dispatcher.AddEventListener("xxx", func(e Event) {})
+	for i := 0; i < b.N; i++ {
+		dispatcher.DispatchEvent(&event{typ: "foo", value: 10, prevValue: 20})
+	}
+}

+ 4 - 4
third_party/github.com/coreos/raft/http_transporter.go

@@ -23,6 +23,7 @@ type HTTPTransporter struct {
 	prefix            string
 	prefix            string
 	appendEntriesPath string
 	appendEntriesPath string
 	requestVotePath   string
 	requestVotePath   string
+	httpClient        http.Client
 }
 }
 
 
 type HTTPMuxer interface {
 type HTTPMuxer interface {
@@ -42,6 +43,7 @@ func NewHTTPTransporter(prefix string) *HTTPTransporter {
 		prefix:            prefix,
 		prefix:            prefix,
 		appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
 		appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
 		requestVotePath:   fmt.Sprintf("%s%s", prefix, "/requestVote"),
 		requestVotePath:   fmt.Sprintf("%s%s", prefix, "/requestVote"),
+		httpClient:        http.Client{Transport: &http.Transport{DisableKeepAlives: false}},
 	}
 	}
 }
 }
 
 
@@ -97,8 +99,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re
 	url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath())
 	url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath())
 	traceln(server.Name(), "POST", url)
 	traceln(server.Name(), "POST", url)
 
 
-	client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
-	httpResp, err := client.Post(url, "application/protobuf", &b)
+	httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
 	if httpResp == nil || err != nil {
 	if httpResp == nil || err != nil {
 		traceln("transporter.ae.response.error:", err)
 		traceln("transporter.ae.response.error:", err)
 		return nil
 		return nil
@@ -125,8 +126,7 @@ func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *Reques
 	url := fmt.Sprintf("%s%s", peer.ConnectionString, t.RequestVotePath())
 	url := fmt.Sprintf("%s%s", peer.ConnectionString, t.RequestVotePath())
 	traceln(server.Name(), "POST", url)
 	traceln(server.Name(), "POST", url)
 
 
-	client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
-	httpResp, err := client.Post(url, "application/protobuf", &b)
+	httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
 	if httpResp == nil || err != nil {
 	if httpResp == nil || err != nil {
 		traceln("transporter.rv.response.error:", err)
 		traceln("transporter.rv.response.error:", err)
 		return nil
 		return nil

+ 12 - 9
third_party/github.com/coreos/raft/log.go

@@ -333,7 +333,7 @@ func (l *Log) commitInfo() (index uint64, term uint64) {
 	return entry.Index, entry.Term
 	return entry.Index, entry.Term
 }
 }
 
 
-// Retrieves the last index and term that has been committed to the log.
+// Retrieves the last index and term that has been appended to the log.
 func (l *Log) lastInfo() (index uint64, term uint64) {
 func (l *Log) lastInfo() (index uint64, term uint64) {
 	l.mutex.RLock()
 	l.mutex.RLock()
 	defer l.mutex.RUnlock()
 	defer l.mutex.RUnlock()
@@ -366,8 +366,7 @@ func (l *Log) setCommitIndex(index uint64) error {
 	// this is not error any more after limited the number of sending entries
 	// this is not error any more after limited the number of sending entries
 	// commit up to what we already have
 	// commit up to what we already have
 	if index > l.startIndex+uint64(len(l.entries)) {
 	if index > l.startIndex+uint64(len(l.entries)) {
-		debugln("raft.StartIndex", l.startIndex)
-		debugln("raft.Log: Commit index", index, "set back to ", l.startIndex+uint64(len(l.entries)))
+		debugln("raft.Log: Commit index", index, "set back to ", len(l.entries))
 		index = l.startIndex + uint64(len(l.entries))
 		index = l.startIndex + uint64(len(l.entries))
 	}
 	}
 
 
@@ -387,7 +386,6 @@ func (l *Log) setCommitIndex(index uint64) error {
 	// follower 2 should reply success and let leader 3 update the committed index to 80
 	// follower 2 should reply success and let leader 3 update the committed index to 80
 
 
 	if index < l.commitIndex {
 	if index < l.commitIndex {
-		debugln("raft.Log: index", index, "committedIndex", l.commitIndex)
 		return nil
 		return nil
 	}
 	}
 
 
@@ -475,8 +473,7 @@ func (l *Log) truncate(index uint64, term uint64) error {
 // Append
 // Append
 //--------------------------------------
 //--------------------------------------
 
 
-// Appends a series of entries to the log. These entries are not written to
-// disk until setCommitIndex() is called.
+// Appends a series of entries to the log.
 func (l *Log) appendEntries(entries []*LogEntry) error {
 func (l *Log) appendEntries(entries []*LogEntry) error {
 	l.mutex.Lock()
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
 	defer l.mutex.Unlock()
@@ -497,14 +494,20 @@ func (l *Log) appendEntries(entries []*LogEntry) error {
 		startPosition += size
 		startPosition += size
 	}
 	}
 	w.Flush()
 	w.Flush()
+	err = l.file.Sync()
+
+	if err != nil {
+		panic(err)
+	}
 
 
 	return nil
 	return nil
 }
 }
 
 
-// Writes a single log entry to the end of the log. This function does not
-// obtain a lock and should only be used internally. Use AppendEntries() and
-// AppendEntry() to use it externally.
+// Writes a single log entry to the end of the log.
 func (l *Log) appendEntry(entry *LogEntry) error {
 func (l *Log) appendEntry(entry *LogEntry) error {
+	l.mutex.Lock()
+	defer l.mutex.Unlock()
+
 	if l.file == nil {
 	if l.file == nil {
 		return errors.New("raft.Log: Log is not open")
 		return errors.New("raft.Log: Log is not open")
 	}
 	}

+ 3 - 1
third_party/github.com/coreos/raft/peer.go

@@ -126,6 +126,8 @@ func (p *Peer) heartbeat(c chan bool) {
 
 
 	c <- true
 	c <- true
 
 
+	ticker := time.Tick(p.heartbeatTimeout)
+
 	debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout)
 	debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout)
 
 
 	for {
 	for {
@@ -142,7 +144,7 @@ func (p *Peer) heartbeat(c chan bool) {
 				return
 				return
 			}
 			}
 
 
-		case <-time.After(p.heartbeatTimeout):
+		case <-ticker:
 			p.flush()
 			p.flush()
 		}
 		}
 	}
 	}

+ 68 - 19
third_party/github.com/coreos/raft/server.go

@@ -94,9 +94,12 @@ type Server interface {
 	Do(command Command) (interface{}, error)
 	Do(command Command) (interface{}, error)
 	TakeSnapshot() error
 	TakeSnapshot() error
 	LoadSnapshot() error
 	LoadSnapshot() error
+	AddEventListener(string, EventListener)
 }
 }
 
 
 type server struct {
 type server struct {
+	*eventDispatcher
+
 	name        string
 	name        string
 	path        string
 	path        string
 	state       string
 	state       string
@@ -111,7 +114,7 @@ type server struct {
 	mutex      sync.RWMutex
 	mutex      sync.RWMutex
 	syncedPeer map[string]bool
 	syncedPeer map[string]bool
 
 
-	c                chan *event
+	c                chan *ev
 	electionTimeout  time.Duration
 	electionTimeout  time.Duration
 	heartbeatTimeout time.Duration
 	heartbeatTimeout time.Duration
 
 
@@ -123,8 +126,8 @@ type server struct {
 	connectionString string
 	connectionString string
 }
 }
 
 
-// An event to be processed by the server's event loop.
-type event struct {
+// An internal event to be processed by the server's event loop.
+type ev struct {
 	target      interface{}
 	target      interface{}
 	returnValue interface{}
 	returnValue interface{}
 	c           chan error
 	c           chan error
@@ -136,7 +139,11 @@ type event struct {
 //
 //
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 
 
-// Creates a new server with a log at the given path.
+// Creates a new server with a log at the given path. transporter must
+// not be nil. stateMachine can be nil if snapshotting and log
+// compaction is to be disabled. context can be anything (including nil)
+// and is not used by the raft package except returned by
+// Server.Context(). connectionString can be anything.
 func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (Server, error) {
 func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (Server, error) {
 	if name == "" {
 	if name == "" {
 		return nil, errors.New("raft.Server: Name cannot be blank")
 		return nil, errors.New("raft.Server: Name cannot be blank")
@@ -154,12 +161,13 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
 		state:                   Stopped,
 		state:                   Stopped,
 		peers:                   make(map[string]*Peer),
 		peers:                   make(map[string]*Peer),
 		log:                     newLog(),
 		log:                     newLog(),
-		c:                       make(chan *event, 256),
+		c:                       make(chan *ev, 256),
 		electionTimeout:         DefaultElectionTimeout,
 		electionTimeout:         DefaultElectionTimeout,
 		heartbeatTimeout:        DefaultHeartbeatTimeout,
 		heartbeatTimeout:        DefaultHeartbeatTimeout,
 		maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
 		maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
 		connectionString:        connectionString,
 		connectionString:        connectionString,
 	}
 	}
+	s.eventDispatcher = newEventDispatcher(s)
 
 
 	// Setup apply function.
 	// Setup apply function.
 	s.log.ApplyFunc = func(c Command) (interface{}, error) {
 	s.log.ApplyFunc = func(c Command) (interface{}, error) {
@@ -246,19 +254,37 @@ func (s *server) State() string {
 func (s *server) setState(state string) {
 func (s *server) setState(state string) {
 	s.mutex.Lock()
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	defer s.mutex.Unlock()
+
+	// Temporarily store previous values.
+	prevState := s.state
+	prevLeader := s.leader
+
+	// Update state and leader.
 	s.state = state
 	s.state = state
 	if state == Leader {
 	if state == Leader {
 		s.leader = s.Name()
 		s.leader = s.Name()
 	}
 	}
+
+	// Dispatch state and leader change events.
+	if prevState != state {
+		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
+	}
+	if prevLeader != s.leader {
+		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
+	}
 }
 }
 
 
 // Retrieves the current term of the server.
 // Retrieves the current term of the server.
 func (s *server) Term() uint64 {
 func (s *server) Term() uint64 {
+	s.mutex.RLock()
+	defer s.mutex.RUnlock()
 	return s.currentTerm
 	return s.currentTerm
 }
 }
 
 
 // Retrieves the current commit index of the server.
 // Retrieves the current commit index of the server.
 func (s *server) CommitIndex() uint64 {
 func (s *server) CommitIndex() uint64 {
+	s.log.mutex.RLock()
+	defer s.log.mutex.RUnlock()
 	return s.log.commitIndex
 	return s.log.commitIndex
 }
 }
 
 
@@ -375,7 +401,7 @@ func init() {
 
 
 func (s *server) Start() error {
 func (s *server) Start() error {
 	// Exit if the server is already running.
 	// Exit if the server is already running.
-	if s.state != Stopped {
+	if s.State() != Stopped {
 		return errors.New("raft.Server: Server already running")
 		return errors.New("raft.Server: Server already running")
 	}
 	}
 
 
@@ -443,22 +469,34 @@ func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
 	s.mutex.Lock()
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	defer s.mutex.Unlock()
 
 
-	// update the term and clear vote for
+	// Store previous values temporarily.
+	prevState := s.state
+	prevTerm := s.currentTerm
+	prevLeader := s.leader
+
 	if term > s.currentTerm {
 	if term > s.currentTerm {
+		// update the term and clear vote for
 		s.state = Follower
 		s.state = Follower
 		s.currentTerm = term
 		s.currentTerm = term
 		s.leader = leaderName
 		s.leader = leaderName
 		s.votedFor = ""
 		s.votedFor = ""
-		return
-	}
-
-	// discover new leader when candidate
-	// save leader name when follower
-	if term == s.currentTerm && s.state != Leader && append {
+	} else if term == s.currentTerm && s.state != Leader && append {
+		// discover new leader when candidate
+		// save leader name when follower
 		s.state = Follower
 		s.state = Follower
 		s.leader = leaderName
 		s.leader = leaderName
 	}
 	}
 
 
+	// Dispatch change events.
+	if prevState != s.state {
+		s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
+	}
+	if prevLeader != s.leader {
+		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
+	}
+	if prevTerm != s.currentTerm {
+		s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
+	}
 }
 }
 
 
 //--------------------------------------
 //--------------------------------------
@@ -512,8 +550,8 @@ func (s *server) send(value interface{}) (interface{}, error) {
 	return event.returnValue, err
 	return event.returnValue, err
 }
 }
 
 
-func (s *server) sendAsync(value interface{}) *event {
-	event := &event{target: value, c: make(chan error, 1)}
+func (s *server) sendAsync(value interface{}) *ev {
+	event := &ev{target: value, c: make(chan error, 1)}
 	s.c <- event
 	s.c <- event
 	return event
 	return event
 }
 }
@@ -588,7 +626,13 @@ func (s *server) followerLoop() {
 // The event loop that is run when the server is in a Candidate state.
 // The event loop that is run when the server is in a Candidate state.
 func (s *server) candidateLoop() {
 func (s *server) candidateLoop() {
 	lastLogIndex, lastLogTerm := s.log.lastInfo()
 	lastLogIndex, lastLogTerm := s.log.lastInfo()
+
+	// Clear leader value.
+	prevLeader := s.leader
 	s.leader = ""
 	s.leader = ""
+	if prevLeader != s.leader {
+		s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
+	}
 
 
 	for {
 	for {
 		// Increment current term, vote for self.
 		// Increment current term, vote for self.
@@ -765,7 +809,7 @@ func (s *server) Do(command Command) (interface{}, error) {
 }
 }
 
 
 // Processes a command.
 // Processes a command.
-func (s *server) processCommand(command Command, e *event) {
+func (s *server) processCommand(command Command, e *ev) {
 	s.debugln("server.command.process")
 	s.debugln("server.command.process")
 
 
 	// Create an entry for the command in the log.
 	// Create an entry for the command in the log.
@@ -866,7 +910,7 @@ func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
 func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
 func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
 
 
 	// If we find a higher term then change to a follower and exit.
 	// If we find a higher term then change to a follower and exit.
-	if resp.Term > s.currentTerm {
+	if resp.Term > s.Term() {
 		s.setCurrentTerm(resp.Term, "", false)
 		s.setCurrentTerm(resp.Term, "", false)
 		return
 		return
 	}
 	}
@@ -914,6 +958,7 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
 					default:
 					default:
 						panic("server unable to send signal to commit channel")
 						panic("server unable to send signal to commit channel")
 					}
 					}
+					entry.commit = nil
 				}
 				}
 			}
 			}
 		}
 		}
@@ -937,7 +982,7 @@ func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
 func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
 func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
 
 
 	// If the request is coming from an old term then reject it.
 	// If the request is coming from an old term then reject it.
-	if req.Term < s.currentTerm {
+	if req.Term < s.Term() {
 		s.debugln("server.rv.error: stale term")
 		s.debugln("server.rv.error: stale term")
 		return newRequestVoteResponse(s.currentTerm, false), false
 		return newRequestVoteResponse(s.currentTerm, false), false
 	}
 	}
@@ -989,6 +1034,8 @@ func (s *server) AddPeer(name string, connectiongString string) error {
 		}
 		}
 
 
 		s.peers[peer.Name] = peer
 		s.peers[peer.Name] = peer
+
+		s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
 	}
 	}
 
 
 	// Write the configuration to file.
 	// Write the configuration to file.
@@ -1015,6 +1062,8 @@ func (s *server) RemovePeer(name string) error {
 		}
 		}
 
 
 		delete(s.peers, name)
 		delete(s.peers, name)
+
+		s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
 	}
 	}
 
 
 	// Write the configuration to file.
 	// Write the configuration to file.
@@ -1317,7 +1366,7 @@ func (s *server) readConf() error {
 //--------------------------------------
 //--------------------------------------
 
 
 func (s *server) debugln(v ...interface{}) {
 func (s *server) debugln(v ...interface{}) {
-	debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
+	debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
 }
 }
 
 
 func (s *server) traceln(v ...interface{}) {
 func (s *server) traceln(v ...interface{}) {

+ 36 - 4
third_party/github.com/coreos/raft/server_test.go

@@ -1,6 +1,7 @@
 package raft
 package raft
 
 
 import (
 import (
+	"encoding/json"
 	"fmt"
 	"fmt"
 	"reflect"
 	"reflect"
 	"strconv"
 	"strconv"
@@ -44,7 +45,10 @@ func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
 		t.Fatalf("Server %s unable to join: %v", s.Name(), err)
 		t.Fatalf("Server %s unable to join: %v", s.Name(), err)
 	}
 	}
 
 
+	s.(*server).mutex.Lock()
 	s.(*server).currentTerm = 2
 	s.(*server).currentTerm = 2
+	s.(*server).mutex.Unlock()
+
 	defer s.Stop()
 	defer s.Stop()
 	resp := s.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
 	resp := s.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
 	if resp.Term != 2 || resp.VoteGranted {
 	if resp.Term != 2 || resp.VoteGranted {
@@ -64,7 +68,9 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
 		t.Fatalf("Server %s unable to join: %v", s.Name(), err)
 		t.Fatalf("Server %s unable to join: %v", s.Name(), err)
 	}
 	}
 
 
+	s.(*server).mutex.Lock()
 	s.(*server).currentTerm = 2
 	s.(*server).currentTerm = 2
+	s.(*server).mutex.Unlock()
 	defer s.Stop()
 	defer s.Stop()
 	resp := s.RequestVote(newRequestVoteRequest(2, "foo", 1, 0))
 	resp := s.RequestVote(newRequestVoteRequest(2, "foo", 1, 0))
 	if resp.Term != 2 || !resp.VoteGranted {
 	if resp.Term != 2 || !resp.VoteGranted {
@@ -87,7 +93,9 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
 
 
 	time.Sleep(time.Millisecond * 100)
 	time.Sleep(time.Millisecond * 100)
 
 
+	s.(*server).mutex.Lock()
 	s.(*server).currentTerm = 2
 	s.(*server).currentTerm = 2
+	s.(*server).mutex.Unlock()
 	defer s.Stop()
 	defer s.Stop()
 	resp := s.RequestVote(newRequestVoteRequest(2, "foo", 2, 1))
 	resp := s.RequestVote(newRequestVoteRequest(2, "foo", 2, 1))
 	if resp.Term != 2 || !resp.VoteGranted || s.VotedFor() != "foo" {
 	if resp.Term != 2 || !resp.VoteGranted || s.VotedFor() != "foo" {
@@ -235,7 +243,9 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
 	s.Start()
 	s.Start()
 
 
 	defer s.Stop()
 	defer s.Stop()
+	s.(*server).mutex.Lock()
 	s.(*server).currentTerm = 2
 	s.(*server).currentTerm = 2
+	s.(*server).mutex.Unlock()
 
 
 	// Append single entry.
 	// Append single entry.
 	e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
 	e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
@@ -328,13 +338,23 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
 		mutex.RLock()
 		mutex.RLock()
 		target := servers[peer.Name]
 		target := servers[peer.Name]
 		mutex.RUnlock()
 		mutex.RUnlock()
-		return target.RequestVote(req)
+
+		b, _ := json.Marshal(req)
+		clonedReq := &RequestVoteRequest{}
+		json.Unmarshal(b, clonedReq)
+
+		return target.RequestVote(clonedReq)
 	}
 	}
 	transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 	transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 		mutex.RLock()
 		mutex.RLock()
 		target := servers[peer.Name]
 		target := servers[peer.Name]
 		mutex.RUnlock()
 		mutex.RUnlock()
-		return target.AppendEntries(req)
+
+		b, _ := json.Marshal(req)
+		clonedReq := &AppendEntriesRequest{}
+		json.Unmarshal(b, clonedReq)
+
+		return target.AppendEntries(clonedReq)
 	}
 	}
 
 
 	disTransporter := &testTransporter{}
 	disTransporter := &testTransporter{}
@@ -359,7 +379,9 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
 	for _, name := range names {
 	for _, name := range names {
 		s := newTestServer(name, transporter)
 		s := newTestServer(name, transporter)
 
 
+		mutex.Lock()
 		servers[name] = s
 		servers[name] = s
+		mutex.Unlock()
 		paths[name] = s.Path()
 		paths[name] = s.Path()
 
 
 		if name == "1" {
 		if name == "1" {
@@ -474,13 +496,23 @@ func TestServerMultiNode(t *testing.T) {
 		mutex.RLock()
 		mutex.RLock()
 		target := servers[peer.Name]
 		target := servers[peer.Name]
 		mutex.RUnlock()
 		mutex.RUnlock()
-		return target.RequestVote(req)
+
+		b, _ := json.Marshal(req)
+		clonedReq := &RequestVoteRequest{}
+		json.Unmarshal(b, clonedReq)
+
+		return target.RequestVote(clonedReq)
 	}
 	}
 	transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 	transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 		mutex.RLock()
 		mutex.RLock()
 		target := servers[peer.Name]
 		target := servers[peer.Name]
 		mutex.RUnlock()
 		mutex.RUnlock()
-		return target.AppendEntries(req)
+
+		b, _ := json.Marshal(req)
+		clonedReq := &AppendEntriesRequest{}
+		json.Unmarshal(b, clonedReq)
+
+		return target.AppendEntries(clonedReq)
 	}
 	}
 
 
 	disTransporter := &testTransporter{}
 	disTransporter := &testTransporter{}

+ 2 - 1
third_party/github.com/coreos/raft/statemachine.go

@@ -7,7 +7,8 @@ package raft
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 
 
 // StateMachine is the interface for allowing the host application to save and
 // StateMachine is the interface for allowing the host application to save and
-// recovery the state machine
+// recovery the state machine. This makes it possible to make snapshots
+// and compact the log.
 type StateMachine interface {
 type StateMachine interface {
 	Save() ([]byte, error)
 	Save() ([]byte, error)
 	Recovery([]byte) error
 	Recovery([]byte) error

+ 13 - 0
third_party/github.com/coreos/raft/test.go

@@ -12,6 +12,10 @@ const (
 	testElectionTimeout  = 200 * time.Millisecond
 	testElectionTimeout  = 200 * time.Millisecond
 )
 )
 
 
+const (
+	testListenerLoggerEnabled = false
+)
+
 func init() {
 func init() {
 	RegisterCommand(&testCommand1{})
 	RegisterCommand(&testCommand1{})
 	RegisterCommand(&testCommand2{})
 	RegisterCommand(&testCommand2{})
@@ -66,6 +70,15 @@ func newTestServer(name string, transporter Transporter) Server {
 		panic(err.Error())
 		panic(err.Error())
 	}
 	}
 	server, _ := NewServer(name, p, transporter, nil, nil, "")
 	server, _ := NewServer(name, p, transporter, nil, nil, "")
+	if testListenerLoggerEnabled {
+		fn := func(e Event) {
+			server := e.Source().(Server)
+			warnf("[%s] %s %v -> %v\n", server.Name(), e.Type(), e.PrevValue(), e.Value())
+		}
+		server.AddEventListener(StateChangeEventType, fn)
+		server.AddEventListener(LeaderChangeEventType, fn)
+		server.AddEventListener(TermChangeEventType, fn)
+	}
 	return server
 	return server
 }
 }