Browse Source

Merge pull request #234 from benbjohnson/raft-server-interface

Use raft.Server interface.
Ben Johnson 12 years ago
parent
commit
89660ce0cd
33 changed files with 384 additions and 328 deletions
  1. 2 2
      server/join_command.go
  2. 19 19
      server/peer_server.go
  3. 5 5
      server/peer_server_handlers.go
  4. 1 1
      server/remove_command.go
  5. 9 9
      server/server.go
  6. 4 4
      server/transporter.go
  7. 1 1
      store/create_command.go
  8. 1 1
      store/delete_command.go
  9. 1 1
      store/test_and_set_command.go
  10. 1 1
      store/update_command.go
  11. 3 3
      third_party/github.com/coreos/go-etcd/README.md
  12. 2 3
      third_party/github.com/coreos/go-etcd/etcd/delete.go
  13. 6 7
      third_party/github.com/coreos/go-etcd/etcd/get.go
  14. 26 0
      third_party/github.com/coreos/go-etcd/etcd/response.go
  15. 4 5
      third_party/github.com/coreos/go-etcd/etcd/set.go
  16. 2 3
      third_party/github.com/coreos/go-etcd/etcd/testAndSet.go
  17. 3 4
      third_party/github.com/coreos/go-etcd/etcd/watch.go
  18. 2 3
      third_party/github.com/coreos/go-etcd/etcd/watch_test.go
  19. 1 1
      third_party/github.com/coreos/go-raft/README.md
  20. 1 1
      third_party/github.com/coreos/go-raft/command.go
  21. 9 9
      third_party/github.com/coreos/go-raft/http_transporter.go
  22. 9 9
      third_party/github.com/coreos/go-raft/http_transporter_test.go
  23. 2 2
      third_party/github.com/coreos/go-raft/join_command.go
  24. 2 2
      third_party/github.com/coreos/go-raft/leave_command.go
  25. 1 1
      third_party/github.com/coreos/go-raft/nop_command.go
  26. 2 2
      third_party/github.com/coreos/go-raft/peer.go
  27. 99 60
      third_party/github.com/coreos/go-raft/server.go
  28. 143 146
      third_party/github.com/coreos/go-raft/server_test.go
  29. 3 3
      third_party/github.com/coreos/go-raft/snapshot.go
  30. 1 1
      third_party/github.com/coreos/go-raft/snapshot_recovery_request.go
  31. 14 14
      third_party/github.com/coreos/go-raft/test.go
  32. 4 4
      third_party/github.com/coreos/go-raft/transporter.go
  33. 1 1
      web/web.go

+ 2 - 2
server/join_command.go

@@ -35,7 +35,7 @@ func (c *JoinCommand) CommandName() string {
 }
 }
 
 
 // Join a server to the cluster
 // Join a server to the cluster
-func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
+func (c *JoinCommand) Apply(server raft.Server) (interface{}, error) {
 	ps, _ := server.Context().(*PeerServer)
 	ps, _ := server.Context().(*PeerServer)
 
 
 	b := make([]byte, 8)
 	b := make([]byte, 8)
@@ -62,7 +62,7 @@ func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
 	err := server.AddPeer(c.Name, "")
 	err := server.AddPeer(c.Name, "")
 
 
 	// Add peer stats
 	// Add peer stats
-	if c.Name != ps.Name() {
+	if c.Name != ps.RaftServer().Name() {
 		ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
 		ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
 		ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
 		ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
 	}
 	}

+ 19 - 19
server/peer_server.go

@@ -19,7 +19,7 @@ import (
 )
 )
 
 
 type PeerServer struct {
 type PeerServer struct {
-	*raft.Server
+	raftServer     raft.Server
 	server         *Server
 	server         *Server
 	joinIndex      uint64
 	joinIndex      uint64
 	name           string
 	name           string
@@ -78,12 +78,12 @@ func NewPeerServer(name string, path string, url string, listenHost string, tlsC
 	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
 	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, s)
 
 
 	// Create raft server
 	// Create raft server
-	server, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
+	raftServer, err := raft.NewServer(name, path, raftTransporter, s.store, s, "")
 	if err != nil {
 	if err != nil {
 		log.Fatal(err)
 		log.Fatal(err)
 	}
 	}
 
 
-	s.Server = server
+	s.raftServer = raftServer
 
 
 	return s
 	return s
 }
 }
