Browse Source

Merge pull request #214 from xiangli-cmu/newStore

WIP refactor remove raft singleton
Ben Johnson 12 years ago
parent
commit
a8b677f6e7
14 changed files with 126 additions and 136 deletions
  1. 4 5
      etcd.go
  2. 12 10
      etcd_handler_v1.go
  3. 36 29
      etcd_handlers.go
  4. 14 11
      etcd_server.go
  5. 2 3
      machines.go
  6. 1 1
      name_url_map.go
  7. 14 14
      raft_handlers.go
  8. 25 25
      raft_server.go
  9. 2 1
      raft_stats.go
  10. 2 2
      snapshot.go
  11. 3 1
      store/store.go
  12. 8 6
      transporter.go
  13. 1 1
      transporter_test.go
  14. 2 27
      util.go

+ 4 - 5
etcd.go

@@ -201,13 +201,12 @@ func main() {
 	// Create etcd key-value store
 	// Create etcd key-value store
 	etcdStore = store.New()
 	etcdStore = store.New()
 
 
-	snapConf = newSnapshotConf()
-
 	// Create etcd and raft server
 	// Create etcd and raft server
-	e = newEtcdServer(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS)
-	r = newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS)
+	r := newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS)
+	snapConf = r.newSnapshotConf()
+
+	e = newEtcdServer(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r)
 
 
-	startWebInterface()
 	r.ListenAndServe()
 	r.ListenAndServe()
 	e.ListenAndServe()
 	e.ListenAndServe()
 
 

+ 12 - 10
etcd_handler_v1.go

@@ -14,17 +14,17 @@ import (
 // Handlers to handle etcd-store related request via etcd url
 // Handlers to handle etcd-store related request via etcd url
 //-------------------------------------------------------------------
 //-------------------------------------------------------------------
 // Multiplex GET/POST/DELETE request to corresponding handlers
 // Multiplex GET/POST/DELETE request to corresponding handlers
-func MultiplexerV1(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) MultiplexerV1(w http.ResponseWriter, req *http.Request) error {
 
 
 	switch req.Method {
 	switch req.Method {
 	case "GET":
 	case "GET":
-		return GetHttpHandlerV1(w, req)
+		return e.GetHttpHandlerV1(w, req)
 	case "POST":
 	case "POST":
-		return SetHttpHandlerV1(w, req)
+		return e.SetHttpHandlerV1(w, req)
 	case "PUT":
 	case "PUT":
-		return SetHttpHandlerV1(w, req)
+		return e.SetHttpHandlerV1(w, req)
 	case "DELETE":
 	case "DELETE":
-		return DeleteHttpHandlerV1(w, req)
+		return e.DeleteHttpHandlerV1(w, req)
 	default:
 	default:
 		w.WriteHeader(http.StatusMethodNotAllowed)
 		w.WriteHeader(http.StatusMethodNotAllowed)
 		return nil
 		return nil
@@ -37,7 +37,7 @@ func MultiplexerV1(w http.ResponseWriter, req *http.Request) error {
 //--------------------------------------
 //--------------------------------------
 
 
 // Set Command Handler
 // Set Command Handler
-func SetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) SetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
 	key := req.URL.Path[len("/v1/keys/"):]
 	key := req.URL.Path[len("/v1/keys/"):]
 
 
 	debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 	debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
@@ -81,7 +81,7 @@ func SetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
 }
 }
 
 
 // Delete Handler
 // Delete Handler
-func DeleteHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) DeleteHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
 	key := req.URL.Path[len("/v1/keys/"):]
 	key := req.URL.Path[len("/v1/keys/"):]
 
 
 	debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 	debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
@@ -101,9 +101,10 @@ func DeleteHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
 //--------------------------------------
 //--------------------------------------
 
 
 // Get Handler
 // Get Handler
-func GetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) GetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
 	key := req.URL.Path[len("/v1/keys/"):]
 	key := req.URL.Path[len("/v1/keys/"):]
 
 
+	r := e.raftServer
 	debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 	debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 
 
 	command := &GetCommand{
 	command := &GetCommand{
@@ -128,13 +129,13 @@ func GetHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
 }
 }
 
 
 // Watch handler
 // Watch handler
