Browse Source

feat(peer_server): forbid rejoining with different name

Or it will confuse the cluster, especially the heartbeat between nodes.
Yicheng Qin 11 years ago
parent
commit
0c95e1eabb
6 changed files with 115 additions and 20 deletions
  1. 20 18
      error/error.go
  2. 18 0
      server/join_command.go
  3. 28 0
      server/peer_server.go
  4. 1 2
      server/registry.go
  5. 45 0
      tests/functional/rejoin_test.go
  6. 3 0
      tests/functional/util.go

+ 20 - 18
error/error.go

@@ -24,15 +24,16 @@ import (
 
 
 var errors = map[int]string{
 var errors = map[int]string{
 	// command related errors
 	// command related errors
-	EcodeKeyNotFound:    "Key not found",
-	EcodeTestFailed:     "Compare failed", //test and set
-	EcodeNotFile:        "Not a file",
-	EcodeNoMorePeer:     "Reached the max number of peers in the cluster",
-	EcodeNotDir:         "Not a directory",
-	EcodeNodeExist:      "Key already exists", // create
-	EcodeRootROnly:      "Root is read only",
-	EcodeKeyIsPreserved: "The prefix of given key is a keyword in etcd",
-	EcodeDirNotEmpty:    "Directory not empty",
+	EcodeKeyNotFound:      "Key not found",
+	EcodeTestFailed:       "Compare failed", //test and set
+	EcodeNotFile:          "Not a file",
+	EcodeNoMorePeer:       "Reached the max number of peers in the cluster",
+	EcodeNotDir:           "Not a directory",
+	EcodeNodeExist:        "Key already exists", // create
+	EcodeRootROnly:        "Root is read only",
+	EcodeKeyIsPreserved:   "The prefix of given key is a keyword in etcd",
+	EcodeDirNotEmpty:      "Directory not empty",
+	EcodeExistingPeerAddr: "Peer address has existed",
 
 
 	// Post form related errors
 	// Post form related errors
 	EcodeValueRequired:        "Value is Required in POST form",
 	EcodeValueRequired:        "Value is Required in POST form",
@@ -60,15 +61,16 @@ var errors = map[int]string{
 }
 }
 
 
 const (
 const (
-	EcodeKeyNotFound    = 100
-	EcodeTestFailed     = 101
-	EcodeNotFile        = 102
-	EcodeNoMorePeer     = 103
-	EcodeNotDir         = 104
-	EcodeNodeExist      = 105
-	EcodeKeyIsPreserved = 106
-	EcodeRootROnly      = 107
-	EcodeDirNotEmpty    = 108
+	EcodeKeyNotFound      = 100
+	EcodeTestFailed       = 101
+	EcodeNotFile          = 102
+	EcodeNoMorePeer       = 103
+	EcodeNotDir           = 104
+	EcodeNodeExist        = 105
+	EcodeKeyIsPreserved   = 106
+	EcodeRootROnly        = 107
+	EcodeDirNotEmpty      = 108
+	EcodeExistingPeerAddr = 109
 
 
 	EcodeValueRequired        = 200
 	EcodeValueRequired        = 200
 	EcodePrevValueRequired    = 201
 	EcodePrevValueRequired    = 201

+ 18 - 0
server/join_command.go

@@ -63,6 +63,15 @@ func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
 		return b, nil
 		return b, nil
 	}
 	}
 
 
+	// Check if the join command adds an instance that collides with existing one on peer URL.
+	peerURLs := ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name)
+	for _, peerURL := range peerURLs {
+		if peerURL == c.EtcdURL {
+			log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.EtcdURL)
+			return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.EtcdURL, context.CommitIndex())
+		}
+	}
+
 	// Check peer number in the cluster
 	// Check peer number in the cluster
 	if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
 	if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
 		log.Debug("Reject join request from ", c.Name)
 		log.Debug("Reject join request from ", c.Name)
@@ -137,6 +146,15 @@ func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {
 		return json.Marshal(msg)
 		return json.Marshal(msg)
 	}
 	}
 
 
+	// Check if the join command adds an instance that collides with existing one on peer URL.
+	peerURLs := ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name)
+	for _, peerURL := range peerURLs {
+		if peerURL == c.PeerURL {
+			log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.PeerURL)
+			return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.PeerURL, context.CommitIndex())
+		}
+	}
+
 	// Check peer number in the cluster.
 	// Check peer number in the cluster.
 	if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
 	if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
 		log.Debug("Join as standby ", c.Name)
 		log.Debug("Join as standby ", c.Name)

+ 28 - 0
server/peer_server.go