@@ -92,7 +92,7 @@ func NewPeerServer(name string, path string, url string, listenHost string, tlsC
 func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
 func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
 	// LoadSnapshot
 	// LoadSnapshot
 	if snapshot {
 	if snapshot {
-		err := s.LoadSnapshot()
+		err := s.raftServer.LoadSnapshot()
 
 
 		if err == nil {
 		if err == nil {
 			log.Debugf("%s finished load snapshot", s.name)
 			log.Debugf("%s finished load snapshot", s.name)
@@ -101,12 +101,12 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
 		}
 		}
 	}
 	}
 
 
-	s.SetElectionTimeout(ElectionTimeout)
-	s.SetHeartbeatTimeout(HeartbeatTimeout)
+	s.raftServer.SetElectionTimeout(ElectionTimeout)
+	s.raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
 
 
-	s.Start()
+	s.raftServer.Start()
 
 
-	if s.IsLogEmpty() {
+	if s.raftServer.IsLogEmpty() {
 		// start as a leader in a new cluster
 		// start as a leader in a new cluster
 		if len(cluster) == 0 {
 		if len(cluster) == 0 {
 			s.startAsLeader()
 			s.startAsLeader()
@@ -116,7 +116,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
 
 
 	} else {
 	} else {
 		// Rejoin the previous cluster
 		// Rejoin the previous cluster
-		cluster = s.registry.PeerURLs(s.Leader(), s.name)
+		cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.name)
 		for i := 0; i < len(cluster); i++ {
 		for i := 0; i < len(cluster); i++ {
 			u, err := url.Parse(cluster[i])
 			u, err := url.Parse(cluster[i])
 			if err != nil {
 			if err != nil {
@@ -143,8 +143,8 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
 }
 }
 
 
 // Retrieves the underlying Raft server.
 // Retrieves the underlying Raft server.
-func (s *PeerServer) RaftServer() *raft.Server {
-	return s.Server
+func (s *PeerServer) RaftServer() raft.Server {
+	return s.raftServer
 }
 }
 
 
 // Associates the client server with the peer server.
 // Associates the client server with the peer server.
@@ -155,7 +155,7 @@ func (s *PeerServer) SetServer(server *Server) {
 func (s *PeerServer) startAsLeader() {
 func (s *PeerServer) startAsLeader() {
 	// leader need to join self as a peer
 	// leader need to join self as a peer
 	for {
 	for {
-		_, err := s.Do(NewJoinCommand(PeerVersion, s.Name(), s.url, s.server.URL()))
+		_, err := s.raftServer.Do(NewJoinCommand(PeerVersion, s.raftServer.Name(), s.url, s.server.URL()))
 		if err == nil {
 		if err == nil {
 			break
 			break
 		}
 		}
@@ -232,7 +232,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
 			continue
 			continue
 		}
 		}
 
 
-		err := s.joinByMachine(s.Server, machine, s.tlsConf.Scheme)
+		err := s.joinByMachine(s.raftServer, machine, s.tlsConf.Scheme)
 		if err == nil {
 		if err == nil {
 			log.Debugf("%s success join to the cluster via machine %s", s.name, machine)
 			log.Debugf("%s success join to the cluster via machine %s", s.name, machine)
 			return true
 			return true
@@ -249,7 +249,7 @@ func (s *PeerServer) joinCluster(cluster []string) bool {
 }
 }
 
 
 // Send join requests to machine.
 // Send join requests to machine.
-func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme string) error {
+func (s *PeerServer) joinByMachine(server raft.Server, machine string, scheme string) error {
 	var b bytes.Buffer
 	var b bytes.Buffer
 
 
 	// t must be ok
 	// t must be ok
@@ -327,7 +327,7 @@ func (s *PeerServer) Stats() []byte {
 }
 }
 
 
 func (s *PeerServer) PeerStats() []byte {
 func (s *PeerServer) PeerStats() []byte {
-	if s.State() == raft.Leader {
+	if s.raftServer.State() == raft.Leader {
 		b, _ := json.Marshal(s.followersStats)
 		b, _ := json.Marshal(s.followersStats)
 		return b
 		return b
 	}
 	}
@@ -339,15 +339,15 @@ func (s *PeerServer) monitorSnapshot() {
 		time.Sleep(s.snapConf.checkingInterval)
 		time.Sleep(s.snapConf.checkingInterval)
 		currentWrites := 0
 		currentWrites := 0
 		if uint64(currentWrites) > s.snapConf.writesThr {
 		if uint64(currentWrites) > s.snapConf.writesThr {
-			s.TakeSnapshot()
+			s.raftServer.TakeSnapshot()
 			s.snapConf.lastWrites = 0
 			s.snapConf.lastWrites = 0
 		}
 		}
 	}
 	}
 }
 }
 
 
 func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
 func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
-	if s.State() == raft.Leader {
-		result, err := s.Do(c)
+	if s.raftServer.State() == raft.Leader {
+		result, err := s.raftServer.Do(c)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
@@ -375,7 +375,7 @@ func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.R
 		return nil
 		return nil
 
 
 	} else {
 	} else {
-		leader := s.Leader()
+		leader := s.raftServer.Leader()
 
 
 		// No leader available.
 		// No leader available.
 		if leader == "" {
 		if leader == "" {

+ 5 - 5
server/peer_server_handlers.go

@@ -14,7 +14,7 @@ func (s *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request)
 	log.Debugf("[recv] GET %s/log", s.url)
 	log.Debugf("[recv] GET %s/log", s.url)
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
-	json.NewEncoder(w).Encode(s.LogEntries())
+	json.NewEncoder(w).Encode(s.raftServer.LogEntries())
 }
 }
 
 
 // Response to vote request
 // Response to vote request
@@ -23,7 +23,7 @@ func (s *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	err := decodeJsonRequest(req, rvreq)
 	err := decodeJsonRequest(req, rvreq)
 	if err == nil {
 	if err == nil {
 		log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName)
 		log.Debugf("[recv] POST %s/vote [%s]", s.url, rvreq.CandidateName)
-		if resp := s.RequestVote(rvreq); resp != nil {
+		if resp := s.raftServer.RequestVote(rvreq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
 			return
 			return
@@ -43,7 +43,7 @@ func (s *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.R
 
 
 		s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
 		s.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
 
 
-		if resp := s.AppendEntries(aereq); resp != nil {
+		if resp := s.raftServer.AppendEntries(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
 			if !resp.Success {
 			if !resp.Success {
@@ -62,7 +62,7 @@ func (s *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Reques
 	err := decodeJsonRequest(req, aereq)
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
 	if err == nil {
 		log.Debugf("[recv] POST %s/snapshot/ ", s.url)
 		log.Debugf("[recv] POST %s/snapshot/ ", s.url)
-		if resp := s.RequestSnapshot(aereq); resp != nil {
+		if resp := s.raftServer.RequestSnapshot(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
 			return
 			return
@@ -78,7 +78,7 @@ func (s *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *htt
 	err := decodeJsonRequest(req, aereq)
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
 	if err == nil {
 		log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url)
 		log.Debugf("[recv] POST %s/snapshotRecovery/ ", s.url)
-		if resp := s.SnapshotRecoveryRequest(aereq); resp != nil {
+		if resp := s.raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
 			return
 			return

+ 1 - 1
server/remove_command.go

@@ -23,7 +23,7 @@ func (c *RemoveCommand) CommandName() string {
 }
 }
 
 
 // Remove a server from the cluster
 // Remove a server from the cluster
-func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
+func (c *RemoveCommand) Apply(server raft.Server) (interface{}, error) {
 	ps, _ := server.Context().(*PeerServer)
 	ps, _ := server.Context().(*PeerServer)
 
 
 	// Remove node from the shared registry.
 	// Remove node from the shared registry.

+ 9 - 9
server/server.go

@@ -56,22 +56,22 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI
 
 
 // The current state of the server in the cluster.
 // The current state of the server in the cluster.
 func (s *Server) State() string {
 func (s *Server) State() string {
-	return s.peerServer.State()
+	return s.peerServer.RaftServer().State()
 }
 }
 
 
 // The node name of the leader in the cluster.
 // The node name of the leader in the cluster.
 func (s *Server) Leader() string {
 func (s *Server) Leader() string {
-	return s.peerServer.Leader()
+	return s.peerServer.RaftServer().Leader()
 }
 }
 
 
 // The current Raft committed index.
 // The current Raft committed index.
 func (s *Server) CommitIndex() uint64 {
 func (s *Server) CommitIndex() uint64 {
-	return s.peerServer.CommitIndex()
+	return s.peerServer.RaftServer().CommitIndex()
 }
 }
 
 
 // The current Raft term.
 // The current Raft term.
 func (s *Server) Term() uint64 {
 func (s *Server) Term() uint64 {
-	return s.peerServer.Term()
+	return s.peerServer.RaftServer().Term()
 }
 }
 
 
 // The server URL.
 // The server URL.
@@ -201,7 +201,7 @@ func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) err
 
 
 // Handler to return the current leader's raft address
 // Handler to return the current leader's raft address
 func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
 func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
-	leader := s.peerServer.Leader()
+	leader := s.peerServer.RaftServer().Leader()
 	if leader == "" {
 	if leader == "" {
 		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
 		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
 	}
 	}
@@ -213,7 +213,7 @@ func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) erro
 
 
 // Handler to return all the known machines in the current cluster.
 // Handler to return all the known machines in the current cluster.
 func (s *Server) GetMachinesHandler(w http.ResponseWriter, req *http.Request) error {
 func (s *Server) GetMachinesHandler(w http.ResponseWriter, req *http.Request) error {
-	machines := s.registry.ClientURLs(s.peerServer.Leader(), s.name)
+	machines := s.registry.ClientURLs(s.peerServer.RaftServer().Leader(), s.name)
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(strings.Join(machines, ", ")))
 	w.Write([]byte(strings.Join(machines, ", ")))
 	return nil
 	return nil
@@ -227,12 +227,12 @@ func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error
 
 
 // Retrieves stats on the leader.
 // Retrieves stats on the leader.
 func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error {
 func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error {
-	if s.peerServer.State() == raft.Leader {
+	if s.peerServer.RaftServer().State() == raft.Leader {
 		w.Write(s.peerServer.PeerStats())
 		w.Write(s.peerServer.PeerStats())
 		return nil
 		return nil
 	}
 	}
 
 
-	leader := s.peerServer.Leader()
+	leader := s.peerServer.RaftServer().Leader()
 	if leader == "" {
 	if leader == "" {
 		return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
 		return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
 	}
 	}
@@ -259,7 +259,7 @@ func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) erro
 					Value:      "bar",
 					Value:      "bar",
 					ExpireTime: time.Unix(0, 0),
 					ExpireTime: time.Unix(0, 0),
 				}
 				}
-				s.peerServer.Do(c)
+				s.peerServer.RaftServer().Do(c)
 			}
 			}
 			c <- true
 			c <- true
 		}()
 		}()

+ 4 - 4
server/transporter.go

@@ -62,7 +62,7 @@ func dialWithTimeout(network, addr string) (net.Conn, error) {
 }
 }
 
 
 // Sends AppendEntries RPCs to a peer when the server is the leader.
 // Sends AppendEntries RPCs to a peer when the server is the leader.
-func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
+func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
 	var aersp *raft.AppendEntriesResponse
 	var aersp *raft.AppendEntriesResponse
 	var b bytes.Buffer
 	var b bytes.Buffer
 
 
@@ -117,7 +117,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
 }
 }
 
 
 // Sends RequestVote RPCs to a peer when the server is the candidate.
 // Sends RequestVote RPCs to a peer when the server is the candidate.
-func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
+func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
 	var rvrsp *raft.RequestVoteResponse
 	var rvrsp *raft.RequestVoteResponse
 	var b bytes.Buffer
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 	json.NewEncoder(&b).Encode(req)
@@ -146,7 +146,7 @@ func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req
 }
 }
 
 
 // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
 // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
-func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
+func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
 	var aersp *raft.SnapshotResponse
 	var aersp *raft.SnapshotResponse
 	var b bytes.Buffer
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 	json.NewEncoder(&b).Encode(req)
@@ -177,7 +177,7 @@ func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer,
 }
 }
 
 
 // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
 // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
-func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
+func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
 	var aersp *raft.SnapshotRecoveryResponse
 	var aersp *raft.SnapshotRecoveryResponse
 	var b bytes.Buffer
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 	json.NewEncoder(&b).Encode(req)

+ 1 - 1
store/create_command.go

@@ -25,7 +25,7 @@ func (c *CreateCommand) CommandName() string {
 }
 }
 
 
 // Create node
 // Create node
-func (c *CreateCommand) Apply(server *raft.Server) (interface{}, error) {
+func (c *CreateCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(Store)
 	s, _ := server.StateMachine().(Store)
 
 
 	e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term())
 	e, err := s.Create(c.Key, c.Value, c.IncrementalSuffix, c.Force, c.ExpireTime, server.CommitIndex(), server.Term())

+ 1 - 1
store/delete_command.go

@@ -21,7 +21,7 @@ func (c *DeleteCommand) CommandName() string {
 }
 }
 
 
 // Delete the key
 // Delete the key
-func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
+func (c *DeleteCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(Store)
 	s, _ := server.StateMachine().(Store)
 
 
 	e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())
 	e, err := s.Delete(c.Key, c.Recursive, server.CommitIndex(), server.Term())

+ 1 - 1
store/test_and_set_command.go

@@ -26,7 +26,7 @@ func (c *TestAndSetCommand) CommandName() string {
 }
 }
 
 
 // Set the key-value pair if the current value of the key equals to the given prevValue
 // Set the key-value pair if the current value of the key equals to the given prevValue
-func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
+func (c *TestAndSetCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(Store)
 	s, _ := server.StateMachine().(Store)
 
 
 	e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex,
 	e, err := s.TestAndSet(c.Key, c.PrevValue, c.PrevIndex,

+ 1 - 1
store/update_command.go

@@ -24,7 +24,7 @@ func (c *UpdateCommand) CommandName() string {
 }
 }
 
 
 // Update node
 // Update node
-func (c *UpdateCommand) Apply(server *raft.Server) (interface{}, error) {
+func (c *UpdateCommand) Apply(server raft.Server) (interface{}, error) {
 	s, _ := server.StateMachine().(Store)
 	s, _ := server.StateMachine().(Store)
 
 
 	e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())
 	e, err := s.Update(c.Key, c.Value, c.ExpireTime, server.CommitIndex(), server.Term())

+ 3 - 3
third_party/github.com/coreos/go-etcd/README.md

@@ -31,19 +31,19 @@ func main() {
 	c := etcd.NewClient() // default binds to http://0.0.0.0:4001
 	c := etcd.NewClient() // default binds to http://0.0.0.0:4001
 
 
 	// SET the value "bar" to the key "foo" with zero TTL
 	// SET the value "bar" to the key "foo" with zero TTL
-	// returns a: *store.Response
+	// returns a: *Response
 	res, _ := c.Set("foo", "bar", 0)
 	res, _ := c.Set("foo", "bar", 0)
 	fmt.Printf("set response: %+v\n", res)
 	fmt.Printf("set response: %+v\n", res)
 
 
 	// GET the value that is stored for the key "foo"
 	// GET the value that is stored for the key "foo"
-	// return a slice: []*store.Response
+	// return a slice: []*Response
 	values, _ := c.Get("foo")
 	values, _ := c.Get("foo")
 	for i, res := range values { // .. and print them out
 	for i, res := range values { // .. and print them out
 		fmt.Printf("[%d] get response: %+v\n", i, res)
 		fmt.Printf("[%d] get response: %+v\n", i, res)
 	}
 	}
 
 
 	// DELETE the key "foo"
 	// DELETE the key "foo"
-	// returns a: *store.Response
+	// returns a: *Response
 	res, _ = c.Delete("foo")
 	res, _ = c.Delete("foo")
 	fmt.Printf("delete response: %+v\n", res)
 	fmt.Printf("delete response: %+v\n", res)
 }
 }

+ 2 - 3
third_party/github.com/coreos/go-etcd/etcd/delete.go

@@ -2,13 +2,12 @@ package etcd
 
 
 import (
 import (
 	"encoding/json"
 	"encoding/json"
-	"github.com/coreos/etcd/store"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
 	"path"
 	"path"
 )
 )
 
 
-func (c *Client) Delete(key string) (*store.Response, error) {
+func (c *Client) Delete(key string) (*Response, error) {
 
 
 	resp, err := c.sendRequest("DELETE", path.Join("keys", key), "")
 	resp, err := c.sendRequest("DELETE", path.Join("keys", key), "")
 
 
@@ -28,7 +27,7 @@ func (c *Client) Delete(key string) (*store.Response, error) {
 		return nil, handleError(b)
 		return nil, handleError(b)
 	}
 	}
 
 
-	var result store.Response
+	var result Response
 
 
 	err = json.Unmarshal(b, &result)
 	err = json.Unmarshal(b, &result)
 
 

+ 6 - 7
third_party/github.com/coreos/go-etcd/etcd/get.go

@@ -2,13 +2,12 @@ package etcd
 
 
 import (
 import (
 	"encoding/json"
 	"encoding/json"
-	"github.com/coreos/etcd/store"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
 	"path"
 	"path"
 )
 )
 
 
-func (c *Client) Get(key string) ([]*store.Response, error) {
+func (c *Client) Get(key string) ([]*Response, error) {
 	logger.Debugf("get %s [%s]", key, c.cluster.Leader)
 	logger.Debugf("get %s [%s]", key, c.cluster.Leader)
 	resp, err := c.sendRequest("GET", path.Join("keys", key), "")
 	resp, err := c.sendRequest("GET", path.Join("keys", key), "")
 
 
@@ -36,7 +35,7 @@ func (c *Client) Get(key string) ([]*store.Response, error) {
 // GetTo gets the value of the key from a given machine address.
 // GetTo gets the value of the key from a given machine address.
 // If the given machine is not available it returns an error.
 // If the given machine is not available it returns an error.
 // Mainly use for testing purpose
 // Mainly use for testing purpose
-func (c *Client) GetFrom(key string, addr string) ([]*store.Response, error) {
+func (c *Client) GetFrom(key string, addr string) ([]*Response, error) {
 	httpPath := c.createHttpPath(addr, path.Join(version, "keys", key))
 	httpPath := c.createHttpPath(addr, path.Join(version, "keys", key))
 
 
 	resp, err := c.httpClient.Get(httpPath)
 	resp, err := c.httpClient.Get(httpPath)
@@ -61,10 +60,10 @@ func (c *Client) GetFrom(key string, addr string) ([]*store.Response, error) {
 }
 }
 
 
 // Convert byte stream to response.
 // Convert byte stream to response.
-func convertGetResponse(b []byte) ([]*store.Response, error) {
+func convertGetResponse(b []byte) ([]*Response, error) {
 
 
-	var results []*store.Response
-	var result *store.Response
+	var results []*Response
+	var result *Response
 
 
 	err := json.Unmarshal(b, &result)
 	err := json.Unmarshal(b, &result)
 
 
@@ -76,7 +75,7 @@ func convertGetResponse(b []byte) ([]*store.Response, error) {
 		}
 		}
 
 
 	} else {
 	} else {
-		results = make([]*store.Response, 1)
+		results = make([]*Response, 1)
 		results[0] = result
 		results[0] = result
 	}
 	}
 	return results, nil
 	return results, nil

+ 26 - 0
third_party/github.com/coreos/go-etcd/etcd/response.go

@@ -0,0 +1,26 @@
+package etcd
+
+import (
+	"time"
+)
+
+// The response object from the server.
+type Response struct {
+	Action    string `json:"action"`
+	Key       string `json:"key"`
+	Dir       bool   `json:"dir,omitempty"`
+	PrevValue string `json:"prevValue,omitempty"`
+	Value     string `json:"value,omitempty"`
+
+	// If the key did not exist before the action,
+	// this field should be set to true
+	NewKey bool `json:"newKey,omitempty"`
+
+	Expiration *time.Time `json:"expiration,omitempty"`
+
+	// Time to live in second
+	TTL int64 `json:"ttl,omitempty"`
+
+	// The command index of the raft machine when the command is executed
+	Index uint64 `json:"index"`
+}

+ 4 - 5
third_party/github.com/coreos/go-etcd/etcd/set.go

@@ -3,14 +3,13 @@ package etcd
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
-	"github.com/coreos/etcd/store"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
 	"path"
 	"path"
 )
 )
 
 
-func (c *Client) Set(key string, value string, ttl uint64) (*store.Response, error) {
+func (c *Client) Set(key string, value string, ttl uint64) (*Response, error) {
 	logger.Debugf("set %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
 	logger.Debugf("set %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
 	v := url.Values{}
 	v := url.Values{}
 	v.Set("value", value)
 	v.Set("value", value)
@@ -45,7 +44,7 @@ func (c *Client) Set(key string, value string, ttl uint64) (*store.Response, err
 // SetTo sets the value of the key to a given machine address.
 // SetTo sets the value of the key to a given machine address.
 // If the given machine is not available or is not leader it returns an error
 // If the given machine is not available or is not leader it returns an error
 // Mainly use for testing purpose.
 // Mainly use for testing purpose.
-func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*store.Response, error) {
+func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*Response, error) {
 	v := url.Values{}
 	v := url.Values{}
 	v.Set("value", value)
 	v.Set("value", value)
 
 
@@ -77,8 +76,8 @@ func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*stor
 }
 }
 
 
 // Convert byte stream to response.
 // Convert byte stream to response.
-func convertSetResponse(b []byte) (*store.Response, error) {
-	var result store.Response
+func convertSetResponse(b []byte) (*Response, error) {
+	var result Response
 
 
 	err := json.Unmarshal(b, &result)
 	err := json.Unmarshal(b, &result)
 
 

+ 2 - 3
third_party/github.com/coreos/go-etcd/etcd/testAndSet.go

@@ -3,14 +3,13 @@ package etcd
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
-	"github.com/coreos/etcd/store"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
 	"path"
 	"path"
 )
 )
 
 
-func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint64) (*store.Response, bool, error) {
+func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint64) (*Response, bool, error) {
 	logger.Debugf("set %s, %s[%s], ttl: %d, [%s]", key, value, prevValue, ttl, c.cluster.Leader)
 	logger.Debugf("set %s, %s[%s], ttl: %d, [%s]", key, value, prevValue, ttl, c.cluster.Leader)
 	v := url.Values{}
 	v := url.Values{}
 	v.Set("value", value)
 	v.Set("value", value)
@@ -39,7 +38,7 @@ func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint
 		return nil, false, handleError(b)
 		return nil, false, handleError(b)
 	}
 	}
 
 
-	var result store.Response
+	var result Response
 
 
 	err = json.Unmarshal(b, &result)
 	err = json.Unmarshal(b, &result)
 
 

+ 3 - 4
third_party/github.com/coreos/go-etcd/etcd/watch.go

@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
-	"github.com/coreos/etcd/store"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
@@ -28,7 +27,7 @@ var (
 // channel. And after someone receive the channel, it will go on to watch that prefix.
 // channel. And after someone receive the channel, it will go on to watch that prefix.
 // If a stop channel is given, client can close long-term watch using the stop channel
 // If a stop channel is given, client can close long-term watch using the stop channel
 
 
-func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *store.Response, stop chan bool) (*store.Response, error) {
+func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *Response, stop chan bool) (*Response, error) {
 	logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader)
 	logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader)
 	if receiver == nil {
 	if receiver == nil {
 		return c.watchOnce(prefix, sinceIndex, stop)
 		return c.watchOnce(prefix, sinceIndex, stop)
@@ -50,7 +49,7 @@ func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *store.Re
 
 
 // helper func
 // helper func
 // return when there is change under the given prefix
 // return when there is change under the given prefix
-func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*store.Response, error) {
+func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*Response, error) {
 
 
 	var resp *http.Response
 	var resp *http.Response
 	var err error
 	var err error
@@ -94,7 +93,7 @@ func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*stor
 		return nil, handleError(b)
 		return nil, handleError(b)
 	}
 	}
 
 
-	var result store.Response
+	var result Response
 
 
 	err = json.Unmarshal(b, &result)
 	err = json.Unmarshal(b, &result)
 
 

+ 2 - 3
third_party/github.com/coreos/go-etcd/etcd/watch_test.go

@@ -2,7 +2,6 @@ package etcd
 
 
 import (
 import (
 	"fmt"
 	"fmt"
-	"github.com/coreos/etcd/store"
 	"testing"
 	"testing"
 	"time"
 	"time"
 )
 )
@@ -30,7 +29,7 @@ func TestWatch(t *testing.T) {
 		t.Fatalf("Watch with Index failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index)
 		t.Fatalf("Watch with Index failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index)
 	}
 	}
 
 
-	ch := make(chan *store.Response, 10)
+	ch := make(chan *Response, 10)
 	stop := make(chan bool, 1)
 	stop := make(chan bool, 1)
 
 
 	go setLoop("bar", c)
 	go setLoop("bar", c)
@@ -57,7 +56,7 @@ func setLoop(value string, c *Client) {
 	}
 	}
 }
 }
 
 
-func receiver(c chan *store.Response, stop chan bool) {
+func receiver(c chan *Response, stop chan bool) {
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		<-c
 		<-c
 	}
 	}

+ 1 - 1
third_party/github.com/coreos/go-raft/README.md

@@ -57,7 +57,7 @@ A distributed consensus protocol is used for maintaining a consistent state acro
 Many distributed systems are built upon the Paxos protocol but Paxos can be difficult to understand and there are many gaps between Paxos and real world implementation.
 Many distributed systems are built upon the Paxos protocol but Paxos can be difficult to understand and there are many gaps between Paxos and real world implementation.
 
 
 An alternative is the [Raft distributed consensus protocol][raft-paper] by Diego Ongaro and John Ousterhout.
 An alternative is the [Raft distributed consensus protocol][raft-paper] by Diego Ongaro and John Ousterhout.
-Raft is a protocol built with understandability as a primary tenant and it centers around two things:
+Raft is a protocol built with understandability as a primary tenet and it centers around two things:
 
 
 1. Leader Election
 1. Leader Election
 2. Replicated Log
 2. Replicated Log

+ 1 - 1
third_party/github.com/coreos/go-raft/command.go

@@ -29,7 +29,7 @@ func init() {
 // A command represents an action to be taken on the replicated state machine.
 // A command represents an action to be taken on the replicated state machine.
 type Command interface {
 type Command interface {
 	CommandName() string
 	CommandName() string
-	Apply(server *Server) (interface{}, error)
+	Apply(server Server) (interface{}, error)
 }
 }
 
 
 type CommandEncoder interface {
 type CommandEncoder interface {

+ 9 - 9
third_party/github.com/coreos/go-raft/http_transporter.go

@@ -77,7 +77,7 @@ func (t *HTTPTransporter) RequestVotePath() string {
 //--------------------------------------
 //--------------------------------------
 
 
 // Applies Raft routes to an HTTP router for a given server.
 // Applies Raft routes to an HTTP router for a given server.
-func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
+func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
 	mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
 	mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
 	mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
 	mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
 }
 }
@@ -87,14 +87,14 @@ func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
 //--------------------------------------
 //--------------------------------------
 
 
 // Sends an AppendEntries RPC to a peer.
 // Sends an AppendEntries RPC to a peer.
-func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 	var b bytes.Buffer
 	var b bytes.Buffer
 	if _, err := req.Encode(&b); err != nil {
 	if _, err := req.Encode(&b); err != nil {
 		traceln("transporter.ae.encoding.error:", err)
 		traceln("transporter.ae.encoding.error:", err)
 		return nil
 		return nil
 	}
 	}
 
 
-	url := fmt.Sprintf("http://%s%s", peer.Name, t.AppendEntriesPath())
+	url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath())
 	traceln(server.Name(), "POST", url)
 	traceln(server.Name(), "POST", url)
 
 
 	client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
 	client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
@@ -115,14 +115,14 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
 }
 }
 
 
 // Sends a RequestVote RPC to a peer.
 // Sends a RequestVote RPC to a peer.
-func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
 	var b bytes.Buffer
 	var b bytes.Buffer
 	if _, err := req.Encode(&b); err != nil {
 	if _, err := req.Encode(&b); err != nil {
 		traceln("transporter.rv.encoding.error:", err)
 		traceln("transporter.rv.encoding.error:", err)
 		return nil
 		return nil
 	}
 	}
 
 
-	url := fmt.Sprintf("http://%s%s", peer.Name, t.RequestVotePath())
+	url := fmt.Sprintf("%s%s", peer.ConnectionString, t.RequestVotePath())
 	traceln(server.Name(), "POST", url)
 	traceln(server.Name(), "POST", url)
 
 
 	client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
 	client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
@@ -143,12 +143,12 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
 }
 }
 
 
 // Sends a SnapshotRequest RPC to a peer.
 // Sends a SnapshotRequest RPC to a peer.
-func (t *HTTPTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
+func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
 	return nil
 	return nil
 }
 }
 
 
 // Sends a SnapshotRequest RPC to a peer.
 // Sends a SnapshotRequest RPC to a peer.
-func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
+func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
 	return nil
 	return nil
 }
 }
 
 
@@ -157,7 +157,7 @@ func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer
 //--------------------------------------
 //--------------------------------------
 
 
 // Handles incoming AppendEntries requests.
 // Handles incoming AppendEntries requests.
-func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc {
+func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 	return func(w http.ResponseWriter, r *http.Request) {
 		traceln(server.Name(), "RECV /appendEntries")
 		traceln(server.Name(), "RECV /appendEntries")
 
 
@@ -176,7 +176,7 @@ func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc
 }
 }
 
 
 // Handles incoming RequestVote requests.
 // Handles incoming RequestVote requests.
-func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc {
+func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 	return func(w http.ResponseWriter, r *http.Request) {
 		traceln(server.Name(), "RECV /requestVote")
 		traceln(server.Name(), "RECV /requestVote")
 
 

+ 9 - 9
third_party/github.com/coreos/go-raft/http_transporter_test.go

@@ -14,8 +14,8 @@ func TestHTTPTransporter(t *testing.T) {
 	transporter := NewHTTPTransporter("/raft")
 	transporter := NewHTTPTransporter("/raft")
 	transporter.DisableKeepAlives = true
 	transporter.DisableKeepAlives = true
 
 
-	servers := []*Server{}
-	f0 := func(server *Server, httpServer *http.Server) {
+	servers := []Server{}
+	f0 := func(server Server, httpServer *http.Server) {
 		// Stop the leader and wait for an election.
 		// Stop the leader and wait for an election.
 		server.Stop()
 		server.Stop()
 		time.Sleep(testElectionTimeout * 2)
 		time.Sleep(testElectionTimeout * 2)
@@ -25,15 +25,15 @@ func TestHTTPTransporter(t *testing.T) {
 		}
 		}
 		server.Start()
 		server.Start()
 	}
 	}
-	f1 := func(server *Server, httpServer *http.Server) {
+	f1 := func(server Server, httpServer *http.Server) {
 	}
 	}
-	f2 := func(server *Server, httpServer *http.Server) {
+	f2 := func(server Server, httpServer *http.Server) {
 	}
 	}
 	runTestHttpServers(t, &servers, transporter, f0, f1, f2)
 	runTestHttpServers(t, &servers, transporter, f0, f1, f2)
 }
 }
 
 
 // Starts multiple independent Raft servers wrapped with HTTP servers.
 // Starts multiple independent Raft servers wrapped with HTTP servers.
-func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTransporter, callbacks ...func(*Server, *http.Server)) {
+func runTestHttpServers(t *testing.T, servers *[]Server, transporter *HTTPTransporter, callbacks ...func(Server, *http.Server)) {
 	var wg sync.WaitGroup
 	var wg sync.WaitGroup
 	httpServers := []*http.Server{}
 	httpServers := []*http.Server{}
 	listeners := []net.Listener{}
 	listeners := []net.Listener{}
@@ -68,7 +68,7 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans
 
 
 	// Setup configuration.
 	// Setup configuration.
 	for _, server := range *servers {
 	for _, server := range *servers {
-		if _, err := (*servers)[0].Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
+		if _, err := (*servers)[0].Do(&DefaultJoinCommand{Name: server.Name(), ConnectionString: fmt.Sprintf("http://%s", server.Name())}); err != nil {
 			t.Fatalf("Server %s unable to join: %v", server.Name(), err)
 			t.Fatalf("Server %s unable to join: %v", server.Name(), err)
 		}
 		}
 	}
 	}
@@ -94,7 +94,7 @@ func BenchmarkSpeed(b *testing.B) {
 	transporter := NewHTTPTransporter("/raft")
 	transporter := NewHTTPTransporter("/raft")
 	transporter.DisableKeepAlives = true
 	transporter.DisableKeepAlives = true
 
 
-	servers := []*Server{}
+	servers := []Server{}
 
 
 	for i := 0; i < 3; i++ {
 	for i := 0; i < 3; i++ {
 		port := 9000 + i
 		port := 9000 + i
@@ -125,7 +125,7 @@ func BenchmarkSpeed(b *testing.B) {
 
 
 	// Setup configuration.
 	// Setup configuration.
 	for _, server := range servers {
 	for _, server := range servers {
-		(servers)[0].Do(&DefaultJoinCommand{Name: server.Name()})
+		(servers)[0].Do(&DefaultJoinCommand{Name: server.Name(), ConnectionString: fmt.Sprintf("http://%s", server.Name())})
 	}
 	}
 
 
 	c := make(chan bool)
 	c := make(chan bool)
@@ -145,7 +145,7 @@ func BenchmarkSpeed(b *testing.B) {
 	}
 	}
 }
 }
 
 
-func send(c chan bool, s *Server) {
+func send(c chan bool, s Server) {
 	for i := 0; i < 20; i++ {
 	for i := 0; i < 20; i++ {
 		s.Do(&NOPCommand{})
 		s.Do(&NOPCommand{})
 	}
 	}

+ 2 - 2
third_party/github.com/coreos/go-raft/join_command.go

@@ -3,7 +3,7 @@ package raft
 // Join command interface
 // Join command interface
 type JoinCommand interface {
 type JoinCommand interface {
 	CommandName() string
 	CommandName() string
-	Apply(server *Server) (interface{}, error)
+	Apply(server Server) (interface{}, error)
 	NodeName() string
 	NodeName() string
 }
 }
 
 
@@ -18,7 +18,7 @@ func (c *DefaultJoinCommand) CommandName() string {
 	return "raft:join"
 	return "raft:join"
 }
 }
 
 
-func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) {
+func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {
 	err := server.AddPeer(c.Name, c.ConnectionString)
 	err := server.AddPeer(c.Name, c.ConnectionString)
 
 
 	return []byte("join"), err
 	return []byte("join"), err

+ 2 - 2
third_party/github.com/coreos/go-raft/leave_command.go

@@ -3,7 +3,7 @@ package raft
 // Leave command interface
 // Leave command interface
 type LeaveCommand interface {
 type LeaveCommand interface {
 	CommandName() string
 	CommandName() string
-	Apply(server *Server) (interface{}, error)
+	Apply(server Server) (interface{}, error)
 	NodeName() string
 	NodeName() string
 }
 }
 
 
@@ -17,7 +17,7 @@ func (c *DefaultLeaveCommand) CommandName() string {
 	return "raft:leave"
 	return "raft:leave"
 }
 }
 
 
-func (c *DefaultLeaveCommand) Apply(server *Server) (interface{}, error) {
+func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error) {
 	err := server.RemovePeer(c.Name)
 	err := server.RemovePeer(c.Name)
 
 
 	return []byte("leave"), err
 	return []byte("leave"), err

+ 1 - 1
third_party/github.com/coreos/go-raft/nop_command.go

@@ -13,7 +13,7 @@ func (c NOPCommand) CommandName() string {
 	return "raft:nop"
 	return "raft:nop"
 }
 }
 
 
-func (c NOPCommand) Apply(server *Server) (interface{}, error) {
+func (c NOPCommand) Apply(server Server) (interface{}, error) {
 	return nil, nil
 	return nil, nil
 }
 }
 
 

+ 2 - 2
third_party/github.com/coreos/go-raft/peer.go

@@ -13,7 +13,7 @@ import (
 
 
 // A peer is a reference to another server involved in the consensus protocol.
 // A peer is a reference to another server involved in the consensus protocol.
 type Peer struct {
 type Peer struct {
-	server           *Server
+	server           *server
 	Name             string `json:"name"`
 	Name             string `json:"name"`
 	ConnectionString string `json:"connectionString"`
 	ConnectionString string `json:"connectionString"`
 	prevLogIndex     uint64
 	prevLogIndex     uint64
@@ -29,7 +29,7 @@ type Peer struct {
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 
 
 // Creates a new peer.
 // Creates a new peer.
-func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
+func newPeer(server *server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
 	return &Peer{
 	return &Peer{
 		server:           server,
 		server:           server,
 		Name:             name,
 		Name:             name,

+ 99 - 60
third_party/github.com/coreos/go-raft/server.go

@@ -57,7 +57,46 @@ var CommandTimeoutError = errors.New("raft: Command timeout")
 
 
 // A server is involved in the consensus protocol and can act as a follower,
 // A server is involved in the consensus protocol and can act as a follower,
 // candidate or a leader.
 // candidate or a leader.
-type Server struct {
+type Server interface {
+	Name() string
+	Context() interface{}
+	StateMachine() StateMachine
+	Leader() string
+	State() string
+	Path() string
+	LogPath() string
+	SnapshotPath(lastIndex uint64, lastTerm uint64) string
+	Term() uint64
+	CommitIndex() uint64
+	VotedFor() string
+	MemberCount() int
+	QuorumSize() int
+	IsLogEmpty() bool
+	LogEntries() []*LogEntry
+	LastCommandName() string
+	GetState() string
+	ElectionTimeout() time.Duration
+	SetElectionTimeout(duration time.Duration)
+	HeartbeatTimeout() time.Duration
+	SetHeartbeatTimeout(duration time.Duration)
+	Transporter() Transporter
+	SetTransporter(t Transporter)
+	AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
+	RequestVote(req *RequestVoteRequest) *RequestVoteResponse
+	RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
+	SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
+	AddPeer(name string, connectiongString string) error
+	RemovePeer(name string) error
+	Peers() map[string]*Peer
+	Start() error
+	Stop()
+	Running() bool
+	Do(command Command) (interface{}, error)
+	TakeSnapshot() error
+	LoadSnapshot() error
+}
+
+type server struct {
 	name        string
 	name        string
 	path        string
 	path        string
 	state       string
 	state       string
@@ -98,7 +137,7 @@ type event struct {
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 
 
 // Creates a new server with a log at the given path.
 // Creates a new server with a log at the given path.
-func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (*Server, error) {
+func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (Server, error) {
 	if name == "" {
 	if name == "" {
 		return nil, errors.New("raft.Server: Name cannot be blank")
 		return nil, errors.New("raft.Server: Name cannot be blank")
 	}
 	}
@@ -106,7 +145,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
 		panic("raft: Transporter required")
 		panic("raft: Transporter required")
 	}
 	}
 
 
-	s := &Server{
+	s := &server{
 		name:                    name,
 		name:                    name,
 		path:                    path,
 		path:                    path,
 		transporter:             transporter,
 		transporter:             transporter,
@@ -142,22 +181,22 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
 //--------------------------------------
 //--------------------------------------
 
 
 // Retrieves the name of the server.
 // Retrieves the name of the server.
-func (s *Server) Name() string {
+func (s *server) Name() string {
 	return s.name
 	return s.name
 }
 }
 
 
 // Retrieves the storage path for the server.
 // Retrieves the storage path for the server.
-func (s *Server) Path() string {
+func (s *server) Path() string {
 	return s.path
 	return s.path
 }
 }
 
 
 // The name of the current leader.
 // The name of the current leader.
-func (s *Server) Leader() string {
+func (s *server) Leader() string {
 	return s.leader
 	return s.leader
 }
 }
 
 
 // Retrieves a copy of the peer data.
 // Retrieves a copy of the peer data.
-func (s *Server) Peers() map[string]*Peer {
+func (s *server) Peers() map[string]*Peer {
 	s.mutex.Lock()
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	defer s.mutex.Unlock()
 
 
@@ -169,42 +208,42 @@ func (s *Server) Peers() map[string]*Peer {
 }
 }
 
 
 // Retrieves the object that transports requests.
 // Retrieves the object that transports requests.
-func (s *Server) Transporter() Transporter {
+func (s *server) Transporter() Transporter {
 	s.mutex.RLock()
 	s.mutex.RLock()
 	defer s.mutex.RUnlock()
 	defer s.mutex.RUnlock()
 	return s.transporter
 	return s.transporter
 }
 }
 
 
-func (s *Server) SetTransporter(t Transporter) {
+func (s *server) SetTransporter(t Transporter) {
 	s.mutex.Lock()
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	defer s.mutex.Unlock()
 	s.transporter = t
 	s.transporter = t
 }
 }
 
 
 // Retrieves the context passed into the constructor.
 // Retrieves the context passed into the constructor.
-func (s *Server) Context() interface{} {
+func (s *server) Context() interface{} {
 	return s.context
 	return s.context
 }
 }
 
 
 // Retrieves the state machine passed into the constructor.
 // Retrieves the state machine passed into the constructor.
-func (s *Server) StateMachine() StateMachine {
+func (s *server) StateMachine() StateMachine {
 	return s.stateMachine
 	return s.stateMachine
 }
 }
 
 
 // Retrieves the log path for the server.
 // Retrieves the log path for the server.
-func (s *Server) LogPath() string {
+func (s *server) LogPath() string {
 	return path.Join(s.path, "log")
 	return path.Join(s.path, "log")
 }
 }
 
 
 // Retrieves the current state of the server.
 // Retrieves the current state of the server.
-func (s *Server) State() string {
+func (s *server) State() string {
 	s.mutex.RLock()
 	s.mutex.RLock()
 	defer s.mutex.RUnlock()
 	defer s.mutex.RUnlock()
 	return s.state
 	return s.state
 }
 }
 
 
 // Sets the state of the server.
 // Sets the state of the server.
-func (s *Server) setState(state string) {
+func (s *server) setState(state string) {
 	s.mutex.Lock()
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	defer s.mutex.Unlock()
 	s.state = state
 	s.state = state
@@ -214,44 +253,44 @@ func (s *Server) setState(state string) {
 }
 }
 
 
 // Retrieves the current term of the server.
 // Retrieves the current term of the server.
-func (s *Server) Term() uint64 {
+func (s *server) Term() uint64 {
 	return s.currentTerm
 	return s.currentTerm
 }
 }
 
 
 // Retrieves the current commit index of the server.
 // Retrieves the current commit index of the server.
-func (s *Server) CommitIndex() uint64 {
+func (s *server) CommitIndex() uint64 {
 	return s.log.commitIndex
 	return s.log.commitIndex
 }
 }
 
 
 // Retrieves the name of the candidate this server voted for in this term.
 // Retrieves the name of the candidate this server voted for in this term.
-func (s *Server) VotedFor() string {
+func (s *server) VotedFor() string {
 	return s.votedFor
 	return s.votedFor
 }
 }
 
 
 // Retrieves whether the server's log has no entries.
 // Retrieves whether the server's log has no entries.
-func (s *Server) IsLogEmpty() bool {
+func (s *server) IsLogEmpty() bool {
 	return s.log.isEmpty()
 	return s.log.isEmpty()
 }
 }
 
 
 // A list of all the log entries. This should only be used for debugging purposes.
 // A list of all the log entries. This should only be used for debugging purposes.
-func (s *Server) LogEntries() []*LogEntry {
+func (s *server) LogEntries() []*LogEntry {
 	return s.log.entries
 	return s.log.entries
 }
 }
 
 
 // A reference to the command name of the last entry.
 // A reference to the command name of the last entry.
-func (s *Server) LastCommandName() string {
+func (s *server) LastCommandName() string {
 	return s.log.lastCommandName()
 	return s.log.lastCommandName()
 }
 }
 
 
 // Get the state of the server for debugging
 // Get the state of the server for debugging
-func (s *Server) GetState() string {
+func (s *server) GetState() string {
 	s.mutex.RLock()
 	s.mutex.RLock()
 	defer s.mutex.RUnlock()
 	defer s.mutex.RUnlock()
 	return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
 	return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
 }
 }
 
 
 // Check if the server is promotable
 // Check if the server is promotable
-func (s *Server) promotable() bool {
+func (s *server) promotable() bool {
 	return s.log.currentIndex() > 0
 	return s.log.currentIndex() > 0
 }
 }
 
 
@@ -260,14 +299,14 @@ func (s *Server) promotable() bool {
 //--------------------------------------
 //--------------------------------------
 
 
 // Retrieves the number of member servers in the consensus.
 // Retrieves the number of member servers in the consensus.
-func (s *Server) MemberCount() int {
+func (s *server) MemberCount() int {
 	s.mutex.Lock()
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	defer s.mutex.Unlock()
 	return len(s.peers) + 1
 	return len(s.peers) + 1
 }
 }
 
 
 // Retrieves the number of servers required to make a quorum.
 // Retrieves the number of servers required to make a quorum.
-func (s *Server) QuorumSize() int {
+func (s *server) QuorumSize() int {
 	return (s.MemberCount() / 2) + 1
 	return (s.MemberCount() / 2) + 1
 }
 }
 
 
@@ -276,14 +315,14 @@ func (s *Server) QuorumSize() int {
 //--------------------------------------
 //--------------------------------------
 
 
 // Retrieves the election timeout.
 // Retrieves the election timeout.
-func (s *Server) ElectionTimeout() time.Duration {
+func (s *server) ElectionTimeout() time.Duration {
 	s.mutex.RLock()
 	s.mutex.RLock()
 	defer s.mutex.RUnlock()
 	defer s.mutex.RUnlock()
 	return s.electionTimeout
 	return s.electionTimeout
 }
 }
 
 
 // Sets the election timeout.
 // Sets the election timeout.
-func (s *Server) SetElectionTimeout(duration time.Duration) {
+func (s *server) SetElectionTimeout(duration time.Duration) {
 	s.mutex.Lock()
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	defer s.mutex.Unlock()
 	s.electionTimeout = duration
 	s.electionTimeout = duration
@@ -294,14 +333,14 @@ func (s *Server) SetElectionTimeout(duration time.Duration) {
 //--------------------------------------
 //--------------------------------------
 
 
 // Retrieves the heartbeat timeout.
 // Retrieves the heartbeat timeout.
-func (s *Server) HeartbeatTimeout() time.Duration {
+func (s *server) HeartbeatTimeout() time.Duration {
 	s.mutex.RLock()
 	s.mutex.RLock()
 	defer s.mutex.RUnlock()
 	defer s.mutex.RUnlock()
 	return s.heartbeatTimeout
 	return s.heartbeatTimeout
 }
 }
 
 
 // Sets the heartbeat timeout.
 // Sets the heartbeat timeout.
-func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
+func (s *server) SetHeartbeatTimeout(duration time.Duration) {
 	s.mutex.Lock()
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	defer s.mutex.Unlock()
 
 
@@ -334,7 +373,7 @@ func init() {
 // If no log entries exist and a self-join command is issued then
 // If no log entries exist and a self-join command is issued then
 // immediately become leader and commit entry.
 // immediately become leader and commit entry.
 
 
-func (s *Server) Start() error {
+func (s *server) Start() error {
 	// Exit if the server is already running.
 	// Exit if the server is already running.
 	if s.state != Stopped {
 	if s.state != Stopped {
 		return errors.New("raft.Server: Server already running")
 		return errors.New("raft.Server: Server already running")
@@ -380,7 +419,7 @@ func (s *Server) Start() error {
 }
 }
 
 
 // Shuts down the server.
 // Shuts down the server.
-func (s *Server) Stop() {
+func (s *server) Stop() {
 	s.send(&stopValue)
 	s.send(&stopValue)
 	s.mutex.Lock()
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	defer s.mutex.Unlock()
@@ -388,7 +427,7 @@ func (s *Server) Stop() {
 }
 }
 
 
 // Checks if the server is currently running.
 // Checks if the server is currently running.
-func (s *Server) Running() bool {
+func (s *server) Running() bool {
 	s.mutex.RLock()
 	s.mutex.RLock()
 	defer s.mutex.RUnlock()
 	defer s.mutex.RUnlock()
 	return s.state != Stopped
 	return s.state != Stopped
@@ -400,7 +439,7 @@ func (s *Server) Running() bool {
 
 
 // Sets the current term for the server. This is only used when an external
 // Sets the current term for the server. This is only used when an external
 // current term is found.
 // current term is found.
-func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) {
+func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
 	s.mutex.Lock()
 	s.mutex.Lock()
 	defer s.mutex.Unlock()
 	defer s.mutex.Unlock()
 
 
@@ -439,7 +478,7 @@ func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) {
 //                    |            new leader |                                     |
 //                    |            new leader |                                     |
 //                    |_______________________|____________________________________ |
 //                    |_______________________|____________________________________ |
 // The main event loop for the server
 // The main event loop for the server
-func (s *Server) loop() {
+func (s *server) loop() {
 	defer s.debugln("server.loop.end")
 	defer s.debugln("server.loop.end")
 
 
 	for {
 	for {
@@ -467,13 +506,13 @@ func (s *Server) loop() {
 
 
 // Sends an event to the event loop to be processed. The function will wait
 // Sends an event to the event loop to be processed. The function will wait
 // until the event is actually processed before returning.
 // until the event is actually processed before returning.
-func (s *Server) send(value interface{}) (interface{}, error) {
+func (s *server) send(value interface{}) (interface{}, error) {
 	event := s.sendAsync(value)
 	event := s.sendAsync(value)
 	err := <-event.c
 	err := <-event.c
 	return event.returnValue, err
 	return event.returnValue, err
 }
 }
 
 
-func (s *Server) sendAsync(value interface{}) *event {
+func (s *server) sendAsync(value interface{}) *event {
 	event := &event{target: value, c: make(chan error, 1)}
 	event := &event{target: value, c: make(chan error, 1)}
 	s.c <- event
 	s.c <- event
 	return event
 	return event
@@ -484,7 +523,7 @@ func (s *Server) sendAsync(value interface{}) *event {
 // Converts to candidate if election timeout elapses without either:
 // Converts to candidate if election timeout elapses without either:
 //   1.Receiving valid AppendEntries RPC, or
 //   1.Receiving valid AppendEntries RPC, or
 //   2.Granting vote to candidate
 //   2.Granting vote to candidate
-func (s *Server) followerLoop() {
+func (s *server) followerLoop() {
 
 
 	s.setState(Follower)
 	s.setState(Follower)
 	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
 	timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
@@ -547,7 +586,7 @@ func (s *Server) followerLoop() {
 }
 }
 
 
 // The event loop that is run when the server is in a Candidate state.
 // The event loop that is run when the server is in a Candidate state.
-func (s *Server) candidateLoop() {
+func (s *server) candidateLoop() {
 	lastLogIndex, lastLogTerm := s.log.lastInfo()
 	lastLogIndex, lastLogTerm := s.log.lastInfo()
 	s.leader = ""
 	s.leader = ""
 
 
@@ -630,7 +669,7 @@ func (s *Server) candidateLoop() {
 }
 }
 
 
 // The event loop that is run when the server is in a Leader state.
 // The event loop that is run when the server is in a Leader state.
-func (s *Server) leaderLoop() {
+func (s *server) leaderLoop() {
 	s.setState(Leader)
 	s.setState(Leader)
 	s.syncedPeer = make(map[string]bool)
 	s.syncedPeer = make(map[string]bool)
 	logIndex, _ := s.log.lastInfo()
 	logIndex, _ := s.log.lastInfo()
@@ -682,7 +721,7 @@ func (s *Server) leaderLoop() {
 	s.syncedPeer = nil
 	s.syncedPeer = nil
 }
 }
 
 
-func (s *Server) snapshotLoop() {
+func (s *server) snapshotLoop() {
 	s.setState(Snapshotting)
 	s.setState(Snapshotting)
 
 
 	for {
 	for {
@@ -721,12 +760,12 @@ func (s *Server) snapshotLoop() {
 // Attempts to execute a command and replicate it. The function will return
 // Attempts to execute a command and replicate it. The function will return
 // when the command has been successfully committed or an error has occurred.
 // when the command has been successfully committed or an error has occurred.
 
 
-func (s *Server) Do(command Command) (interface{}, error) {
+func (s *server) Do(command Command) (interface{}, error) {
 	return s.send(command)
 	return s.send(command)
 }
 }
 
 
 // Processes a command.
 // Processes a command.
-func (s *Server) processCommand(command Command, e *event) {
+func (s *server) processCommand(command Command, e *event) {
 	s.debugln("server.command.process")
 	s.debugln("server.command.process")
 
 
 	// Create an entry for the command in the log.
 	// Create an entry for the command in the log.
@@ -779,14 +818,14 @@ func (s *Server) processCommand(command Command, e *event) {
 //--------------------------------------
 //--------------------------------------
 
 
 // Appends zero or more log entry from the leader to this server.
 // Appends zero or more log entry from the leader to this server.
-func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
+func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
 	ret, _ := s.send(req)
 	ret, _ := s.send(req)
 	resp, _ := ret.(*AppendEntriesResponse)
 	resp, _ := ret.(*AppendEntriesResponse)
 	return resp
 	return resp
 }
 }
 
 
 // Processes the "append entries" request.
 // Processes the "append entries" request.
-func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
+func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
 
 
 	s.traceln("server.ae.process")
 	s.traceln("server.ae.process")
 
 
@@ -824,7 +863,7 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
 // Processes the "append entries" response from the peer. This is only
 // Processes the "append entries" response from the peer. This is only
 // processed when the server is a leader. Responses received during other
 // processed when the server is a leader. Responses received during other
 // states are dropped.
 // states are dropped.
-func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
+func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
 
 
 	// If we find a higher term then change to a follower and exit.
 	// If we find a higher term then change to a follower and exit.
 	if resp.Term > s.currentTerm {
 	if resp.Term > s.currentTerm {
@@ -888,14 +927,14 @@ func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
 // Requests a vote from a server. A vote can be obtained if the vote's term is
 // Requests a vote from a server. A vote can be obtained if the vote's term is
 // at the server's current term and the server has not made a vote yet. A vote
 // at the server's current term and the server has not made a vote yet. A vote
 // can also be obtained if the term is greater than the server's current term.
 // can also be obtained if the term is greater than the server's current term.
-func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
+func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
 	ret, _ := s.send(req)
 	ret, _ := s.send(req)
 	resp, _ := ret.(*RequestVoteResponse)
 	resp, _ := ret.(*RequestVoteResponse)
 	return resp
 	return resp
 }
 }
 
 
 // Processes a "request vote" request.
 // Processes a "request vote" request.
-func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
+func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
 
 
 	// If the request is coming from an old term then reject it.
 	// If the request is coming from an old term then reject it.
 	if req.Term < s.currentTerm {
 	if req.Term < s.currentTerm {
@@ -933,7 +972,7 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
 //--------------------------------------
 //--------------------------------------
 
 
 // Adds a peer to the server.
 // Adds a peer to the server.
-func (s *Server) AddPeer(name string, connectiongString string) error {
+func (s *server) AddPeer(name string, connectiongString string) error {
 	s.debugln("server.peer.add: ", name, len(s.peers))
 	s.debugln("server.peer.add: ", name, len(s.peers))
 
 
 	// Do not allow peers to be added twice.
 	// Do not allow peers to be added twice.
@@ -959,7 +998,7 @@ func (s *Server) AddPeer(name string, connectiongString string) error {
 }
 }
 
 
 // Removes a peer from the server.
 // Removes a peer from the server.
-func (s *Server) RemovePeer(name string) error {
+func (s *server) RemovePeer(name string) error {
 	s.debugln("server.peer.remove: ", name, len(s.peers))
 	s.debugln("server.peer.remove: ", name, len(s.peers))
 
 
 	// Skip the Peer if it has the same name as the Server
 	// Skip the Peer if it has the same name as the Server
@@ -988,7 +1027,7 @@ func (s *Server) RemovePeer(name string) error {
 // Log compaction
 // Log compaction
 //--------------------------------------
 //--------------------------------------
 
 
-func (s *Server) TakeSnapshot() error {
+func (s *server) TakeSnapshot() error {
 	//TODO put a snapshot mutex
 	//TODO put a snapshot mutex
 	s.debugln("take Snapshot")
 	s.debugln("take Snapshot")
 	if s.currentSnapshot != nil {
 	if s.currentSnapshot != nil {
@@ -1047,7 +1086,7 @@ func (s *Server) TakeSnapshot() error {
 }
 }
 
 
 // Retrieves the log path for the server.
 // Retrieves the log path for the server.
-func (s *Server) saveSnapshot() error {
+func (s *server) saveSnapshot() error {
 
 
 	if s.currentSnapshot == nil {
 	if s.currentSnapshot == nil {
 		return errors.New("no snapshot to save")
 		return errors.New("no snapshot to save")
@@ -1071,17 +1110,17 @@ func (s *Server) saveSnapshot() error {
 }
 }
 
 
 // Retrieves the log path for the server.
 // Retrieves the log path for the server.
-func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
+func (s *server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
 	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
 	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
 }
 }
 
 
-func (s *Server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
+func (s *server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
 	ret, _ := s.send(req)
 	ret, _ := s.send(req)
 	resp, _ := ret.(*SnapshotResponse)
 	resp, _ := ret.(*SnapshotResponse)
 	return resp
 	return resp
 }
 }
 
 
-func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
+func (s *server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
 
 
 	// If the follower’s log contains an entry at the snapshot’s last index with a term
 	// If the follower’s log contains an entry at the snapshot’s last index with a term
 	// that matches the snapshot’s last term
 	// that matches the snapshot’s last term
@@ -1099,13 +1138,13 @@ func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse
 	return newSnapshotResponse(true)
 	return newSnapshotResponse(true)
 }
 }
 
 
-func (s *Server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
+func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
 	ret, _ := s.send(req)
 	ret, _ := s.send(req)
 	resp, _ := ret.(*SnapshotRecoveryResponse)
 	resp, _ := ret.(*SnapshotRecoveryResponse)
 	return resp
 	return resp
 }
 }
 
 
-func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
+func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
 
 
 	s.stateMachine.Recovery(req.State)
 	s.stateMachine.Recovery(req.State)
 
 
@@ -1136,7 +1175,7 @@ func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S
 }
 }
 
 
 // Load a snapshot at restart
 // Load a snapshot at restart
-func (s *Server) LoadSnapshot() error {
+func (s *server) LoadSnapshot() error {
 	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
 	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
 	if err != nil {
 	if err != nil {
 
 
@@ -1221,7 +1260,7 @@ func (s *Server) LoadSnapshot() error {
 // Config File
 // Config File
 //--------------------------------------
 //--------------------------------------
 
 
-func (s *Server) writeConf() {
+func (s *server) writeConf() {
 
 
 	peers := make([]*Peer, len(s.peers))
 	peers := make([]*Peer, len(s.peers))
 
 
@@ -1251,7 +1290,7 @@ func (s *Server) writeConf() {
 }
 }
 
 
 // Read the configuration for the server.
 // Read the configuration for the server.
-func (s *Server) readConf() error {
+func (s *server) readConf() error {
 	confPath := path.Join(s.path, "conf")
 	confPath := path.Join(s.path, "conf")
 	s.debugln("readConf.open ", confPath)
 	s.debugln("readConf.open ", confPath)
 
 
@@ -1277,10 +1316,10 @@ func (s *Server) readConf() error {
 // Debugging
 // Debugging
 //--------------------------------------
 //--------------------------------------
 
 
-func (s *Server) debugln(v ...interface{}) {
+func (s *server) debugln(v ...interface{}) {
 	debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
 	debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
 }
 }
 
 
-func (s *Server) traceln(v ...interface{}) {
+func (s *server) traceln(v ...interface{}) {
 	tracef("[%s] %s", s.name, fmt.Sprintln(v...))
 	tracef("[%s] %s", s.name, fmt.Sprintln(v...))
 }
 }

+ 143 - 146
third_party/github.com/coreos/go-raft/server_test.go

@@ -37,40 +37,40 @@ func TestServerRequestVote(t *testing.T) {
 
 
 // // Ensure that a vote request is denied if it comes from an old term.
 // // Ensure that a vote request is denied if it comes from an old term.
 func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
 func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
-	server := newTestServer("1", &testTransporter{})
+	s := newTestServer("1", &testTransporter{})
 
 
-	server.Start()
-	if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
-		t.Fatalf("Server %s unable to join: %v", server.Name(), err)
+	s.Start()
+	if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
+		t.Fatalf("Server %s unable to join: %v", s.Name(), err)
 	}
 	}
 
 
-	server.currentTerm = 2
-	defer server.Stop()
-	resp := server.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
+	s.(*server).currentTerm = 2
+	defer s.Stop()
+	resp := s.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
 	if resp.Term != 2 || resp.VoteGranted {
 	if resp.Term != 2 || resp.VoteGranted {
 		t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
 		t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
 	}
 	}
-	if server.currentTerm != 2 && server.State() != Follower {
-		t.Fatalf("Server did not update term and demote: %v / %v", server.currentTerm, server.State())
+	if s.Term() != 2 && s.State() != Follower {
+		t.Fatalf("Server did not update term and demote: %v / %v", s.Term(), s.State())
 	}
 	}
 }
 }
 
 
 // Ensure that a vote request is denied if we've already voted for a different candidate.
 // Ensure that a vote request is denied if we've already voted for a different candidate.
 func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
 func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
-	server := newTestServer("1", &testTransporter{})
+	s := newTestServer("1", &testTransporter{})
 
 
-	server.Start()
-	if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
-		t.Fatalf("Server %s unable to join: %v", server.Name(), err)
+	s.Start()
+	if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
+		t.Fatalf("Server %s unable to join: %v", s.Name(), err)
 	}
 	}
 
 
-	server.currentTerm = 2
-	defer server.Stop()
-	resp := server.RequestVote(newRequestVoteRequest(2, "foo", 1, 0))
+	s.(*server).currentTerm = 2
+	defer s.Stop()
+	resp := s.RequestVote(newRequestVoteRequest(2, "foo", 1, 0))
 	if resp.Term != 2 || !resp.VoteGranted {
 	if resp.Term != 2 || !resp.VoteGranted {
 		t.Fatalf("First vote should not have been denied")
 		t.Fatalf("First vote should not have been denied")
 	}
 	}
-	resp = server.RequestVote(newRequestVoteRequest(2, "bar", 1, 0))
+	resp = s.RequestVote(newRequestVoteRequest(2, "bar", 1, 0))
 	if resp.Term != 2 || resp.VoteGranted {
 	if resp.Term != 2 || resp.VoteGranted {
 		t.Fatalf("Second vote should have been denied")
 		t.Fatalf("Second vote should have been denied")
 	}
 	}
@@ -78,24 +78,24 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
 
 
 // Ensure that a vote request is approved if vote occurs in a new term.
 // Ensure that a vote request is approved if vote occurs in a new term.
 func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
 func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
-	server := newTestServer("1", &testTransporter{})
+	s := newTestServer("1", &testTransporter{})
 
 
-	server.Start()
-	if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
-		t.Fatalf("Server %s unable to join: %v", server.Name(), err)
+	s.Start()
+	if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
+		t.Fatalf("Server %s unable to join: %v", s.Name(), err)
 	}
 	}
 
 
 	time.Sleep(time.Millisecond * 100)
 	time.Sleep(time.Millisecond * 100)
 
 
-	server.currentTerm = 2
-	defer server.Stop()
-	resp := server.RequestVote(newRequestVoteRequest(2, "foo", 2, 1))
-	if resp.Term != 2 || !resp.VoteGranted || server.VotedFor() != "foo" {
+	s.(*server).currentTerm = 2
+	defer s.Stop()
+	resp := s.RequestVote(newRequestVoteRequest(2, "foo", 2, 1))
+	if resp.Term != 2 || !resp.VoteGranted || s.VotedFor() != "foo" {
 		t.Fatalf("First vote should not have been denied")
 		t.Fatalf("First vote should not have been denied")
 	}
 	}
-	resp = server.RequestVote(newRequestVoteRequest(3, "bar", 2, 1))
+	resp = s.RequestVote(newRequestVoteRequest(3, "bar", 2, 1))
 
 
-	if resp.Term != 3 || !resp.VoteGranted || server.VotedFor() != "bar" {
+	if resp.Term != 3 || !resp.VoteGranted || s.VotedFor() != "bar" {
 		t.Fatalf("Second vote should have been approved")
 		t.Fatalf("Second vote should have been approved")
 	}
 	}
 }
 }
@@ -106,33 +106,32 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
 	e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
 	e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
 	e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
 	e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
 	e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
 	e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
-	server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
+	s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
 
 
 	// start as a follower with term 2 and index 3
 	// start as a follower with term 2 and index 3
-	server.Start()
-
-	defer server.Stop()
+	s.Start()
+	defer s.Stop()
 
 
 	// request vote from term 3 with last log entry 2, 2
 	// request vote from term 3 with last log entry 2, 2
-	resp := server.RequestVote(newRequestVoteRequest(3, "foo", 2, 2))
+	resp := s.RequestVote(newRequestVoteRequest(3, "foo", 2, 2))
 	if resp.Term != 3 || resp.VoteGranted {
 	if resp.Term != 3 || resp.VoteGranted {
 		t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
 		t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
 	}
 	}
 
 
 	// request vote from term 2 with last log entry 2, 3
 	// request vote from term 2 with last log entry 2, 3
-	resp = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
+	resp = s.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
 	if resp.Term != 3 || resp.VoteGranted {
 	if resp.Term != 3 || resp.VoteGranted {
 		t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
 		t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
 	}
 	}
 
 
 	// request vote from term 3 with last log entry 2, 3
 	// request vote from term 3 with last log entry 2, 3
-	resp = server.RequestVote(newRequestVoteRequest(3, "foo", 3, 2))
+	resp = s.RequestVote(newRequestVoteRequest(3, "foo", 3, 2))
 	if resp.Term != 3 || !resp.VoteGranted {
 	if resp.Term != 3 || !resp.VoteGranted {
 		t.Fatalf("Matching log vote should have been granted")
 		t.Fatalf("Matching log vote should have been granted")
 	}
 	}
 
 
 	// request vote from term 3 with last log entry 2, 4
 	// request vote from term 3 with last log entry 2, 4
-	resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
+	resp = s.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
 	if resp.Term != 3 || !resp.VoteGranted {
 	if resp.Term != 3 || !resp.VoteGranted {
 		t.Fatalf("Ahead-of-log vote should have been granted")
 		t.Fatalf("Ahead-of-log vote should have been granted")
 	}
 	}
@@ -145,28 +144,27 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
 // // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
 // // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
 func TestServerPromoteSelf(t *testing.T) {
 func TestServerPromoteSelf(t *testing.T) {
 	e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
 	e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
-	server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
+	s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
 
 
 	// start as a follower
 	// start as a follower
-	server.Start()
-
-	defer server.Stop()
+	s.Start()
+	defer s.Stop()
 
 
 	time.Sleep(2 * testElectionTimeout)
 	time.Sleep(2 * testElectionTimeout)
 
 
-	if server.State() != Leader {
-		t.Fatalf("Server self-promotion failed: %v", server.State())
+	if s.State() != Leader {
+		t.Fatalf("Server self-promotion failed: %v", s.State())
 	}
 	}
 }
 }
 
 
 //Ensure that we can promote a server within a cluster to a leader.
 //Ensure that we can promote a server within a cluster to a leader.
 func TestServerPromote(t *testing.T) {
 func TestServerPromote(t *testing.T) {
-	lookup := map[string]*Server{}
+	lookup := map[string]Server{}
 	transporter := &testTransporter{}
 	transporter := &testTransporter{}
-	transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+	transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
 		return lookup[peer.Name].RequestVote(req)
 		return lookup[peer.Name].RequestVote(req)
 	}
 	}
-	transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+	transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 		return lookup[peer.Name].AppendEntries(req)
 		return lookup[peer.Name].AppendEntries(req)
 	}
 	}
 	servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
 	servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
@@ -180,8 +178,8 @@ func TestServerPromote(t *testing.T) {
 	if servers[0].State() != Leader && servers[1].State() != Leader && servers[2].State() != Leader {
 	if servers[0].State() != Leader && servers[1].State() != Leader && servers[2].State() != Leader {
 		t.Fatalf("No leader elected: (%s, %s, %s)", servers[0].State(), servers[1].State(), servers[2].State())
 		t.Fatalf("No leader elected: (%s, %s, %s)", servers[0].State(), servers[1].State(), servers[2].State())
 	}
 	}
-	for _, server := range servers {
-		server.Stop()
+	for _, s := range servers {
+		s.Stop()
 	}
 	}
 }
 }
 
 
@@ -191,20 +189,20 @@ func TestServerPromote(t *testing.T) {
 
 
 // Ensure we can append entries to a server.
 // Ensure we can append entries to a server.
 func TestServerAppendEntries(t *testing.T) {
 func TestServerAppendEntries(t *testing.T) {
-	server := newTestServer("1", &testTransporter{})
+	s := newTestServer("1", &testTransporter{})
 
 
-	server.SetHeartbeatTimeout(time.Second * 10)
-	server.Start()
-	defer server.Stop()
+	s.SetHeartbeatTimeout(time.Second * 10)
+	s.Start()
+	defer s.Stop()
 
 
 	// Append single entry.
 	// Append single entry.
 	e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
 	e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
 	entries := []*LogEntry{e}
 	entries := []*LogEntry{e}
-	resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
+	resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
 	if resp.Term != 1 || !resp.Success {
 	if resp.Term != 1 || !resp.Success {
 		t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
 		t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
 	}
 	}
-	if index, term := server.log.commitInfo(); index != 0 || term != 0 {
+	if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 {
 		t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
 		t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
 	}
 	}
 
 
@@ -212,57 +210,56 @@ func TestServerAppendEntries(t *testing.T) {
 	e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
 	e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
 	e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30})
 	e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30})
 	entries = []*LogEntry{e1, e2}
 	entries = []*LogEntry{e1, e2}
-	resp = server.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
+	resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
 	if resp.Term != 1 || !resp.Success {
 	if resp.Term != 1 || !resp.Success {
 		t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
 		t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
 	}
 	}
-	if index, term := server.log.commitInfo(); index != 1 || term != 1 {
+	if index, term := s.(*server).log.commitInfo(); index != 1 || term != 1 {
 		t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
 		t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
 	}
 	}
 
 
 	// Send zero entries and commit everything.
 	// Send zero entries and commit everything.
-	resp = server.AppendEntries(newAppendEntriesRequest(2, 3, 1, 3, "ldr", []*LogEntry{}))
+	resp = s.AppendEntries(newAppendEntriesRequest(2, 3, 1, 3, "ldr", []*LogEntry{}))
 	if resp.Term != 2 || !resp.Success {
 	if resp.Term != 2 || !resp.Success {
 		t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
 		t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
 	}
 	}
-	if index, term := server.log.commitInfo(); index != 3 || term != 1 {
+	if index, term := s.(*server).log.commitInfo(); index != 3 || term != 1 {
 		t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
 		t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
 	}
 	}
 }
 }
 
 
 //Ensure that entries with stale terms are rejected.
 //Ensure that entries with stale terms are rejected.
 func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
 func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
-	server := newTestServer("1", &testTransporter{})
+	s := newTestServer("1", &testTransporter{})
 
 
-	server.Start()
+	s.Start()
 
 
-	defer server.Stop()
-	server.currentTerm = 2
+	defer s.Stop()
+	s.(*server).currentTerm = 2
 
 
 	// Append single entry.
 	// Append single entry.
 	e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
 	e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
 	entries := []*LogEntry{e}
 	entries := []*LogEntry{e}
-	resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
+	resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
 	if resp.Term != 2 || resp.Success {
 	if resp.Term != 2 || resp.Success {
 		t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
 		t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
 	}
 	}
-	if index, term := server.log.commitInfo(); index != 0 || term != 0 {
+	if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 {
 		t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
 		t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
 	}
 	}
 }
 }
 
 
 // Ensure that we reject entries if the commit log is different.
 // Ensure that we reject entries if the commit log is different.
 func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
 func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
-	server := newTestServer("1", &testTransporter{})
-	server.Start()
-
-	defer server.Stop()
+	s := newTestServer("1", &testTransporter{})
+	s.Start()
+	defer s.Stop()
 
 
 	// Append single entry + commit.
 	// Append single entry + commit.
 	e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
 	e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
 	e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
 	e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
 	entries := []*LogEntry{e1, e2}
 	entries := []*LogEntry{e1, e2}
-	resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
+	resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
 	if resp.Term != 1 || !resp.Success {
 	if resp.Term != 1 || !resp.Success {
 		t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
 		t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
 	}
 	}
@@ -270,7 +267,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
 	// Append entry again (post-commit).
 	// Append entry again (post-commit).
 	e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
 	e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
 	entries = []*LogEntry{e}
 	entries = []*LogEntry{e}
-	resp = server.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
+	resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
 	if resp.Term != 1 || resp.Success {
 	if resp.Term != 1 || resp.Success {
 		t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
 		t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
 	}
 	}
@@ -278,9 +275,9 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
 
 
 // Ensure that we uncommitted entries are rolled back if new entries overwrite them.
 // Ensure that we uncommitted entries are rolled back if new entries overwrite them.
 func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
 func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
-	server := newTestServer("1", &testTransporter{})
-	server.Start()
-	defer server.Stop()
+	s := newTestServer("1", &testTransporter{})
+	s.Start()
+	defer s.Stop()
 
 
 	entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
 	entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
 	entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
 	entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
@@ -288,15 +285,15 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
 
 
 	// Append single entry + commit.
 	// Append single entry + commit.
 	entries := []*LogEntry{entry1, entry2}
 	entries := []*LogEntry{entry1, entry2}
-	resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries))
-	if resp.Term != 1 || !resp.Success || server.log.commitIndex != 1 || !reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2}) {
+	resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries))
+	if resp.Term != 1 || !resp.Success || s.(*server).log.commitIndex != 1 || !reflect.DeepEqual(s.(*server).log.entries, []*LogEntry{entry1, entry2}) {
 		t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
 		t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
 	}
 	}
 
 
 	// Append entry that overwrites the second (uncommitted) entry.
 	// Append entry that overwrites the second (uncommitted) entry.
 	entries = []*LogEntry{entry3}
 	entries = []*LogEntry{entry3}
-	resp = server.AppendEntries(newAppendEntriesRequest(2, 1, 1, 2, "ldr", entries))
-	if resp.Term != 2 || !resp.Success || server.log.commitIndex != 2 || !reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3}) {
+	resp = s.AppendEntries(newAppendEntriesRequest(2, 1, 1, 2, "ldr", entries))
+	if resp.Term != 2 || !resp.Success || s.(*server).log.commitIndex != 2 || !reflect.DeepEqual(s.(*server).log.entries, []*LogEntry{entry1, entry3}) {
 		t.Fatalf("AppendEntries should have succeeded: %v/%v", resp.Term, resp.Success)
 		t.Fatalf("AppendEntries should have succeeded: %v/%v", resp.Term, resp.Success)
 	}
 	}
 }
 }
@@ -307,11 +304,11 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
 
 
 // Ensure that a follower cannot execute a command.
 // Ensure that a follower cannot execute a command.
 func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
 func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
-	server := newTestServer("1", &testTransporter{})
-	server.Start()
-	defer server.Stop()
+	s := newTestServer("1", &testTransporter{})
+	s.Start()
+	defer s.Stop()
 	var err error
 	var err error
-	if _, err = server.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {
+	if _, err = s.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {
 		t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err)
 		t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err)
 	}
 	}
 }
 }
@@ -324,27 +321,27 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
 func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
 func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
 	// Initialize the servers.
 	// Initialize the servers.
 	var mutex sync.RWMutex
 	var mutex sync.RWMutex
-	servers := map[string]*Server{}
+	servers := map[string]Server{}
 
 
 	transporter := &testTransporter{}
 	transporter := &testTransporter{}
-	transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+	transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
 		mutex.RLock()
 		mutex.RLock()
-		s := servers[peer.Name]
+		target := servers[peer.Name]
 		mutex.RUnlock()
 		mutex.RUnlock()
-		return s.RequestVote(req)
+		return target.RequestVote(req)
 	}
 	}
-	transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+	transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 		mutex.RLock()
 		mutex.RLock()
-		s := servers[peer.Name]
+		target := servers[peer.Name]
 		mutex.RUnlock()
 		mutex.RUnlock()
-		return s.AppendEntries(req)
+		return target.AppendEntries(req)
 	}
 	}
 
 
 	disTransporter := &testTransporter{}
 	disTransporter := &testTransporter{}
-	disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+	disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
 		return nil
 		return nil
 	}
 	}
-	disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+	disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 		return nil
 		return nil
 	}
 	}
 
 
@@ -358,22 +355,22 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
 		names = append(names, strconv.Itoa(i))
 		names = append(names, strconv.Itoa(i))
 	}
 	}
 
 
-	var leader *Server
+	var leader Server
 	for _, name := range names {
 	for _, name := range names {
-		server := newTestServer(name, transporter)
+		s := newTestServer(name, transporter)
 
 
-		servers[name] = server
-		paths[name] = server.Path()
+		servers[name] = s
+		paths[name] = s.Path()
 
 
 		if name == "1" {
 		if name == "1" {
-			leader = server
-			server.SetHeartbeatTimeout(testHeartbeatTimeout)
-			server.Start()
+			leader = s
+			s.SetHeartbeatTimeout(testHeartbeatTimeout)
+			s.Start()
 			time.Sleep(testHeartbeatTimeout)
 			time.Sleep(testHeartbeatTimeout)
 		} else {
 		} else {
-			server.SetElectionTimeout(testElectionTimeout)
-			server.SetHeartbeatTimeout(testHeartbeatTimeout)
-			server.Start()
+			s.SetElectionTimeout(testElectionTimeout)
+			s.SetHeartbeatTimeout(testHeartbeatTimeout)
+			s.Start()
 			time.Sleep(testHeartbeatTimeout)
 			time.Sleep(testHeartbeatTimeout)
 		}
 		}
 		if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
 		if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
@@ -385,35 +382,35 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
 	// commit some commands
 	// commit some commands
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
 		if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
-			t.Fatalf("cannot commit command:", err.Error())
+			t.Fatalf("cannot commit command: %s", err.Error())
 		}
 		}
 	}
 	}
 
 
 	time.Sleep(2 * testHeartbeatTimeout)
 	time.Sleep(2 * testHeartbeatTimeout)
 
 
 	for _, name := range names {
 	for _, name := range names {
-		server := servers[name]
-		if server.CommitIndex() != 16 {
-			t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
+		s := servers[name]
+		if s.CommitIndex() != 16 {
+			t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 16)
 		}
 		}
-		server.Stop()
+		s.Stop()
 	}
 	}
 
 
 	for _, name := range names {
 	for _, name := range names {
 		// with old path and disable transportation
 		// with old path and disable transportation
-		server := newTestServerWithPath(name, disTransporter, paths[name])
-		servers[name] = server
+		s := newTestServerWithPath(name, disTransporter, paths[name])
+		servers[name] = s
 
 
-		server.Start()
+		s.Start()
 
 
 		// should only commit to the last join command
 		// should only commit to the last join command
-		if server.CommitIndex() != 6 {
-			t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 6)
+		if s.CommitIndex() != 6 {
+			t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 6)
 		}
 		}
 
 
 		// peer conf should be recovered
 		// peer conf should be recovered
-		if len(server.Peers()) != 4 {
-			t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(server.Peers()), 4)
+		if len(s.Peers()) != 4 {
+			t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(s.Peers()), 4)
 		}
 		}
 	}
 	}
 
 
@@ -426,11 +423,11 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
 
 
 	// should commit to the previous index + 1(nop command when new leader elected)
 	// should commit to the previous index + 1(nop command when new leader elected)
 	for _, name := range names {
 	for _, name := range names {
-		server := servers[name]
-		if server.CommitIndex() != 17 {
-			t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 17)
+		s := servers[name]
+		if s.CommitIndex() != 17 {
+			t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 17)
 		}
 		}
-		server.Stop()
+		s.Stop()
 	}
 	}
 }
 }
 
 
@@ -440,29 +437,29 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
 
 
 // Ensure that we can start a single server and append to its log.
 // Ensure that we can start a single server and append to its log.
 func TestServerSingleNode(t *testing.T) {
 func TestServerSingleNode(t *testing.T) {
-	server := newTestServer("1", &testTransporter{})
-	if server.State() != Stopped {
-		t.Fatalf("Unexpected server state: %v", server.State())
+	s := newTestServer("1", &testTransporter{})
+	if s.State() != Stopped {
+		t.Fatalf("Unexpected server state: %v", s.State())
 	}
 	}
 
 
-	server.Start()
+	s.Start()
 
 
 	time.Sleep(testHeartbeatTimeout)
 	time.Sleep(testHeartbeatTimeout)
 
 
 	// Join the server to itself.
 	// Join the server to itself.
-	if _, err := server.Do(&DefaultJoinCommand{Name: "1"}); err != nil {
+	if _, err := s.Do(&DefaultJoinCommand{Name: "1"}); err != nil {
 		t.Fatalf("Unable to join: %v", err)
 		t.Fatalf("Unable to join: %v", err)
 	}
 	}
 	debugln("finish command")
 	debugln("finish command")
 
 
-	if server.State() != Leader {
-		t.Fatalf("Unexpected server state: %v", server.State())
+	if s.State() != Leader {
+		t.Fatalf("Unexpected server state: %v", s.State())
 	}
 	}
 
 
-	server.Stop()
+	s.Stop()
 
 
-	if server.State() != Stopped {
-		t.Fatalf("Unexpected server state: %v", server.State())
+	if s.State() != Stopped {
+		t.Fatalf("Unexpected server state: %v", s.State())
 	}
 	}
 }
 }
 
 
@@ -470,27 +467,27 @@ func TestServerSingleNode(t *testing.T) {
 func TestServerMultiNode(t *testing.T) {
 func TestServerMultiNode(t *testing.T) {
 	// Initialize the servers.
 	// Initialize the servers.
 	var mutex sync.RWMutex
 	var mutex sync.RWMutex
-	servers := map[string]*Server{}
+	servers := map[string]Server{}
 
 
 	transporter := &testTransporter{}
 	transporter := &testTransporter{}
-	transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+	transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
 		mutex.RLock()
 		mutex.RLock()
-		s := servers[peer.Name]
+		target := servers[peer.Name]
 		mutex.RUnlock()
 		mutex.RUnlock()
-		return s.RequestVote(req)
+		return target.RequestVote(req)
 	}
 	}
-	transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+	transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 		mutex.RLock()
 		mutex.RLock()
-		s := servers[peer.Name]
+		target := servers[peer.Name]
 		mutex.RUnlock()
 		mutex.RUnlock()
-		return s.AppendEntries(req)
+		return target.AppendEntries(req)
 	}
 	}
 
 
 	disTransporter := &testTransporter{}
 	disTransporter := &testTransporter{}
-	disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+	disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
 		return nil
 		return nil
 	}
 	}
-	disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+	disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 		return nil
 		return nil
 	}
 	}
 
 
@@ -503,24 +500,24 @@ func TestServerMultiNode(t *testing.T) {
 		names = append(names, strconv.Itoa(i))
 		names = append(names, strconv.Itoa(i))
 	}
 	}
 
 
-	var leader *Server
+	var leader Server
 	for _, name := range names {
 	for _, name := range names {
-		server := newTestServer(name, transporter)
-		defer server.Stop()
+		s := newTestServer(name, transporter)
+		defer s.Stop()
 
 
 		mutex.Lock()
 		mutex.Lock()
-		servers[name] = server
+		servers[name] = s
 		mutex.Unlock()
 		mutex.Unlock()
 
 
 		if name == "1" {
 		if name == "1" {
-			leader = server
-			server.SetHeartbeatTimeout(testHeartbeatTimeout)
-			server.Start()
+			leader = s
+			s.SetHeartbeatTimeout(testHeartbeatTimeout)
+			s.Start()
 			time.Sleep(testHeartbeatTimeout)
 			time.Sleep(testHeartbeatTimeout)
 		} else {
 		} else {
-			server.SetElectionTimeout(testElectionTimeout)
-			server.SetHeartbeatTimeout(testHeartbeatTimeout)
-			server.Start()
+			s.SetElectionTimeout(testElectionTimeout)
+			s.SetHeartbeatTimeout(testHeartbeatTimeout)
+			s.Start()
 			time.Sleep(testHeartbeatTimeout)
 			time.Sleep(testHeartbeatTimeout)
 		}
 		}
 		if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
 		if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
@@ -536,7 +533,7 @@ func TestServerMultiNode(t *testing.T) {
 		t.Fatalf("Expected member count to be %v, got %v", n, leader.MemberCount())
 		t.Fatalf("Expected member count to be %v, got %v", n, leader.MemberCount())
 	}
 	}
 	if servers["2"].State() == Leader || servers["3"].State() == Leader {
 	if servers["2"].State() == Leader || servers["3"].State() == Leader {
-		t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].state, servers["3"].state)
+		t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].State(), servers["3"].State())
 	}
 	}
 	mutex.RUnlock()
 	mutex.RUnlock()
 
 
@@ -573,7 +570,7 @@ func TestServerMultiNode(t *testing.T) {
 						}
 						}
 						debugln("[Test] Done")
 						debugln("[Test] Done")
 					}
 					}
-					debugln("Leader is ", value.Name(), " Index ", value.log.commitIndex)
+					debugln("Leader is ", value.Name(), " Index ", value.(*server).log.commitIndex)
 				}
 				}
 				debugln("Not Found leader")
 				debugln("Not Found leader")
 			}
 			}
@@ -584,7 +581,7 @@ func TestServerMultiNode(t *testing.T) {
 					if value.State() == Leader {
 					if value.State() == Leader {
 						leader++
 						leader++
 					}
 					}
-					debugln(value.Name(), " ", value.currentTerm, " ", value.state)
+					debugln(value.Name(), " ", value.(*server).Term(), " ", value.State())
 				}
 				}
 			}
 			}
 
 

