Browse Source

feat(discovery): adjust boot order to find peers

The boot order for finding peers is: -discovery, then -peers, then log
data, and finally forming a new cluster by itself.

Special rules:
1. If discovery succeeds, etcd uses only the peers returned by the
discovery URL.
2. Etcd fails when -discovery is bad and there are no usable -peers and no log data.

Add TestDiscoveryDownNoBackupPeersWithDataDir as the test.
Yicheng Qin 12 years ago
parent
commit
3a4df1612c

+ 0 - 38
config/config.go

@@ -14,7 +14,6 @@ import (
 
 	"github.com/coreos/etcd/third_party/github.com/BurntSushi/toml"
 
-	"github.com/coreos/etcd/discovery"
 	"github.com/coreos/etcd/log"
 	ustrings "github.com/coreos/etcd/pkg/strings"
 	"github.com/coreos/etcd/server"
@@ -144,13 +143,6 @@ func (c *Config) Load(arguments []string) error {
 		return fmt.Errorf("sanitize: %v", err)
 	}
 
-	// Attempt cluster discovery
-	if c.Discovery != "" {
-		if err := c.handleDiscovery(); err != nil {
-			return err
-		}
-	}
-
 	// Force remove server configuration if specified.
 	if c.Force {
 		c.Reset()
@@ -215,36 +207,6 @@ func (c *Config) loadEnv(target interface{}) error {
 	return nil
 }
 
-func (c *Config) handleDiscovery() error {
-	p, err := discovery.Do(c.Discovery, c.Name, c.Peer.Addr)
-
-	// This is fatal, discovery encountered an unexpected error
-	// and we have no peer list.
-	if err != nil && len(c.Peers) == 0 {
-		log.Fatalf("Discovery failed and a backup peer list wasn't provided: %v", err)
-		return err
-	}
-
-	// Warn about errors coming from discovery, this isn't fatal
-	// since the user might have provided a peer list elsewhere.
-	if err != nil {
-		log.Warnf("Discovery encountered an error but a backup peer list (%v) was provided: %v", c.Peers, err)
-	}
-
-	for i := range p {
-		// Strip the scheme off of the peer if it has one
-		// TODO(bp): clean this up!
-		purl, err := url.Parse(p[i])
-		if err == nil {
-			p[i] = purl.Host
-		}
-	}
-
-	c.Peers = p
-
-	return nil
-}
-
 // Loads configuration from command line flags.
 func (c *Config) LoadFlags(arguments []string) error {
 	var peers, cors, path string

+ 1 - 1
etcd.go

@@ -187,7 +187,7 @@ func main() {
 	}
 
 	ps.SetServer(s)
-	ps.Start(config.Snapshot, config.Peers)
+	ps.Start(config.Snapshot, config.Discovery, config.Peers)
 
 	go func() {
 		log.Infof("peer server [name %s, listen on %s, advertised url %s]", ps.Config.Name, psListener.Addr(), ps.Config.URL)

+ 96 - 27
server/peer_server.go

@@ -14,6 +14,7 @@ import (
 	"github.com/coreos/etcd/third_party/github.com/coreos/raft"
 	"github.com/coreos/etcd/third_party/github.com/gorilla/mux"
 
+	"github.com/coreos/etcd/discovery"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/metrics"
@@ -99,8 +100,100 @@ func (s *PeerServer) SetRaftServer(raftServer raft.Server) {
 	s.raftServer = raftServer
 }
 
+// handleDiscovery runs discovery.Do against discoverURL using this peer's
+// name and URL, and returns the resulting peer list with each entry reduced
+// to its host part.
+//
+// A discovery error is logged as a warning and returned to the caller; it is
+// not fatal here, since the user might have provided a peer list elsewhere,
+// or there may be some log in the data dir.
+func (s *PeerServer) handleDiscovery(discoverURL string) (peers []string, err error) {
+	peers, err = discovery.Do(discoverURL, s.Config.Name, s.Config.URL)
+	if err != nil {
+		log.Warnf("Discovery encountered an error: %v", err)
+		return nil, err
+	}
+
+	for i, peer := range peers {
+		// Strip the scheme off of the peer if it has one
+		// TODO(bp): clean this up!
+		purl, parseErr := url.Parse(peer)
+		// A scheme-less "host:port" may parse without error but with an
+		// empty Host; keep the entry untouched in that case instead of
+		// replacing it with an empty string.
+		if parseErr != nil || purl.Host == "" {
+			continue
+		}
+		peers[i] = purl.Host
+	}
+
+	log.Infof("Discovery fetched back peer list: %v", peers)
+
+	return peers, nil
+}
+
+// findCluster tries all possible ways to find a cluster to join, using
+// -discovery, -peers and the log data in -data-dir.
+//
+// Peer discovery follows this order:
+// 1. -discovery
+// 2. -peers
+// 3. previous peers in -data-dir
+//
+// If discovery succeeds, only the peers it returned are used: an empty list
+// starts this peer as leader, otherwise it starts as a follower of them.
+// If discovery is not configured or fails, the configured peers plus the
+// previous peers from the registry are tried via joinCluster. When nobody
+// can be joined and the raft log is not empty, the function returns so this
+// peer can restart the cluster. When the log is empty and either a discovery
+// URL or a peer list was given, the process exits through log.Fatalf.
+// Otherwise this peer starts a brand new cluster as leader.
+func (s *PeerServer) findCluster(discoverURL string, peers []string) {
+	// Attempt cluster discovery
+	toDiscover := discoverURL != ""
+	if toDiscover {
+		discoverPeers, discoverErr := s.handleDiscovery(discoverURL)
+		// It is registered in discover url
+		if discoverErr == nil {
+			// start as a leader in a new cluster
+			if len(discoverPeers) == 0 {
+				log.Debug("This peer is starting a brand new cluster based on discover URL.")
+				s.startAsLeader()
+			} else {
+				s.startAsFollower(discoverPeers)
+			}
+			return
+		}
+	}
+
+	hasPeerList := len(peers) > 0
+
+	// if there is log in data dir, append previous peers to peers in config
+	// to find cluster. A fresh slice is built so that neither the caller's
+	// peers slice nor the slice returned by the registry is modified.
+	prevPeers := s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
+	candidates := make([]string, 0, len(peers)+len(prevPeers))
+	candidates = append(candidates, peers...)
+	for _, prevPeer := range prevPeers {
+		u, err := url.Parse(prevPeer)
+		if err != nil {
+			// u is nil on error; skip the entry instead of dereferencing it.
+			log.Debug("rejoin cannot parse url: ", err)
+			continue
+		}
+		candidates = append(candidates, u.Host)
+	}
+
+	// if there is backup peer lists, use it to find cluster
+	if len(candidates) > 0 {
+		if s.joinCluster(candidates) {
+			log.Debugf("%s restart as a follower based on peers[%v]", s.Config.Name, candidates)
+			return
+		}
+		log.Warn("No living peers are found!")
+	}
+
+	if !s.raftServer.IsLogEmpty() {
+		log.Debugf("Entire cluster is down! %v will restart the cluster.", s.Config.Name)
+		return
+	}
+
+	if toDiscover {
+		log.Fatalf("Discovery failed, no available peers in backup list, and no log data")
+	}
+
+	if hasPeerList {
+		log.Fatalf("No available peers in backup list, and no log data")
+	}
+
+	log.Infof("This peer is starting a brand new cluster now.")
+	s.startAsLeader()
+}
+
 // Start the raft server
-func (s *PeerServer) Start(snapshot bool, cluster []string) error {
+func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) error {
 	// LoadSnapshot
 	if snapshot {
 		err := s.raftServer.LoadSnapshot()
@@ -114,31 +207,7 @@ func (s *PeerServer) Start(snapshot bool, cluster []string) error {
 
 	s.raftServer.Start()
 
-	if s.raftServer.IsLogEmpty() {
-		// start as a leader in a new cluster
-		if len(cluster) == 0 {
-			s.startAsLeader()
-		} else {
-			s.startAsFollower(cluster)
-		}
-
-	} else {
-		// Rejoin the previous cluster
-		cluster = s.registry.PeerURLs(s.raftServer.Leader(), s.Config.Name)
-		for i := 0; i < len(cluster); i++ {
-			u, err := url.Parse(cluster[i])
-			if err != nil {
-				log.Debug("rejoin cannot parse url: ", err)
-			}
-			cluster[i] = u.Host
-		}
-		ok := s.joinCluster(cluster)
-		if !ok {
-			log.Warn("the entire cluster is down! this peer will restart the cluster.")
-		}
-
-		log.Debugf("%s restart as a follower", s.Config.Name)
-	}
+	s.findCluster(discoverURL, peers)
 
 	s.closeChan = make(chan bool)
 
@@ -209,7 +278,7 @@ func (s *PeerServer) startAsFollower(cluster []string) {
 		if ok {
 			return
 		}
-		log.Warnf("Unable to join the cluster using any of the peers %v. Retrying in %.1f seconds", cluster, s.Config.RetryInterval)
+		log.Warnf("%v is unable to join the cluster using any of the peers %v at %dth time. Retrying in %.1f seconds", s.Config.Name, cluster, i, s.Config.RetryInterval)
 		time.Sleep(time.Second * time.Duration(s.Config.RetryInterval))
 	}
 

+ 3 - 1
server/registry.go

@@ -94,7 +94,9 @@ func (r *Registry) clientURL(name string) (string, bool) {
 	return "", false
 }
 
-// Retrieves the host part of peer URL for a given node by name.
+// TODO(yichengq): have all of the code use a full URL with scheme
+// and remove this method
+// PeerHost retrieves the host part of peer URL for a given node by name.
 func (r *Registry) PeerHost(name string) (string, bool) {
 	rawurl, ok := r.PeerURL(name)
 	if ok {

+ 1 - 1
server/server.go

@@ -79,7 +79,7 @@ func (s *Server) URL() string {
 	return s.url
 }
 
-// Returns the host part of Peer URL for a given node name.
+// PeerHost retrieves the host part of Peer URL for a given node name.
 func (s *Server) PeerHost(name string) (string, bool) {
 	return s.registry.PeerHost(name)
 }

+ 53 - 0
tests/functional/discovery_test.go

@@ -111,6 +111,59 @@ func TestDiscoveryNoWithBackupPeers(t *testing.T) {
 	})
 }
 
+// TestDiscoveryDownNoBackupPeersWithDataDir ensures that etcd runs if it is
+// started with a bad discovery URL, no backups and valid data dir.
+//
+// The test runs in two phases: it first starts an etcd process that joins
+// the ETCDTEST server through -peers, then stops it and starts a process with
+// startServerWithDataDir, passing only a -discovery URL that points at a
+// local httptest server.
+func TestDiscoveryDownNoBackupPeersWithDataDir(t *testing.T) {
+	etcdtest.RunServer(func(s *server.Server) {
+		u, ok := s.PeerHost("ETCDTEST")
+		if !ok {
+			t.Fatalf("Couldn't find the URL")
+		}
+
+		// run etcd and connect to ETCDTEST server
+		proc, err := startServer([]string{"-peers", u})
+		if err != nil {
+			t.Fatal(err.Error())
+		}
+
+		// check it runs well
+		client := http.Client{}
+		err = assertServerFunctional(client, "http")
+		if err != nil {
+			t.Fatal(err.Error())
+		}
+
+		// stop etcd, and leave valid data dir for later usage
+		stopServer(proc)
+
+		// NOTE(review): garbageHandler is defined outside this view; presumably
+		// it serves an unusable discovery response and records the call in
+		// g.success -- confirm against its definition.
+		g := garbageHandler{t: t}
+		ts := httptest.NewServer(&g)
+		defer ts.Close()
+
+		discover := ts.URL + "/v2/keys/_etcd/registry/1"
+		// connect to ETCDTEST server again with previous data dir
+		proc, err = startServerWithDataDir([]string{"-discovery", discover})
+		if err != nil {
+			t.Fatal(err.Error())
+		}
+		defer stopServer(proc)
+
+		// TODO(yichengq): it needs some time to do leader election
+		// improve to get rid of it
+		time.Sleep(1 * time.Second)
+
+		client = http.Client{}
+		err = assertServerFunctional(client, "http")
+		if err != nil {
+			t.Fatal(err.Error())
+		}
+
+		// The discovery endpoint must have been contacted, even though the
+		// server is expected to come up without it.
+		if !g.success {
+			t.Fatal("Discovery server never called")
+		}
+	})
+}
+
 // TestDiscoveryFirstPeer ensures that etcd starts as the leader if it
 // registers as the first peer.
 func TestDiscoveryFirstPeer(t *testing.T) {

+ 15 - 2
tests/functional/etcd_tls_test.go

@@ -167,6 +167,18 @@ func startServer(extra []string) (*os.Process, error) {
 	return os.StartProcess(EtcdBinPath, cmd, procAttr)
 }
 
+// startServerWithDataDir starts the etcd binary at EtcdBinPath with the fixed
+// flags -data-dir=/tmp/node1 and -name=node1, followed by the extra
+// arguments. The command line is printed before the process is started, and
+// the child writes to this process's stdout and stderr. It returns the
+// result of os.StartProcess.
+func startServerWithDataDir(extra []string) (*os.Process, error) {
+	args := append([]string{"etcd", "-data-dir=/tmp/node1", "-name=node1"}, extra...)
+	println(strings.Join(args, " "))
+
+	attr := &os.ProcAttr{
+		Files: []*os.File{nil, os.Stdout, os.Stderr},
+	}
+	return os.StartProcess(EtcdBinPath, args, attr)
+}
+
 func stopServer(proc *os.Process) {
 	err := proc.Kill()
 	if err != nil {
@@ -194,7 +206,8 @@ func assertServerFunctional(client http.Client, scheme string) error {
 		}
 
 		if err == nil {
-			if resp.StatusCode != 201 {
+			// Internal error may mean that servers are in leader election
+			if resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusInternalServerError {
 				return errors.New(fmt.Sprintf("resp.StatusCode == %s", resp.Status))
 			} else {
 				return nil
@@ -202,7 +215,7 @@ func assertServerFunctional(client http.Client, scheme string) error {
 		}
 	}
 
-	return errors.New("etcd server was not reachable in time")
+	return errors.New("etcd server was not reachable in time / had internal error")
 }
 
 func assertServerNotFunctional(client http.Client, scheme string) error {

+ 1 - 1
tests/server_utils.go

@@ -76,7 +76,7 @@ func RunServer(f func(*server.Server)) {
 	c := make(chan bool)
 	go func() {
 		c <- true
-		ps.Start(false, []string{})
+		ps.Start(false, "", []string{})
 		h := waitHandler{w, ps.HTTPHandler()}
 		http.Serve(psListener, &h)
 	}()