Browse Source

Mostly working.

Ben Johnson 12 years ago
parent
commit
7416d2fdcc
7 changed files with 105 additions and 97 deletions
  1. 1 1
      etcd.go
  2. 3 3
      etcd_test.go
  3. 1 1
      server/peer_server.go
  4. 36 21
      server/registry.go
  5. 64 14
      server/server.go
  6. 0 56
      server/v2/handlers.go
  7. 0 1
      third_party/github.com/coreos/go-etcd/etcd/client.go

+ 1 - 1
etcd.go

@@ -177,7 +177,7 @@ func main() {
 	ps.MaxClusterSize = maxClusterSize
 	ps.RetryTimes = retryTimes
 
-	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, ps.Server, registry, store)
+	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, ps, registry, store)
 	if err := s.AllowOrigins(cors); err != nil {
 		panic(err)
 	}

+ 3 - 3
etcd_test.go

@@ -41,7 +41,7 @@ func TestSingleNode(t *testing.T) {
 
 	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
 		if err != nil {
-			t.Fatal(err)
+			t.Fatal("Set 1: ", err)
 		}
 
 		t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL)
@@ -53,7 +53,7 @@ func TestSingleNode(t *testing.T) {
 
 	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 100 {
 		if err != nil {
-			t.Fatal(err)
+			t.Fatal("Set 2: ", err)
 		}
 		t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL)
 	}
