Browse Source

fix(server): rejoin cluster with different ip

Yicheng Qin 11 years ago
parent
commit
273c293645
3 changed files with 131 additions and 2 deletions
  1. 40 2
      server/join_command.go
  2. 22 0
      server/registry.go
  3. 69 0
      tests/functional/rejoin_test.go

+ 40 - 2
server/join_command.go

@@ -29,6 +29,17 @@ func (c *JoinCommandV1) CommandName() string {
 	return "etcd:join"
 }
 
+func (c *JoinCommandV1) updatePeerURL(ps *PeerServer) error {
+	log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL)
+	if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil {
+		log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
+		return err
+	}
+	// Flush commit index, so raft will replay to here when restarted
+	ps.raftServer.FlushCommitIndex()
+	return nil
+}
+
 // Join a server to the cluster
 func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
 	ps, _ := context.Server().Context().(*PeerServer)
@@ -40,7 +51,15 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
 	ps.registry.Invalidate(c.Name)
 
 	// Check if the join command is from a previous peer, who lost all its previous log.
-	if _, ok := ps.registry.ClientURL(c.Name); ok {
+	if peerURL, ok := ps.registry.PeerURL(c.Name); ok {
+		// If previous node restarts with different peer URL,
+		// update its information.
+		if peerURL != c.RaftURL {
+			log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name)
+			if err := c.updatePeerURL(ps); err != nil {
+				return []byte{0}, err
+			}
+		}
 		return b, nil
 	}
 
@@ -83,6 +102,17 @@ func (c *JoinCommandV2) CommandName() string {
 	return "etcd:v2:join"
 }
 
+func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error {
+	log.Debugf("Update peer URL of %v to %v", c.Name, c.PeerURL)
+	if err := ps.registry.UpdatePeerURL(c.Name, c.PeerURL); err != nil {
+		log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
+		return err
+	}
+	// Flush commit index, so raft will replay to here when restart
+	ps.raftServer.FlushCommitIndex()
+	return nil
+}
+
 // Apply attempts to join a machine to the cluster.
 func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {
 	ps, _ := context.Server().Context().(*PeerServer)
@@ -95,7 +125,15 @@ func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {
 	ps.registry.Invalidate(c.Name)
 
 	// Check if the join command is from a previous peer, who lost all its previous log.
-	if _, ok := ps.registry.ClientURL(c.Name); ok {
+	if peerURL, ok := ps.registry.PeerURL(c.Name); ok {
+		// If previous node restarts with different peer URL,
+		// update its information.
+		if peerURL != c.PeerURL {
+			log.Infof("Rejoin with %v instead of %v from %v", c.PeerURL, peerURL, c.Name)
+			if err := c.updatePeerURL(ps); err != nil {
+				return []byte{0}, err
+			}
+		}
 		return json.Marshal(msg)
 	}
 

+ 22 - 0
server/registry.go

@@ -103,6 +103,25 @@ func (r *Registry) register(key, name string, peerURL string, machURL string) er
 	return err
 }
 
+// UpdatePeerURL updates peer URL in registry
+func (r *Registry) UpdatePeerURL(name string, peerURL string) error {
+	r.Lock()
+	defer r.Unlock()
+
+	machURL, _ := r.clientURL(RegistryPeerKey, name)
+	// Write data to store.
+	key := path.Join(RegistryPeerKey, name)
+	v := url.Values{}
+	v.Set("raft", peerURL)
+	v.Set("etcd", machURL)
+	_, err := r.store.Update(key, v.Encode(), store.Permanent)
+
+	// Invalidate outdated cache.
+	r.invalidate(name)
+	log.Debugf("Update PeerURL: %s", name)
+	return err
+}
+
 // UnregisterPeer removes a peer from the registry.
 func (r *Registry) UnregisterPeer(name string) error {
 	return r.unregister(RegistryPeerKey, name)
@@ -290,7 +309,10 @@ func (r *Registry) urls(key, leaderName, selfName string, url func(key, name str
 func (r *Registry) Invalidate(name string) {
 	r.Lock()
 	defer r.Unlock()
+	r.invalidate(name)
+}
 
+func (r *Registry) invalidate(name string) {
 	delete(r.peers, name)
 	delete(r.standbys, name)
 }

+ 69 - 0
tests/functional/rejoin_test.go

@@ -0,0 +1,69 @@
+package test
+
+import (
+	"fmt"
+	"math/rand"
+	"os"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
+)
+
+func increasePeerAddressPort(args []string, delta int) []string {
+	for i, arg := range args {
+		if !strings.Contains(arg, "peer-addr") {
+			continue
+		}
+		splitArg := strings.Split(arg, ":")
+		port, _ := strconv.Atoi(splitArg[len(splitArg)-1])
+		args[i] = "-peer-addr=127.0.0.1:" + strconv.Itoa(port+delta)
+		return args
+	}
+	return append(args, "-peer-addr=127.0.0.1:"+strconv.Itoa(7001+delta))
+}
+
+// Create a five-node cluster
+// Random kill one of the nodes and restart it with different peer address
+func TestRejoinWithDifferentPeerAddress(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 5
+	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	defer DestroyCluster(etcds)
+
+	time.Sleep(2 * time.Second)
+
+	for i := 0; i < 10; i++ {
+		num := rand.Int() % clusterSize
+		fmt.Println("kill node", num+1)
+
+		// kill
+		etcds[num].Kill()
+		etcds[num].Release()
+		time.Sleep(time.Second)
+
+		argGroup[num] = increasePeerAddressPort(argGroup[num], clusterSize)
+		// restart
+		etcds[num], err = os.StartProcess(EtcdBinPath, argGroup[num], procAttr)
+		if err != nil {
+			panic(err)
+		}
+		time.Sleep(time.Second)
+	}
+
+	c := etcd.NewClient(nil)
+	c.SyncCluster()
+	result, err := c.Set("foo", "bar", 0)
+	if err != nil || result.Node.Key != "/foo" || result.Node.Value != "bar" {
+		t.Fatal("Failed to set value in etcd cluster")
+	}
+}