Bläddra i källkod

Merge pull request #6106 from heyitsanthony/strict-reconfig-healthy

etcdserver, embed: stricter reconfig checking
Anthony Romano 9 år sedan
förälder
incheckning
c71f0ea174
6 ändrade filer med 73 tillägg och 5 borttagningar
  1. 1 0
      embed/config.go
  2. 1 1
      etcdserver/api/v2http/http.go
  3. 1 0
      etcdserver/errors.go
  4. 14 4
      etcdserver/server.go
  5. 11 0
      etcdserver/util.go
  6. 45 0
      integration/cluster_test.go

+ 1 - 0
embed/config.go

@@ -153,6 +153,7 @@ func NewConfig() *Config {
 		ACUrls:              []url.URL{*acurl},
 		ClusterState:        ClusterStateFlagNew,
 		InitialClusterToken: "etcd-cluster",
+		StrictReconfigCheck: true,
 	}
 	cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
 	return cfg

+ 1 - 1
etcdserver/api/v2http/http.go

@@ -60,7 +60,7 @@ func writeError(w http.ResponseWriter, r *http.Request, err error) {
 		}
 	default:
 		switch err {
-		case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost, etcdserver.ErrNotEnoughStartedMembers:
+		case etcdserver.ErrTimeoutDueToLeaderFail, etcdserver.ErrTimeoutDueToConnectionLost, etcdserver.ErrNotEnoughStartedMembers, etcdserver.ErrUnhealthy:
 			mlog.MergeError(err)
 		default:
 			mlog.MergeErrorf("got unexpected response error (%v)", err)

+ 1 - 0
etcdserver/errors.go

@@ -32,6 +32,7 @@ var (
 	ErrNoSpace                    = errors.New("etcdserver: no space")
 	ErrInvalidAuthToken           = errors.New("etcdserver: invalid auth token")
 	ErrTooManyRequests            = errors.New("etcdserver: too many requests")
+	ErrUnhealthy                  = errors.New("etcdserver: unhealthy cluster")
 )
 
 type DiscoveryError struct {

+ 14 - 4
etcdserver/server.go

@@ -65,6 +65,10 @@ const (
 	StoreClusterPrefix = "/0"
 	StoreKeysPrefix    = "/1"
 
+	// HealthInterval is the minimum time the cluster should be healthy
+	// before accepting add member requests.
+	HealthInterval = 5 * time.Second
+
 	purgeFileInterval = 30 * time.Second
 	// monitorVersionInterval should be smaller than the timeout
 	// on the connection. Or we will not be able to reuse the connection
@@ -814,10 +818,16 @@ func (s *EtcdServer) LeaderStats() []byte {
 func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
 
 func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error {
-	if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() {
-		// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
-		// In such a case adding a new member is allowed unconditionally
-		return ErrNotEnoughStartedMembers
+	if s.Cfg.StrictReconfigCheck {
+		// by default StrictReconfigCheck is enabled; reject new members if unhealthy
+		if !s.cluster.IsReadyToAddNewMember() {
+			plog.Warningf("not enough started members, rejecting member add %+v", memb)
+			return ErrNotEnoughStartedMembers
+		}
+		if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.Members()) {
+			plog.Warningf("not healthy for reconfigure, rejecting member add %+v", memb)
+			return ErrUnhealthy
+		}
 	}
 
 	// TODO: move Member to protobuf type

+ 11 - 0
etcdserver/util.go

@@ -40,3 +40,14 @@ func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote ty
 	t := transport.ActiveSince(remote)
 	return !t.IsZero() && t.Before(since)
 }
+
+// isConnectedFullySince checks whether the local member is connected to all
+// members in the cluster since the given time.
+func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool {
+	for _, m := range members {
+		if m.ID != self && !isConnectedSince(transport, since, m.ID) {
+			return false
+		}
+	}
+	return true
+}

+ 45 - 0
integration/cluster_test.go

@@ -20,10 +20,12 @@ import (
 	"math/rand"
 	"os"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 
 	"github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/pkg/testutil"
 
 	"golang.org/x/net/context"
@@ -346,6 +348,49 @@ func TestIssue3699(t *testing.T) {
 	cancel()
 }
 
+// TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members.
+func TestRejectUnhealthyAdd(t *testing.T) {
+	defer testutil.AfterTest(t)
+	c := NewCluster(t, 3)
+	for _, m := range c.Members {
+		m.ServerConfig.StrictReconfigCheck = true
+	}
+	c.Launch(t)
+	defer c.Terminate(t)
+
+	// make cluster unhealthy and wait for downed peer
+	c.Members[0].Stop(t)
+	c.WaitLeader(t)
+
+	// all attempts to add member should fail
+	for i := 1; i < len(c.Members); i++ {
+		err := c.addMemberByURL(t, c.URL(i), "unix://foo:12345")
+		if err == nil {
+			t.Fatalf("should have failed adding peer")
+		}
+		// TODO: client should return descriptive error codes for internal errors
+		if !strings.Contains(err.Error(), "has no leader") {
+			t.Errorf("unexpected error (%v)", err)
+		}
+	}
+
+	// make cluster healthy
+	c.Members[0].Restart(t)
+	c.WaitLeader(t)
+	time.Sleep(2 * etcdserver.HealthInterval)
+
+	// add member should succeed now that it's healthy
+	var err error
+	for i := 1; i < len(c.Members); i++ {
+		if err = c.addMemberByURL(t, c.URL(i), "unix://foo:12345"); err == nil {
+			break
+		}
+	}
+	if err != nil {
+		t.Fatalf("should have added peer to healthy cluster (%v)", err)
+	}
+}
+
 // clusterMustProgress ensures that cluster can make progress. It creates
 // a random key first, and check the new key could be got from all client urls
 // of the cluster.