@@ -295,7 +295,7 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) {
 	result, err := c.Set("foo", "bar", 0)
 
 	if err != nil {
-		panic(err)
+		t.Fatalf("Recovery error: %s", err)
 	}
 
 	if result.Index != 18 {

+ 1 - 1
server/peer_server.go

@@ -116,7 +116,7 @@ func (s *PeerServer) ListenAndServe(snapshot bool, cluster []string) {
 
     } else {
         // Rejoin the previous cluster
-        cluster = s.registry.PeerURLs()
+        cluster = s.registry.PeerURLs(s.Leader(), s.name)
         for i := 0; i < len(cluster); i++ {
             u, err := url.Parse(cluster[i])
             if err != nil {

+ 36 - 21
server/registry.go

@@ -4,8 +4,10 @@ import (
     "fmt"
     "net/url"
     "path"
+    "strings"
     "sync"
 
+    "github.com/coreos/etcd/log"
     "github.com/coreos/etcd/store"
 )
 
@@ -84,28 +86,34 @@ func (r *Registry) url(name string) (string, bool) {
 }
 
 // Retrieves the URLs for all nodes.
-func (r *Registry) URLs() []string {
+func (r *Registry) URLs(leaderName, selfName string) []string {
     r.Lock()
     defer r.Unlock()
 
-    // Retrieve a list of all nodes.
-    e, err := r.store.Get(RegistryKey, false, false, 0, 0)
-    if err != nil {
-        return make([]string, 0)
+    // Build list including the leader and self.
+    urls := make([]string, 0)
+    if url, _ := r.url(leaderName); len(url) > 0 {
+        urls = append(urls, url)
+    }
+    if url, _ := r.url(selfName); len(url) > 0 {
+        urls = append(urls, url)
     }
 
-    // Lookup the URL for each one.
-    urls := make([]string, 0)
-    for _, pair := range e.KVPairs {
-        if url, ok := r.url(pair.Key); ok {
-            urls = append(urls, url)
+    // Retrieve a list of all nodes.
+    if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil {
+        // Lookup the URL for each one.
+        for _, pair := range e.KVPairs {
+            if url, _ := r.url(pair.Key); len(url) > 0 {
+                urls = append(urls, url)
+            }
         }
     }
 
+    log.Infof("URLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ","))
+
     return urls
 }
 
-
 // Retrieves the peer URL for a given node by name.
 func (r *Registry) PeerURL(name string) (string, bool) {
     r.Lock()
@@ -126,24 +134,31 @@ func (r *Registry) peerURL(name string) (string, bool) {
 }
 
 // Retrieves the peer URLs for all nodes.
-func (r *Registry) PeerURLs() []string {
+func (r *Registry) PeerURLs(leaderName, selfName string) []string {
     r.Lock()
     defer r.Unlock()
 
-    // Retrieve a list of all nodes.
-    e, err := r.store.Get(RegistryKey, false, false, 0, 0)
-    if err != nil {
-        return make([]string, 0)
+    // Build list including the leader and self.
+    urls := make([]string, 0)
+    if url, _ := r.peerURL(leaderName); len(url) > 0 {
+        urls = append(urls, url)
+    }
+    if url, _ := r.peerURL(selfName); len(url) > 0 {
+        urls = append(urls, url)
     }
 
-    // Lookup the URL for each one.
-    urls := make([]string, 0)
-    for _, pair := range e.KVPairs {
-        if url, ok := r.peerURL(pair.Key); ok {
-            urls = append(urls, url)
+    // Retrieve a list of all nodes.
+    if e, _ := r.store.Get(RegistryKey, false, false, 0, 0); e != nil {
+        // Lookup the URL for each one.
+        for _, pair := range e.KVPairs {
+            if url, _ := r.peerURL(pair.Key); len(url) > 0 {
+                urls = append(urls, url)
+            }
         }
     }
 
+    log.Infof("PeerURLs: %s / %s (%s", leaderName, selfName, strings.Join(urls, ","))
+
     return urls
 }
 

+ 64 - 14
server/server.go

@@ -18,9 +18,9 @@ import (
 // This is the default implementation of the Server interface.
 type Server struct {
 	http.Server
-	raftServer  *raft.Server
-    registry    *Registry
-    store       *store.Store
+	peerServer  *PeerServer
+	registry    *Registry
+	store       *store.Store
 	name        string
 	url         string
 	tlsConf     *TLSConfig
@@ -29,7 +29,7 @@ type Server struct {
 }
 
 // Creates a new Server.
-func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, raftServer *raft.Server, registry *Registry, store *store.Store) *Server {
+func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, peerServer *PeerServer, registry *Registry, store *store.Store) *Server {
 	s := &Server{
 		Server: http.Server{
 			Handler:   mux.NewRouter(),
@@ -41,7 +41,7 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI
 		url:        urlStr,
 		tlsConf:    tlsConf,
 		tlsInfo:    tlsInfo,
-		raftServer: raftServer,
+		peerServer: peerServer,
 	}
 
 	// Install the routes for each version of the API.
@@ -52,12 +52,12 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI
 
 // The current Raft committed index.
 func (s *Server) CommitIndex() uint64 {
-	return s.raftServer.CommitIndex()
+	return s.peerServer.CommitIndex()
 }
 
 // The current Raft term.
 func (s *Server) Term() uint64 {
-	return s.raftServer.Term()
+	return s.peerServer.Term()
 }
 
 // The server URL.
@@ -74,19 +74,21 @@ func (s *Server) installV1() {
 	s.handleFuncV1("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET")
 	s.handleFuncV1("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT")
 	s.handleFuncV1("/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE")
-
 	s.handleFuncV1("/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST")
+	s.handleFunc("/v1/leader", s.GetLeaderHandler).Methods("GET")
+	s.handleFunc("/v1/machines", s.GetMachinesHandler).Methods("GET")
+	s.handleFunc("/v1/stats", s.GetStatsHandler).Methods("GET")
 }
 
 // Adds a v1 server handler to the router.
 func (s *Server) handleFuncV1(path string, f func(http.ResponseWriter, *http.Request, v1.Server) error) *mux.Route {
-	return s.handleFunc(path, func(w http.ResponseWriter, req *http.Request, s *Server) error {
+	return s.handleFunc(path, func(w http.ResponseWriter, req *http.Request) error {
 		return f(w, req, s)
 	})
 }
 
 // Adds a server handler to the router.
-func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Request, *Server) error) *mux.Route {
+func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route {
 	r := s.Handler.(*mux.Router)
 
 	// Wrap the standard HandleFunc interface to pass in the server reference.
@@ -102,7 +104,7 @@ func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Reque
 		}
 
 		// Execute handler function and return error if necessary.
-		if err := f(w, req, s); err != nil {
+		if err := f(w, req); err != nil {
 			if etcdErr, ok := err.(*etcdErr.Error); ok {
 				log.Debug("Return error: ", (*etcdErr).Error())
 				etcdErr.Write(w)
@@ -125,8 +127,8 @@ func (s *Server) ListenAndServe() {
 }
 
 func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Request) error {
-	if s.raftServer.State() == raft.Leader {
-		event, err := s.raftServer.Do(c)
+	if s.peerServer.State() == raft.Leader {
+		event, err := s.peerServer.Do(c)
 		if err != nil {
 			return err
 		}
@@ -143,7 +145,7 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
 		return nil
 
 	} else {
-		leader := s.raftServer.Leader()
+		leader := s.peerServer.Leader()
 
 		// No leader available.
 		if leader == "" {
@@ -178,3 +180,51 @@ func (s *Server) AllowOrigins(origins string) error {
 func (s *Server) OriginAllowed(origin string) bool {
 	return s.corsOrigins["*"] || s.corsOrigins[origin]
 }
+
+// Handler to return the current leader's raft address
+func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
+	leader := s.peerServer.Leader()
+	if leader == "" {
+		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
+	}
+	w.WriteHeader(http.StatusOK)
+	url, _ := s.registry.PeerURL(leader)
+	w.Write([]byte(url))
+	return nil
+}
+
+// Handler to return all the known machines in the current cluster.
+func (s *Server) GetMachinesHandler(w http.ResponseWriter, req *http.Request) error {
+  machines := s.registry.URLs(s.peerServer.Leader(), s.name)
+  w.WriteHeader(http.StatusOK)
+  w.Write([]byte(strings.Join(machines, ", ")))
+  return nil
+}
+
+// Retrieves stats on the Raft server.
+func (s *Server) GetStatsHandler(w http.ResponseWriter, req *http.Request) error {
+  w.Write(s.peerServer.Stats())
+  return nil
+}
+
+// Retrieves stats on the leader.
+func (s *Server) GetLeaderStatsHandler(w http.ResponseWriter, req *http.Request) error {
+	if s.peerServer.State() == raft.Leader {
+	  w.Write(s.peerServer.PeerStats())
+	  return nil
+	}
+
+	leader := s.peerServer.Leader()
+	if leader == "" {
+	return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
+	}
+	hostname, _ := s.registry.URL(leader)
+	redirect(hostname, w, req)
+	return nil
+}
+
+// Retrieves stats on the leader.
+func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request) error {
+  w.Write(s.store.JsonStats())
+  return nil
+}

+ 0 - 56
server/v2/handlers.go

@@ -202,33 +202,6 @@ func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *
 // still dispatch to the leader
 //--------------------------------------
 
-// Handler to return the current leader's raft address
-func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	r := e.raftServer
-
-	leader := r.Leader()
-
-	if leader != "" {
-		w.WriteHeader(http.StatusOK)
-		raftURL, _ := nameToRaftURL(leader)
-		w.Write([]byte(raftURL))
-
-		return nil
-	} else {
-		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
-	}
-}
-
-// Handler to return all the known machines in the current cluster
-func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	machines := e.raftServer.getMachines(nameToEtcdURL)
-
-	w.WriteHeader(http.StatusOK)
-	w.Write([]byte(strings.Join(machines, ", ")))
-
-	return nil
-}
-
 // Handler to return the current version of etcd
 func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	w.WriteHeader(http.StatusOK)
@@ -237,35 +210,6 @@ func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request
 	return nil
 }
 
-// Handler to return the basic stats of etcd
-func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	option := req.URL.Path[len("/v1/stats/"):]
-	w.WriteHeader(http.StatusOK)
-
-	r := e.raftServer
-
-	switch option {
-	case "self":
-		w.Write(r.Stats())
-	case "leader":
-		if r.State() == raft.Leader {
-			w.Write(r.PeerStats())
-		} else {
-			leader := r.Leader()
-			// current no leader
-			if leader == "" {
-				return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
-			}
-			hostname, _ := nameToEtcdURL(leader)
-			redirect(hostname, w, req)
-		}
-	case "store":
-		w.Write(etcdStore.JsonStats())
-	}
-
-	return nil
-}
-
 func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	var err error
 	var event interface{}

+ 0 - 1
third_party/github.com/coreos/go-etcd/etcd/client.go

@@ -136,7 +136,6 @@ func (c *Client) internalSyncCluster(machines []string) bool {
 			logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, c.cluster.Machines[0])
 			c.cluster.Leader = c.cluster.Machines[0]
 
-			logger.Debug("sync.machines ", c.cluster.Machines)
 			return true
 		}
 	}