Browse Source

Fix registry cache issues.

Ben Johnson 12 years ago
parent
commit
d44fd6661a
6 changed files with 27 additions and 8 deletions
  1. 3 3
      etcd_test.go
  2. 3 0
      server/join_command.go
  3. 6 1
      server/registry.go
  4. 3 1
      server/remove_command.go
  5. 11 2
      server/server.go
  6. 1 1
      server/transporter.go

+ 3 - 3
etcd_test.go

@@ -106,7 +106,7 @@ func TestInternalVersionFail(t *testing.T) {
 
 	procAttr := new(os.ProcAttr)
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
-	args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1", "-vv", "-C=" + fakeURL.Host}
+	args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1", "-C=" + fakeURL.Host}
 
 	process, err := os.StartProcess("etcd", args, procAttr)
 	if err != nil {
@@ -525,7 +525,7 @@ func TestRemoveNode(t *testing.T) {
 			}
 
 			if len(resp) != 3 {
-				t.Fatal("add machine fails")
+				t.Fatalf("add machine fails #1 (%d != 3)", len(resp))
 			}
 		}
 
@@ -567,7 +567,7 @@ func TestRemoveNode(t *testing.T) {
 			}
 
 			if len(resp) != 3 {
-				t.Fatal("add machine fails")
+				t.Fatalf("add machine fails #2 (%d != 3)", len(resp))
 			}
 		}
 	}

+ 3 - 0
server/join_command.go

@@ -41,6 +41,9 @@ func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
 	b := make([]byte, 8)
 	binary.PutUvarint(b, server.CommitIndex())
 
+	// Make sure we're not getting a cached value from the registry.
+	ps.registry.Invalidate(c.Name)
+
 	// Check if the join command is from a previous machine, who lost all its previous log.
 	if _, ok := ps.registry.URL(c.Name); ok {
 		return b, nil

+ 6 - 1
server/registry.go

@@ -56,7 +56,7 @@ func (r *Registry) Unregister(name string, commitIndex uint64, term uint64) erro
 	defer r.Unlock()
 
 	// Remove from cache.
-	delete(r.nodes, name)
+	// delete(r.nodes, name)
 
 	// Remove the key from the store.
 	_, err := r.store.Delete(path.Join(RegistryKey, name), false, commitIndex, term)
@@ -165,6 +165,11 @@ func (r *Registry) PeerURLs(leaderName, selfName string) []string {
 	return urls
 }
 
+// Removes a node from the cache.
+func (r *Registry) Invalidate(name string) {
+	delete(r.nodes, name)	
+}
+
 // Loads the given node by name from the store into the cache.
 func (r *Registry) load(name string) {
 	if name == "" {

+ 3 - 1
server/remove_command.go

@@ -33,12 +33,14 @@ func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
 	delete(ps.followersStats.Followers, c.Name)
 
 	if err != nil {
+		log.Debugf("Error while unregistering: %s (%v)", c.Name, err)
 		return []byte{0}, err
 	}
 
 	// Remove peer in raft
 	err = server.RemovePeer(c.Name)
 	if err != nil {
+		log.Debugf("Unable to remove peer: %s (%v)", c.Name, err)
 		return []byte{0}, err
 	}
 
@@ -57,7 +59,7 @@ func (c *RemoveCommand) Apply(server *raft.Server) (interface{}, error) {
 			log.Debugf("ignore previous remove command.")
 		}
 	}
-
+	
 	b := make([]byte, 8)
 	binary.PutUvarint(b, server.CommitIndex())
 

+ 11 - 2
server/server.go

@@ -178,8 +178,17 @@ func (s *Server) Dispatch(c raft.Command, w http.ResponseWriter, req *http.Reque
 			return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
 		}
 
-		response := event.(*store.Event).Response()
-		b, _ := json.Marshal(response)
+		if b, ok := event.([]byte); ok {
+			w.WriteHeader(http.StatusOK)
+			w.Write(b)
+		}
+
+		var b []byte
+		if strings.HasPrefix(req.URL.Path, "/v1") {
+			b, _ = json.Marshal(event.(*store.Event).Response())
+		} else {
+			b, _ = json.Marshal(event.(*store.Event))
+		}
 		w.WriteHeader(http.StatusOK)
 		w.Write(b)
 

+ 1 - 1
server/transporter.go

@@ -123,7 +123,7 @@ func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req
 	json.NewEncoder(&b).Encode(req)
 
 	u, _ := t.peerServer.registry.PeerURL(peer.Name)
-	log.Debugf("Send Vote to %s", u)
+	log.Debugf("Send Vote from %s to %s", server.Name(), u)
 
 	resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)