-func WatchHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) WatchHttpHandlerV1(w http.ResponseWriter, req *http.Request) error {
 	key := req.URL.Path[len("/v1/watch/"):]
 	key := req.URL.Path[len("/v1/watch/"):]
 
 
 	command := &WatchCommand{
 	command := &WatchCommand{
 		Key: key,
 		Key: key,
 	}
 	}
-
+	r := e.raftServer
 	if req.Method == "GET" {
 	if req.Method == "GET" {
 		debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
 		debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
 		command.SinceIndex = 0
 		command.SinceIndex = 0
@@ -178,6 +179,7 @@ func dispatchEtcdCommandV1(c Command, w http.ResponseWriter, req *http.Request)
 }
 }
 
 
 func dispatchV1(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
 func dispatchV1(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
+	r := e.raftServer
 	if r.State() == raft.Leader {
 	if r.State() == raft.Leader {
 		if event, err := r.Do(c); err != nil {
 		if event, err := r.Do(c); err != nil {
 			return err
 			return err

+ 36 - 29
etcd_handlers.go

@@ -19,18 +19,18 @@ import (
 func NewEtcdMuxer() *http.ServeMux {
 func NewEtcdMuxer() *http.ServeMux {
 	// external commands
 	// external commands
 	etcdMux := http.NewServeMux()
 	etcdMux := http.NewServeMux()
-	etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer))
-	etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
-	etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
-	etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler))
-	etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
+	etcdMux.Handle("/"+version+"/keys/", errorHandler(e.Multiplexer))
+	etcdMux.Handle("/"+version+"/leader", errorHandler(e.LeaderHttpHandler))
+	etcdMux.Handle("/"+version+"/machines", errorHandler(e.MachinesHttpHandler))
+	etcdMux.Handle("/"+version+"/stats/", errorHandler(e.StatsHttpHandler))
+	etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler))
 	etcdMux.HandleFunc("/test/", TestHttpHandler)
 	etcdMux.HandleFunc("/test/", TestHttpHandler)
 
 
 	// backward support
 	// backward support
-	etcdMux.Handle("/v1/keys/", errorHandler(MultiplexerV1))
-	etcdMux.Handle("/v1/leader", errorHandler(LeaderHttpHandler))
-	etcdMux.Handle("/v1/machines", errorHandler(MachinesHttpHandler))
-	etcdMux.Handle("/v1/stats/", errorHandler(StatsHttpHandler))
+	etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1))
+	etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler))
+	etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler))
+	etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler))
 
 
 	return etcdMux
 	return etcdMux
 }
 }
@@ -68,17 +68,17 @@ func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 }
 
 
 // Multiplex GET/POST/DELETE request to corresponding handlers
 // Multiplex GET/POST/DELETE request to corresponding handlers
-func Multiplexer(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error {
 
 
 	switch req.Method {
 	switch req.Method {
 	case "GET":
 	case "GET":
-		return GetHttpHandler(w, req)
+		return e.GetHttpHandler(w, req)
 	case "POST":
 	case "POST":
-		return CreateHttpHandler(w, req)
+		return e.CreateHttpHandler(w, req)
 	case "PUT":
 	case "PUT":
-		return UpdateHttpHandler(w, req)
+		return e.UpdateHttpHandler(w, req)
 	case "DELETE":
 	case "DELETE":
-		return DeleteHttpHandler(w, req)
+		return e.DeleteHttpHandler(w, req)
 	default:
 	default:
 		w.WriteHeader(http.StatusMethodNotAllowed)
 		w.WriteHeader(http.StatusMethodNotAllowed)
 		return nil
 		return nil
@@ -92,7 +92,7 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) error {
 // Set/Delete will dispatch to leader
 // Set/Delete will dispatch to leader
 //--------------------------------------
 //--------------------------------------
 
 
-func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	key := getNodePath(req.URL.Path)
 	key := getNodePath(req.URL.Path)
 
 
 	debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 	debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
@@ -115,11 +115,11 @@ func CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 		command.IncrementalSuffix = true
 		command.IncrementalSuffix = true
 	}
 	}
 
 
-	return dispatchEtcdCommand(command, w, req)
+	return e.dispatchEtcdCommand(command, w, req)
 
 
 }
 }
 
 