@@ -10,6 +10,7 @@ import (
 	"net/url"
 	"net/url"
 	"sort"
 	"sort"
 	"strconv"
 	"strconv"
+	"strings"
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
@@ -187,6 +188,12 @@ func (s *PeerServer) findCluster(discoverURL string, peers []string) {
 
 
 	// Try its best to find possible peers, and connect with them.
 	// Try its best to find possible peers, and connect with them.
 	if !isNewNode {
 	if !isNewNode {
+		// It is not allowed to join the cluster with existing peer address
+		// This prevents old node joining with different name by mistake.
+		if !s.checkPeerAddressNonconflict() {
+			log.Fatalf("%v is not allowed to join the cluster with existing URL %v", s.Config.Name, s.Config.URL)
+		}
+
 		// Take old nodes into account.
 		// Take old nodes into account.
 		allPeers := s.getKnownPeers()
 		allPeers := s.getKnownPeers()
 		// Discover registered peers.
 		// Discover registered peers.
@@ -426,6 +433,25 @@ func (s *PeerServer) Upgradable() error {
 	return nil
 	return nil
 }
 }
 
 
+// checkPeerAddressNonconflict checks whether the peer address has existed with different name.
+func (s *PeerServer) checkPeerAddressNonconflict() bool {
+	// there exists the (name, peer address) pair
+	if peerURL, ok := s.registry.PeerURL(s.Config.Name); ok {
+		if peerURL == s.Config.URL {
+			return true
+		}
+	}
+
+	// check all existing peer addresses
+	peerURLs := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
+	for _, peerURL := range peerURLs {
+		if peerURL == s.Config.URL {
+			return false
+		}
+	}
+	return true
+}
+
 // Helper function to do discovery and return results in expected format
 // Helper function to do discovery and return results in expected format
 func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
 func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
 	peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)
 	peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)
@@ -455,6 +481,8 @@ func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err er
 // getKnownPeers gets the previous peers from log
 // getKnownPeers gets the previous peers from log
 func (s *PeerServer) getKnownPeers() []string {
 func (s *PeerServer) getKnownPeers() []string {
 	peers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
 	peers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
+	log.Infof("Peer URLs in log: %s / %s (%s)", s.raftServer.Leader(), s.Config.Name, strings.Join(peers, ","))
+
 	for i := range peers {
 	for i := range peers {
 		u, err := url.Parse(peers[i])
 		u, err := url.Parse(peers[i])
 		if err != nil {
 		if err != nil {

+ 1 - 2
server/registry.go

@@ -300,8 +300,7 @@ func (r *Registry) urls(key, leaderName, selfName string, url func(key, name str
 		}
 		}
 	}
 	}
 
 
-	log.Infof("URLs: %s: %s / %s (%s)", key, leaderName, selfName, strings.Join(urls, ","))
-
+	log.Debugf("URLs: %s: %s / %s (%s)", key, leaderName, selfName, strings.Join(urls, ","))
 	return urls
 	return urls
 }
 }
 
 

+ 45 - 0
tests/functional/rejoin_test.go

@@ -146,3 +146,48 @@ func TestReplaceWithDifferentPeerAddress(t *testing.T) {
 		t.Fatal("Failed to set value in etcd cluster")
 		t.Fatal("Failed to set value in etcd cluster")
 	}
 	}
 }
 }
+
+// Create a five nodes
+// Let the sixth instance join with different name and existing peer address
+func TestRejoinWithDifferentName(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 5
+	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	defer DestroyCluster(etcds)
+
+	time.Sleep(2 * time.Second)
+
+	num := rand.Int() % clusterSize
+	fmt.Println("join node 6 that collides with node", num+1)
+
+	// kill
+	etcds[num].Kill()
+	etcds[num].Release()
+	time.Sleep(time.Second)
+
+	for i := 0; i < 2; i++ {
+		// restart
+		if i == 0 {
+			etcds[num], err = os.StartProcess(EtcdBinPath, append(argGroup[num], "-name=node6", "-peers=127.0.0.1:7002"), procAttr)
+		} else {
+			etcds[num], err = os.StartProcess(EtcdBinPath, append(argGroup[num], "-f", "-name=node6", "-peers=127.0.0.1:7002"), procAttr)
+		}
+		if err != nil {
+			t.Fatal("fail starting etcd:", err)
+		}
+
+		timer := time.AfterFunc(10*time.Second, func() {
+			t.Fatal("new etcd should fail immediately")
+		})
+		etcds[num].Wait()
+		etcds[num] = nil
+		timer.Stop()
+	}
+}

+ 3 - 0
tests/functional/util.go

@@ -154,6 +154,9 @@ func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os
 // Destroy all the nodes in the cluster
 // Destroy all the nodes in the cluster
 func DestroyCluster(etcds []*os.Process) error {
 func DestroyCluster(etcds []*os.Process) error {
 	for _, etcd := range etcds {
 	for _, etcd := range etcds {
+		if etcd == nil {
+			continue
+		}
 		err := etcd.Kill()
 		err := etcd.Kill()
 		if err != nil {
 		if err != nil {
 			panic(err.Error())
 			panic(err.Error())