Browse Source

Merge pull request #7342 from heyitsanthony/client-version

clientv3: version checking
Anthony Romano 8 năm trước cách đây
mục cha
commit
0c0fbbd7c5
3 tập tin đã thay đổi với 82 bổ sung2 xóa
  1. 56 2
      clientv3/client.go
  2. 8 0
      clientv3/config.go
  3. 18 0
      clientv3/integration/dial_test.go

+ 56 - 2
clientv3/client.go

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"net"
 	"net/url"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -35,6 +36,7 @@ import (
 
 var (
 	ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
+	ErrOldCluster           = errors.New("etcdclient: old cluster version")
 )
 
 // Client provides and manages an etcd v3 client session.
@@ -272,7 +274,7 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo
 			tokenMu: &sync.RWMutex{},
 		}
 
-		err := c.getToken(context.TODO())
+		err := c.getToken(c.ctx)
 		if err != nil {
 			return nil, err
 		}
@@ -307,7 +309,12 @@ func newClient(cfg *Config) (*Client, error) {
 	}
 
 	// use a temporary skeleton client to bootstrap first connection
-	ctx, cancel := context.WithCancel(context.TODO())
+	baseCtx := context.TODO()
+	if cfg.Context != nil {
+		baseCtx = cfg.Context
+	}
+
+	ctx, cancel := context.WithCancel(baseCtx)
 	client := &Client{
 		conn:   nil,
 		cfg:    *cfg,
@@ -353,10 +360,57 @@ func newClient(cfg *Config) (*Client, error) {
 	client.Auth = NewAuth(client)
 	client.Maintenance = NewMaintenance(client)
 
+	if cfg.RejectOldCluster {
+		if err := client.checkVersion(); err != nil {
+			client.Close()
+			return nil, err
+		}
+	}
+
 	go client.autoSync()
 	return client, nil
 }
 
+func (c *Client) checkVersion() (err error) {
+	var wg sync.WaitGroup
+	errc := make(chan error, len(c.cfg.Endpoints))
+	ctx, cancel := context.WithCancel(c.ctx)
+	if c.cfg.DialTimeout > 0 {
+		ctx, _ = context.WithTimeout(ctx, c.cfg.DialTimeout)
+	}
+	wg.Add(len(c.cfg.Endpoints))
+	for _, ep := range c.cfg.Endpoints {
+		// if cluster is current, any endpoint gives a recent version
+		go func(e string) {
+			defer wg.Done()
+			resp, rerr := c.Status(ctx, e)
+			if rerr != nil {
+				errc <- rerr
+				return
+			}
+			vs := strings.Split(resp.Version, ".")
+			maj, min := 0, 0
+			if len(vs) >= 2 {
+				maj, rerr = strconv.Atoi(vs[0])
+				min, rerr = strconv.Atoi(vs[1])
+			}
+			if maj < 3 || (maj == 3 && min < 2) {
+				rerr = ErrOldCluster
+			}
+			errc <- rerr
+		}(ep)
+	}
+	// wait for success
+	for i := 0; i < len(c.cfg.Endpoints); i++ {
+		if err = <-errc; err == nil {
+			break
+		}
+	}
+	cancel()
+	wg.Wait()
+	return err
+}
+
 // ActiveConnection returns the current in-use connection
 func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
 

+ 8 - 0
clientv3/config.go

@@ -18,6 +18,7 @@ import (
 	"crypto/tls"
 	"time"
 
+	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 )
 
@@ -41,6 +42,13 @@ type Config struct {
 	// Password is a password for authentication.
 	Password string `json:"password"`
 
+	// RejectOldCluster when set will refuse to create a client against an outdated cluster.
+	RejectOldCluster bool `json:"reject-old-cluster"`
+
 	// DialOptions is a list of dial options for the grpc client (e.g., for interceptors).
 	DialOptions []grpc.DialOption
+
+	// Context is the default client context; it can be used to cancel grpc dial out and
+	// other operations that do not have an explicit context.
+	Context context.Context
 }

+ 18 - 0
clientv3/integration/dial_test.go

@@ -70,3 +70,21 @@ func testDialSetEndpoints(t *testing.T, setBefore bool) {
 	}
 	cancel()
 }
+
+func TestRejectOldCluster(t *testing.T) {
+	defer testutil.AfterTest(t)
+	// 2 endpoints to test multi-endpoint Status
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
+	defer clus.Terminate(t)
+
+	cfg := clientv3.Config{
+		Endpoints:        []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()},
+		DialTimeout:      5 * time.Second,
+		RejectOldCluster: true,
+	}
+	cli, err := clientv3.New(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	cli.Close()
+}