-func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	key := getNodePath(req.URL.Path)
 	key := getNodePath(req.URL.Path)
 
 
 	debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 	debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
@@ -150,7 +150,7 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 			ExpireTime: expireTime,
 			ExpireTime: expireTime,
 		}
 		}
 
 
-		return dispatchEtcdCommand(command, w, req)
+		return e.dispatchEtcdCommand(command, w, req)
 
 
 	} else { // update with test
 	} else { // update with test
 		var prevIndex uint64
 		var prevIndex uint64
@@ -173,13 +173,13 @@ func UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
 			PrevIndex: prevIndex,
 			PrevIndex: prevIndex,
 		}
 		}
 
 
-		return dispatchEtcdCommand(command, w, req)
+		return e.dispatchEtcdCommand(command, w, req)
 	}
 	}
 
 
 }
 }
 
 
 // Delete Handler
 // Delete Handler
-func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	key := getNodePath(req.URL.Path)
 	key := getNodePath(req.URL.Path)
 
 
 	debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 	debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
@@ -192,12 +192,12 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
 		command.Recursive = true
 		command.Recursive = true
 	}
 	}
 
 
-	return dispatchEtcdCommand(command, w, req)
+	return e.dispatchEtcdCommand(command, w, req)
 }
 }
 
 
 // Dispatch the command to leader
 // Dispatch the command to leader
-func dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error {
-	return dispatch(c, w, req, nameToEtcdURL)
+func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error {
+	return e.raftServer.dispatch(c, w, req, nameToEtcdURL)
 }
 }
 
 
 //--------------------------------------
 //--------------------------------------
@@ -207,7 +207,9 @@ func dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) er
 //--------------------------------------
 //--------------------------------------
 
 
 // Handler to return the current leader's raft address
 // Handler to return the current leader's raft address
-func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
+	r := e.raftServer
+
 	leader := r.Leader()
 	leader := r.Leader()
 
 
 	if leader != "" {
 	if leader != "" {
@@ -222,8 +224,8 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
 }
 }
 
 
 // Handler to return all the known machines in the current cluster
 // Handler to return all the known machines in the current cluster
-func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	machines := getMachines(nameToEtcdURL)
+func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
+	machines := e.raftServer.getMachines(nameToEtcdURL)
 
 
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(strings.Join(machines, ", ")))
 	w.Write([]byte(strings.Join(machines, ", ")))
@@ -232,7 +234,7 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
 }
 }
 
 
 // Handler to return the current version of etcd
 // Handler to return the current version of etcd
-func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	fmt.Fprintf(w, "etcd %s", releaseVersion)
 	fmt.Fprintf(w, "etcd %s", releaseVersion)
 
 
@@ -240,10 +242,12 @@ func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
 }
 }
 
 
 // Handler to return the basic stats of etcd
 // Handler to return the basic stats of etcd
-func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	option := req.URL.Path[len("/v1/stats/"):]
 	option := req.URL.Path[len("/v1/stats/"):]
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 
 
+	r := e.raftServer
+
 	switch option {
 	switch option {
 	case "self":
 	case "self":
 		w.Write(r.Stats())
 		w.Write(r.Stats())
@@ -266,9 +270,12 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	return nil
 	return nil
 }
 }
 
 
-func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
+func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	var err error
 	var err error
 	var event interface{}
 	var event interface{}