+ 3 - 3
third_party/github.com/coreos/go-raft/snapshot.go

@@ -20,9 +20,9 @@ type Snapshot struct {
 	LastIndex uint64 `json:"lastIndex"`
 	LastIndex uint64 `json:"lastIndex"`
 	LastTerm  uint64 `json:"lastTerm"`
 	LastTerm  uint64 `json:"lastTerm"`
 	// cluster configuration.
 	// cluster configuration.
-	Peers []*Peer `json: "peers"`
-	State []byte  `json: "state"`
-	Path  string  `json: "path"`
+	Peers []*Peer `json:"peers"`
+	State []byte  `json:"state"`
+	Path  string  `json:"path"`
 }
 }
 
 
 // Save the snapshot to a file
 // Save the snapshot to a file

+ 1 - 1
third_party/github.com/coreos/go-raft/snapshot_recovery_request.go

@@ -80,7 +80,7 @@ func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error) {
 	req.LeaderName = pb.GetLeaderName()
 	req.LeaderName = pb.GetLeaderName()
 	req.LastIndex = pb.GetLastIndex()
 	req.LastIndex = pb.GetLastIndex()
 	req.LastTerm = pb.GetLastTerm()
 	req.LastTerm = pb.GetLastTerm()
-	req.State = req.State
+	req.State = pb.GetState()
 
 
 	req.Peers = make([]*Peer, len(pb.Peers))
 	req.Peers = make([]*Peer, len(pb.Peers))
 
 

+ 14 - 14
third_party/github.com/coreos/go-raft/test.go

@@ -60,7 +60,7 @@ func setupLog(entries []*LogEntry) (*Log, string) {
 // Servers
 // Servers
 //--------------------------------------
 //--------------------------------------
 
 
-func newTestServer(name string, transporter Transporter) *Server {
+func newTestServer(name string, transporter Transporter) Server {
 	p, _ := ioutil.TempDir("", "raft-server-")
 	p, _ := ioutil.TempDir("", "raft-server-")
 	if err := os.MkdirAll(p, 0644); err != nil {
 	if err := os.MkdirAll(p, 0644); err != nil {
 		panic(err.Error())
 		panic(err.Error())
@@ -69,12 +69,12 @@ func newTestServer(name string, transporter Transporter) *Server {
 	return server
 	return server
 }
 }
 
 
-func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
+func newTestServerWithPath(name string, transporter Transporter, p string) Server {
 	server, _ := NewServer(name, p, transporter, nil, nil, "")
 	server, _ := NewServer(name, p, transporter, nil, nil, "")
 	return server
 	return server
 }
 }
 
 
-func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server {
+func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) Server {
 	server := newTestServer(name, transporter)
 	server := newTestServer(name, transporter)
 	f, err := os.Create(server.LogPath())
 	f, err := os.Create(server.LogPath())
 	if err != nil {
 	if err != nil {
@@ -88,8 +88,8 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn
 	return server
 	return server
 }
 }
 
 
-func newTestCluster(names []string, transporter Transporter, lookup map[string]*Server) []*Server {
-	servers := []*Server{}
+func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server {
+	servers := []Server{}
 	e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
 	e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
 
 
 	for _, name := range names {
 	for _, name := range names {
@@ -116,24 +116,24 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
 //--------------------------------------
 //--------------------------------------
 
 
 type testTransporter struct {
 type testTransporter struct {
-	sendVoteRequestFunc          func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
-	sendAppendEntriesRequestFunc func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
-	sendSnapshotRequestFunc      func(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
+	sendVoteRequestFunc          func(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
+	sendAppendEntriesRequestFunc func(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
+	sendSnapshotRequestFunc      func(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
 }
 }
 
 
-func (t *testTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+func (t *testTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
 	return t.sendVoteRequestFunc(server, peer, req)
 	return t.sendVoteRequestFunc(server, peer, req)
 }
 }
 
 
-func (t *testTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+func (t *testTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 	return t.sendAppendEntriesRequestFunc(server, peer, req)
 	return t.sendAppendEntriesRequestFunc(server, peer, req)
 }
 }
 
 
-func (t *testTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
+func (t *testTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
 	return t.sendSnapshotRequestFunc(server, peer, req)
 	return t.sendSnapshotRequestFunc(server, peer, req)
 }
 }
 
 
-func (t *testTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
+func (t *testTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
 	return t.SendSnapshotRecoveryRequest(server, peer, req)
 	return t.SendSnapshotRecoveryRequest(server, peer, req)
 }
 }
 
 
@@ -163,7 +163,7 @@ func (c *testCommand1) CommandName() string {
 	return "cmd_1"
 	return "cmd_1"
 }
 }
 
 
-func (c *testCommand1) Apply(server *Server) (interface{}, error) {
+func (c *testCommand1) Apply(server Server) (interface{}, error) {
 	return nil, nil
 	return nil, nil
 }
 }
 
 
@@ -179,6 +179,6 @@ func (c *testCommand2) CommandName() string {
 	return "cmd_2"
 	return "cmd_2"
 }
 }
 
 
-func (c *testCommand2) Apply(server *Server) (interface{}, error) {
+func (c *testCommand2) Apply(server Server) (interface{}, error) {
 	return nil, nil
 	return nil, nil
 }
 }

+ 4 - 4
third_party/github.com/coreos/go-raft/transporter.go

@@ -9,8 +9,8 @@ package raft
 // Transporter is the interface for allowing the host application to transport
 // Transporter is the interface for allowing the host application to transport
 // requests to other nodes.
 // requests to other nodes.
 type Transporter interface {
 type Transporter interface {
-	SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
-	SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
-	SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
-	SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
+	SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
+	SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
+	SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
+	SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
 }
 }

+ 1 - 1
web/web.go

@@ -23,7 +23,7 @@ func mainHandler(c http.ResponseWriter, req *http.Request) {
 	mainTempl.Execute(c, p)
 	mainTempl.Execute(c, p)
 }
 }
 
 
-func Start(raftServer *raft.Server, webURL string) {
+func Start(raftServer raft.Server, webURL string) {
 	u, _ := url.Parse(webURL)
 	u, _ := url.Parse(webURL)
 
 
 	webMux := http.NewServeMux()
 	webMux := http.NewServeMux()