Browse Source

Fix server dispatch redirection.

Ben Johnson 12 years ago
parent
commit
013d07bc2a
5 changed files with 22 additions and 15 deletions
  1. 3 6
      etcd_test.go
  2. 2 4
      server/peer_server.go
  3. 8 2
      server/registry.go
  4. 7 1
      server/server.go
  5. 2 2
      test/test.go

+ 3 - 6
etcd_test.go

@@ -21,7 +21,7 @@ import (
 func TestSingleNode(t *testing.T) {
 func TestSingleNode(t *testing.T) {
 	procAttr := new(os.ProcAttr)
 	procAttr := new(os.ProcAttr)
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
-	args := []string{"etcd", "-vv", "-n=node1", "-f", "-d=/tmp/node1"}
+	args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1"}
 
 
 	process, err := os.StartProcess("etcd", args, procAttr)
 	process, err := os.StartProcess("etcd", args, procAttr)
 	if err != nil {
 	if err != nil {
@@ -249,6 +249,7 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) {
 
 
 	clusterSize := 5
 	clusterSize := 5
 	argGroup, etcds, err := test.CreateCluster(clusterSize, procAttr, false)
 	argGroup, etcds, err := test.CreateCluster(clusterSize, procAttr, false)
+	defer test.DestroyCluster(etcds)
 
 
 	if err != nil {
 	if err != nil {
 		t.Fatal("cannot create cluster")
 		t.Fatal("cannot create cluster")
@@ -300,9 +301,6 @@ func TestMultiNodeKillAllAndRecovery(t *testing.T) {
 	if result.Index != 18 {
 	if result.Index != 18 {
 		t.Fatalf("recovery failed! [%d/18]", result.Index)
 		t.Fatalf("recovery failed! [%d/18]", result.Index)
 	}
 	}
-
-	// kill all
-	test.DestroyCluster(etcds)
 }
 }
 
 
 // Create a five nodes
 // Create a five nodes
@@ -479,6 +477,7 @@ func TestRemoveNode(t *testing.T) {
 
 
 	clusterSize := 3
 	clusterSize := 3
 	argGroup, etcds, _ := test.CreateCluster(clusterSize, procAttr, false)
 	argGroup, etcds, _ := test.CreateCluster(clusterSize, procAttr, false)
+	defer test.DestroyCluster(etcds)
 
 
 	time.Sleep(time.Second)
 	time.Sleep(time.Second)
 
 
@@ -572,8 +571,6 @@ func TestRemoveNode(t *testing.T) {
 			}
 			}
 		}
 		}
 	}
 	}
-	test.DestroyCluster(etcds)
-
 }
 }
 
 
 func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {
 func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {

+ 2 - 4
server/peer_server.go

@@ -410,7 +410,7 @@ func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme s
 	versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
 	versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
 	version, err := getVersion(t, versionURL)
 	version, err := getVersion(t, versionURL)
 	if err != nil {
 	if err != nil {
-		return fmt.Errorf("Unable to join: %v", err)
+		return fmt.Errorf("Error during join version check: %v", err)
 	}
 	}
 
 
 	// TODO: versioning of the internal protocol. See:
 	// TODO: versioning of the internal protocol. See:
@@ -442,12 +442,9 @@ func (s *PeerServer) joinByMachine(server *raft.Server, machine string, scheme s
 				return nil
 				return nil
 			}
 			}
 			if resp.StatusCode == http.StatusTemporaryRedirect {
 			if resp.StatusCode == http.StatusTemporaryRedirect {
-
 				address := resp.Header.Get("Location")
 				address := resp.Header.Get("Location")
 				log.Debugf("Send Join Request to %s", address)
 				log.Debugf("Send Join Request to %s", address)
-
 				json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
 				json.NewEncoder(&b).Encode(NewJoinCommand(PeerVersion, server.Name(), s.url, s.server.URL()))
-
 				resp, req, err = t.Post(address, &b)
 				resp, req, err = t.Post(address, &b)
 
 
 			} else if resp.StatusCode == http.StatusBadRequest {
 			} else if resp.StatusCode == http.StatusBadRequest {
@@ -538,6 +535,7 @@ func (s *PeerServer) dispatch(c raft.Command, w http.ResponseWriter, req *http.R
 		}
 		}
 		url, _ := s.registry.PeerURL(leader)
 		url, _ := s.registry.PeerURL(leader)
 
 
+		log.Debugf("Not leader; Current leader: %s; redirect: %s", leader, url)
 		redirect(url, w, req)
 		redirect(url, w, req)
 
 
 		return nil
 		return nil

+ 8 - 2
server/registry.go

@@ -38,14 +38,16 @@ func NewRegistry(s *store.Store) *Registry {
 }
 }
 
 
 // Adds a node to the registry.
 // Adds a node to the registry.
-func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) {
+func (r *Registry) Register(name string, peerVersion string, peerURL string, url string, commitIndex uint64, term uint64) error {
 	r.Lock()
 	r.Lock()
 	defer r.Unlock()
 	defer r.Unlock()
 
 
 	// Write data to store.
 	// Write data to store.
 	key := path.Join(RegistryKey, name)
 	key := path.Join(RegistryKey, name)
 	value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion)
 	value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", peerURL, url, peerVersion)
-	r.store.Create(key, value, false, false, store.Permanent, commitIndex, term)
+	_, err := r.store.Create(key, value, false, false, store.Permanent, commitIndex, term)
+	log.Debugf("Register: %s (%v)", name, err)
+	return err
 }
 }
 
 
 // Removes a node from the registry.
 // Removes a node from the registry.
@@ -53,8 +55,12 @@ func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) erro
 	r.Lock()
 	r.Lock()
 	defer r.Unlock()
 	defer r.Unlock()
 
 
+	// Remove from cache.
+	delete(r.nodes, name)
+
 	// Remove the key from the store.
 	// Remove the key from the store.
 	_, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
 	_, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
+	log.Debugf("Unregister: %s (%v)", name, err)
 	return err
 	return err
 }
 }
 
 

+ 7 - 1
server/server.go

@@ -193,7 +193,13 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
 			return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
 			return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
 		}
 		}
 
 
-		url, _ := s.registry.PeerURL(leader)
+		var url string
+		switch c.(type) {
+		case *JoinCommand, *RemoveCommand:
+			url, _ = s.registry.PeerURL(leader)
+		default:
+			url, _ = s.registry.URL(leader)
+		}
 		redirect(url, w, req)
 		redirect(url, w, req)
 
 
 		return nil
 		return nil

+ 2 - 2
test/test.go

@@ -69,13 +69,13 @@ func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os
 
 
 	for i := 0; i < size; i++ {
 	for i := 0; i < size; i++ {
 		if i == 0 {
 		if i == 0 {
-			argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"}
+			argGroup[i] = []string{"etcd", "-v", "-d=/tmp/node1", "-n=node1"}
 			if ssl {
 			if ssl {
 				argGroup[i] = append(argGroup[i], sslServer1...)
 				argGroup[i] = append(argGroup[i], sslServer1...)
 			}
 			}
 		} else {
 		} else {
 			strI := strconv.Itoa(i + 1)
 			strI := strconv.Itoa(i + 1)
-			argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
+			argGroup[i] = []string{"etcd", "-v", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
 			if ssl {
 			if ssl {
 				argGroup[i] = append(argGroup[i], sslServer2...)
 				argGroup[i] = append(argGroup[i], sslServer2...)
 			}
 			}