+
+	r := e.raftServer
+
 	debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 	debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
 
 
 	if req.FormValue("consistent") == "true" && r.State() != raft.Leader {
 	if req.FormValue("consistent") == "true" && r.State() != raft.Leader {

+ 14 - 11
etcd_server.go

@@ -6,26 +6,29 @@ import (
 
 
 type etcdServer struct {
 type etcdServer struct {
 	http.Server
 	http.Server
-	name    string
-	url     string
-	tlsConf *TLSConfig
-	tlsInfo *TLSInfo
+	raftServer *raftServer
+	name       string
+	url        string
+	tlsConf    *TLSConfig
+	tlsInfo    *TLSInfo
 }
 }
 
 
 var e *etcdServer
 var e *etcdServer
 
 
-func newEtcdServer(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer {
-	return &etcdServer{
+func newEtcdServer(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, raftServer *raftServer) *etcdServer {
+	e = &etcdServer{
 		Server: http.Server{
 		Server: http.Server{
-			Handler:   NewEtcdMuxer(),
 			TLSConfig: &tlsConf.Server,
 			TLSConfig: &tlsConf.Server,
 			Addr:      listenHost,
 			Addr:      listenHost,
 		},
 		},
-		name:    name,
-		url:     urlStr,
-		tlsConf: tlsConf,
-		tlsInfo: tlsInfo,
+		name:       name,
+		url:        urlStr,
+		tlsConf:    tlsConf,
+		tlsInfo:    tlsInfo,
+		raftServer: raftServer,
 	}
 	}
+	e.Handler = NewEtcdMuxer()
+	return e
 }
 }
 
 
 // Start to listen and response etcd client command
 // Start to listen and response etcd client command

+ 2 - 3
machines.go

@@ -2,7 +2,7 @@ package main
 
 
 // machineNum returns the number of machines in the cluster
 // machineNum returns the number of machines in the cluster
 func machineNum() int {
 func machineNum() int {
-	e, err := etcdStore.Get("/_etcd/machines", false, false, r.CommitIndex(), r.Term())
+	e, err := etcdStore.Get("/_etcd/machines", false, false, 0, 0)
 
 
 	if err != nil {
 	if err != nil {
 		return 0
 		return 0
@@ -12,8 +12,7 @@ func machineNum() int {
 }
 }
 
 
 // getMachines gets the current machines in the cluster
 // getMachines gets the current machines in the cluster
-func getMachines(toURL func(string) (string, bool)) []string {
-
+func (r *raftServer) getMachines(toURL func(string) (string, bool)) []string {
 	peers := r.Peers()
 	peers := r.Peers()
 
 
 	machines := make([]string, len(peers)+1)
 	machines := make([]string, len(peers)+1)

+ 1 - 1
name_url_map.go

@@ -56,7 +56,7 @@ func readURL(nodeName string, urlName string) (string, bool) {
 	// convert nodeName to url from etcd storage
 	// convert nodeName to url from etcd storage
 	key := path.Join("/_etcd/machines", nodeName)
 	key := path.Join("/_etcd/machines", nodeName)
 
 
-	e, err := etcdStore.Get(key, false, false, r.CommitIndex(), r.Term())
+	e, err := etcdStore.Get(key, false, false, 0, 0)
 
 
 	if err != nil {
 	if err != nil {
 		return "", false
 		return "", false

+ 14 - 14
raft_handlers.go

@@ -12,7 +12,7 @@ import (
 //-------------------------------------------------------------
 //-------------------------------------------------------------
 
 
 // Get all the current logs
 // Get all the current logs
-func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
+func (r *raftServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
 	debugf("[recv] GET %s/log", r.url)
 	debugf("[recv] GET %s/log", r.url)
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
@@ -20,7 +20,7 @@ func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
 }
 }
 
 
 // Response to vote request
 // Response to vote request
-func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
+func (r *raftServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	rvreq := &raft.RequestVoteRequest{}
 	rvreq := &raft.RequestVoteRequest{}
 	err := decodeJsonRequest(req, rvreq)
 	err := decodeJsonRequest(req, rvreq)
 	if err == nil {
 	if err == nil {
@@ -36,7 +36,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 }
 }
 
 
 // Response to append entries request
 // Response to append entries request
-func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
+func (r *raftServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.AppendEntriesRequest{}
 	aereq := &raft.AppendEntriesRequest{}
 	err := decodeJsonRequest(req, aereq)
 	err := decodeJsonRequest(req, aereq)
 
 
@@ -59,7 +59,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 }
 }
 
 
 // Response to recover from snapshot request
 // Response to recover from snapshot request
-func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
+func (r *raftServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRequest{}
 	aereq := &raft.SnapshotRequest{}
 	err := decodeJsonRequest(req, aereq)
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
 	if err == nil {
@@ -75,7 +75,7 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 }
 }
 
 
 // Response to recover from snapshot request
 // Response to recover from snapshot request
-func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
+func (r *raftServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRecoveryRequest{}
 	aereq := &raft.SnapshotRecoveryRequest{}
 	err := decodeJsonRequest(req, aereq)
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
 	if err == nil {
@@ -91,20 +91,20 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
 }
 }
 
 
 // Get the port that listening for etcd connecting of the server
 // Get the port that listening for etcd connecting of the server
-func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
+func (r *raftServer) EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
 	debugf("[recv] Get %s/etcdURL/ ", r.url)
 	debugf("[recv] Get %s/etcdURL/ ", r.url)
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(argInfo.EtcdURL))
 	w.Write([]byte(argInfo.EtcdURL))
 }
 }
 
 
 // Response to the join request
 // Response to the join request
-func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error {
+func (r *raftServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request) error {
 
 
 	command := &JoinCommand{}
 	command := &JoinCommand{}
 
 
 	if err := decodeJsonRequest(req, command); err == nil {
 	if err := decodeJsonRequest(req, command); err == nil {
 		debugf("Receive Join Request from %s", command.Name)
 		debugf("Receive Join Request from %s", command.Name)
-		return dispatchRaftCommand(command, w, req)
+		return r.dispatchRaftCommand(command, w, req)
 	} else {
 	} else {
 		w.WriteHeader(http.StatusInternalServerError)
 		w.WriteHeader(http.StatusInternalServerError)
 		return nil
 		return nil
@@ -112,7 +112,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error {
 }
 }
 
 
 // Response to remove request
 // Response to remove request
-func RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
+func (r *raftServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
 	if req.Method != "DELETE" {
 	if req.Method != "DELETE" {
 		w.WriteHeader(http.StatusMethodNotAllowed)
 		w.WriteHeader(http.StatusMethodNotAllowed)
 		return
 		return
@@ -125,23 +125,23 @@ func RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 
 	debugf("[recv] Remove Request [%s]", command.Name)
 	debugf("[recv] Remove Request [%s]", command.Name)
 
 
-	dispatchRaftCommand(command, w, req)
+	r.dispatchRaftCommand(command, w, req)
 }
 }
 
 
 // Response to the name request
 // Response to the name request
-func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
+func (r *raftServer) NameHttpHandler(w http.ResponseWriter, req *http.Request) {
 	debugf("[recv] Get %s/name/ ", r.url)
 	debugf("[recv] Get %s/name/ ", r.url)
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(r.name))
 	w.Write([]byte(r.name))
 }
 }
 
 
 // Response to the name request
 // Response to the name request
-func RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
+func (r *raftServer) RaftVersionHttpHandler(w http.ResponseWriter, req *http.Request) {
 	debugf("[recv] Get %s/version/ ", r.url)
 	debugf("[recv] Get %s/version/ ", r.url)
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(r.version))
 	w.Write([]byte(r.version))
 }
 }
 
 
-func dispatchRaftCommand(c Command, w http.ResponseWriter, req *http.Request) error {
-	return dispatch(c, w, req, nameToRaftURL)
+func (r *raftServer) dispatchRaftCommand(c Command, w http.ResponseWriter, req *http.Request) error {
+	return r.dispatch(c, w, req, nameToRaftURL)
 }
 }

+ 25 - 25
raft_server.go

@@ -28,13 +28,10 @@ type raftServer struct {
 	serverStats    *raftServerStats
 	serverStats    *raftServerStats
 }
 }
 
 
-var r *raftServer
+//var r *raftServer
 
 
 func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
 func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
 
 
-	// Create transporter for raft
-	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client)
-
 	raftWrapper := &raftServer{
 	raftWrapper := &raftServer{
 		version:    raftVersion,
 		version:    raftVersion,
 		name:       name,
 		name:       name,
@@ -57,6 +54,9 @@ func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfi
 		},
 		},
 	}
 	}
 
 
+	// Create transporter for raft
+	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, raftWrapper)
+
 	// Create raft server
 	// Create raft server
 	server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, raftWrapper, "")
 	server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, raftWrapper, "")
 	check(err)
 	check(err)
@@ -91,16 +91,16 @@ func (r *raftServer) ListenAndServe() {
 
 
 		// start as a leader in a new cluster
 		// start as a leader in a new cluster
 		if len(cluster) == 0 {
 		if len(cluster) == 0 {
-			startAsLeader()
+			r.startAsLeader()
 
 
 		} else {
 		} else {
-			startAsFollower()
+			r.startAsFollower()
 		}
 		}
 
 
 	} else {
 	} else {
 
 
 		// rejoin the previous cluster
 		// rejoin the previous cluster
-		cluster = getMachines(nameToRaftURL)
+		cluster = r.getMachines(nameToRaftURL)
 		for i := 0; i < len(cluster); i++ {
 		for i := 0; i < len(cluster); i++ {
 			u, err := url.Parse(cluster[i])
 			u, err := url.Parse(cluster[i])
 			if err != nil {
 			if err != nil {
@@ -108,7 +108,7 @@ func (r *raftServer) ListenAndServe() {
 			}
 			}
 			cluster[i] = u.Host
 			cluster[i] = u.Host
 		}
 		}
-		ok := joinCluster(cluster)
+		ok := r.joinCluster(cluster)
 		if !ok {
 		if !ok {
 			warn("the entire cluster is down! this machine will restart the cluster.")
 			warn("the entire cluster is down! this machine will restart the cluster.")
 		}
 		}
@@ -118,7 +118,7 @@ func (r *raftServer) ListenAndServe() {
 
 
 	// open the snapshot
 	// open the snapshot
 	if snapshot {
 	if snapshot {
-		go monitorSnapshot()
+		go r.monitorSnapshot()
 	}
 	}
 
 
 	// start to response to raft requests
 	// start to response to raft requests
@@ -126,7 +126,7 @@ func (r *raftServer) ListenAndServe() {
 
 
 }
 }
 
 
-func startAsLeader() {
+func (r *raftServer) startAsLeader() {
 	// leader need to join self as a peer
 	// leader need to join self as a peer
 	for {
 	for {
 		_, err := r.Do(newJoinCommand(r.version, r.Name(), r.url, e.url))
 		_, err := r.Do(newJoinCommand(r.version, r.Name(), r.url, e.url))
@@ -137,10 +137,10 @@ func startAsLeader() {
 	debugf("%s start as a leader", r.name)
 	debugf("%s start as a leader", r.name)
 }
 }
 
 
-func startAsFollower() {
+func (r *raftServer) startAsFollower() {
 	// start as a follower in a existing cluster
 	// start as a follower in a existing cluster
 	for i := 0; i < retryTimes; i++ {
 	for i := 0; i < retryTimes; i++ {
-		ok := joinCluster(cluster)
+		ok := r.joinCluster(cluster)
 		if ok {
 		if ok {
 			return
 			return
 		}
 		}
@@ -164,16 +164,16 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
 	}
 	}
 
 
 	// internal commands
 	// internal commands
-	raftMux.HandleFunc("/name", NameHttpHandler)
-	raftMux.HandleFunc("/version", RaftVersionHttpHandler)
-	raftMux.Handle("/join", errorHandler(JoinHttpHandler))
-	raftMux.HandleFunc("/remove/", RemoveHttpHandler)
-	raftMux.HandleFunc("/vote", VoteHttpHandler)
-	raftMux.HandleFunc("/log", GetLogHttpHandler)
-	raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
-	raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
-	raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
-	raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
+	raftMux.HandleFunc("/name", r.NameHttpHandler)
+	raftMux.HandleFunc("/version", r.RaftVersionHttpHandler)
+	raftMux.Handle("/join", errorHandler(r.JoinHttpHandler))
+	raftMux.HandleFunc("/remove/", r.RemoveHttpHandler)
+	raftMux.HandleFunc("/vote", r.VoteHttpHandler)
+	raftMux.HandleFunc("/log", r.GetLogHttpHandler)
+	raftMux.HandleFunc("/log/append", r.AppendEntriesHttpHandler)
+	raftMux.HandleFunc("/snapshot", r.SnapshotHttpHandler)
+	raftMux.HandleFunc("/snapshotRecovery", r.SnapshotRecoveryHttpHandler)
+	raftMux.HandleFunc("/etcdURL", r.EtcdURLHttpHandler)
 
 
 	if scheme == "http" {
 	if scheme == "http" {
 		fatal(server.ListenAndServe())
 		fatal(server.ListenAndServe())
@@ -202,14 +202,14 @@ func getVersion(t *transporter, versionURL url.URL) (string, error) {
 	return string(body), nil
 	return string(body), nil
 }
 }
 
 
-func joinCluster(cluster []string) bool {
+func (r *raftServer) joinCluster(cluster []string) bool {
 	for _, machine := range cluster {
 	for _, machine := range cluster {
 
 
 		if len(machine) == 0 {
 		if len(machine) == 0 {
 			continue
 			continue
 		}
 		}
 
 
-		err := joinByMachine(r.Server, machine, r.tlsConf.Scheme)
+		err := r.joinByMachine(r.Server, machine, r.tlsConf.Scheme)
 		if err == nil {
 		if err == nil {
 			debugf("%s success join to the cluster via machine %s", r.name, machine)
 			debugf("%s success join to the cluster via machine %s", r.name, machine)
 			return true
 			return true
@@ -226,7 +226,7 @@ func joinCluster(cluster []string) bool {
 }
 }
 
 
 // Send join requests to machine.
 // Send join requests to machine.
-func joinByMachine(s *raft.Server, machine string, scheme string) error {
+func (r *raftServer) joinByMachine(s *raft.Server, machine string, scheme string) error {
 	var b bytes.Buffer
 	var b bytes.Buffer
 
 
 	// t must be ok
 	// t must be ok

+ 2 - 1
raft_stats.go

@@ -33,6 +33,7 @@ func (ps *packageStats) Time() time.Time {
 }
 }
 
 
 type raftServerStats struct {
 type raftServerStats struct {
+	Name      string    `json:"name"`
 	State     string    `json:"state"`
 	State     string    `json:"state"`
 	StartTime time.Time `json:"startTime"`
 	StartTime time.Time `json:"startTime"`
 
 
@@ -70,7 +71,7 @@ func (ss *raftServerStats) SendAppendReq(pkgSize int) {
 
 
 	if ss.State != raft.Leader {
 	if ss.State != raft.Leader {
 		ss.State = raft.Leader
 		ss.State = raft.Leader
-		ss.LeaderInfo.Name = r.Name()
+		ss.LeaderInfo.Name = ss.Name
 		ss.LeaderInfo.startTime = now
 		ss.LeaderInfo.startTime = now
 	}
 	}
 
 

+ 2 - 2
snapshot.go

@@ -18,12 +18,12 @@ type snapshotConf struct {
 
 
 var snapConf *snapshotConf
 var snapConf *snapshotConf
 
 
-func newSnapshotConf() *snapshotConf {
+func (r *raftServer) newSnapshotConf() *snapshotConf {
 	// check snapshot every 3 seconds and the threshold is 20K
 	// check snapshot every 3 seconds and the threshold is 20K
 	return &snapshotConf{time.Second * 3, 0, 20 * 1000}
 	return &snapshotConf{time.Second * 3, 0, 20 * 1000}
 }
 }
 
 
-func monitorSnapshot() {
+func (r *raftServer) monitorSnapshot() {
 	for {
 	for {
 		time.Sleep(snapConf.checkingInterval)
 		time.Sleep(snapConf.checkingInterval)
 		//currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites
 		//currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites

+ 3 - 1
store/store.go

@@ -356,7 +356,9 @@ func (s *Store) internalGet(nodePath string, index uint64, term uint64) (*Node,
 	nodePath = path.Clean(path.Join("/", nodePath))
 	nodePath = path.Clean(path.Join("/", nodePath))
 
 
 	// update file system known index and term
 	// update file system known index and term
-	s.Index, s.Term = index, term
+	if index > s.Index {
+		s.Index, s.Term = index, term
+	}
 
 
 	walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) {
 	walkFunc := func(parent *Node, name string) (*Node, *etcdErr.Error) {
 
 

+ 8 - 6
transporter.go

@@ -27,14 +27,15 @@ var tranTimeout = ElectionTimeout
 
 
 // Transporter layer for communication between raft nodes
 // Transporter layer for communication between raft nodes
 type transporter struct {
 type transporter struct {
-	client    *http.Client
-	transport *http.Transport
+	client     *http.Client
+	transport  *http.Transport
+	raftServer *raftServer
 }
 }
 
 
 // Create transporter using by raft server
 // Create transporter using by raft server
 // Create http or https transporter based on
 // Create http or https transporter based on
 // whether the user give the server cert and key
 // whether the user give the server cert and key
-func newTransporter(scheme string, tlsConf tls.Config) *transporter {
+func newTransporter(scheme string, tlsConf tls.Config, raftServer *raftServer) *transporter {
 	t := transporter{}
 	t := transporter{}
 
 
 	tr := &http.Transport{
 	tr := &http.Transport{
@@ -49,6 +50,7 @@ func newTransporter(scheme string, tlsConf tls.Config) *transporter {
 
 
 	t.client = &http.Client{Transport: tr}
 	t.client = &http.Client{Transport: tr}
 	t.transport = tr
 	t.transport = tr
+	t.raftServer = raftServer
 
 
 	return &t
 	return &t
 }
 }
@@ -67,18 +69,18 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
 
 
 	size := b.Len()
 	size := b.Len()
 
 
-	r.serverStats.SendAppendReq(size)
+	t.raftServer.serverStats.SendAppendReq(size)
 
 
 	u, _ := nameToRaftURL(peer.Name)
 	u, _ := nameToRaftURL(peer.Name)
 
 
 	debugf("Send LogEntries to %s ", u)
 	debugf("Send LogEntries to %s ", u)
 
 
-	thisFollowerStats, ok := r.followersStats.Followers[peer.Name]
+	thisFollowerStats, ok := t.raftServer.followersStats.Followers[peer.Name]
 
 
 	if !ok { //this is the first time this follower has been seen
 	if !ok { //this is the first time this follower has been seen
 		thisFollowerStats = &raftFollowerStats{}
 		thisFollowerStats = &raftFollowerStats{}
 		thisFollowerStats.Latency.Minimum = 1 << 63
 		thisFollowerStats.Latency.Minimum = 1 << 63
-		r.followersStats.Followers[peer.Name] = thisFollowerStats
+		t.raftServer.followersStats.Followers[peer.Name] = thisFollowerStats
 	}
 	}
 
 
 	start := time.Now()
 	start := time.Now()

+ 1 - 1
transporter_test.go

@@ -21,7 +21,7 @@ func TestTransporterTimeout(t *testing.T) {
 
 
 	conf := tls.Config{}
 	conf := tls.Config{}
 
 
-	ts := newTransporter("http", conf)
+	ts := newTransporter("http", conf, nil)
 
 
 	ts.Get("http://google.com")
 	ts.Get("http://google.com")
 	_, _, err := ts.Get("http://google.com:9999")
 	_, _, err := ts.Get("http://google.com:9999")

+ 2 - 27
util.go

@@ -15,7 +15,6 @@ import (
 
 
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
-	"github.com/coreos/etcd/web"
 	"github.com/coreos/go-log/log"
 	"github.com/coreos/go-log/log"
 	"github.com/coreos/go-raft"
 	"github.com/coreos/go-raft"
 )
 )
@@ -39,35 +38,11 @@ func durationToExpireTime(strDuration string) (time.Time, error) {
 	}
 	}
 }
 }
 
 
-//--------------------------------------
-// Web Helper
-//--------------------------------------
-var storeMsg chan string
-
-// Help to send msg from store to webHub
-func webHelper() {
-	storeMsg = make(chan string)
-	// etcdStore.SetMessager(storeMsg)
-	for {
-		// transfer the new msg to webHub
-		web.Hub().Send(<-storeMsg)
-	}
-}
-
-// startWebInterface starts web interface if webURL is not empty
-func startWebInterface() {
-	if argInfo.WebURL != "" {
-		// start web
-		go webHelper()
-		go web.Start(r.Server, argInfo.WebURL)
-	}
-}
-
 //--------------------------------------
 //--------------------------------------
 // HTTP Utilities
 // HTTP Utilities
 //--------------------------------------
 //--------------------------------------
 
 
-func dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
+func (r *raftServer) dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
 	if r.State() == raft.Leader {
 	if r.State() == raft.Leader {
 		if response, err := r.Do(c); err != nil {
 		if response, err := r.Do(c); err != nil {
 			return err
 			return err
@@ -278,7 +253,7 @@ func send(c chan bool) {
 		command.Key = "foo"
 		command.Key = "foo"
 		command.Value = "bar"
 		command.Value = "bar"
 		command.ExpireTime = time.Unix(0, 0)
 		command.ExpireTime = time.Unix(0, 0)
-		r.Do(command)
+		//r.Do(command)
 	}
 	}
 	c <- true
 	c <- true
 }
 }