Browse Source

clientv3: Fix auth client to use endpoints instead of host when dialing, fix tests to block on dial when required.

Joe Betz 7 years ago
parent
commit
f84f554301

+ 12 - 1
clientv3/balancer/resolver/endpoint/endpoint.go

@@ -146,8 +146,19 @@ func (r *Resolver) Close() {
 	bldr.removeResolver(r)
 	bldr.removeResolver(r)
 }
 }
 
 
+// Target constructs a endpoint target with current resolver's clusterName.
 func (r *Resolver) Target(endpoint string) string {
 func (r *Resolver) Target(endpoint string) string {
-	return fmt.Sprintf("%s://%s/%s", scheme, r.clusterName, endpoint)
+	return Target(r.clusterName, endpoint)
+}
+
+// Target constructs a endpoint resolver target.
+func Target(clusterName, endpoint string) string {
+	return fmt.Sprintf("%s://%s/%s", scheme, clusterName, endpoint)
+}
+
+// IsTarget checks if a given target string in an endpoint resolver target.
+func IsTarget(target string) bool {
+	return strings.HasPrefix(target, "endpoint://")
 }
 }
 
 
 // Parse endpoint parses a endpoint of the form (http|https)://<host>*|(unix|unixs)://<path>) and returns a
 // Parse endpoint parses a endpoint of the form (http|https)://<host>*|(unix|unixs)://<path>) and returns a

+ 15 - 9
clientv3/client.go

@@ -298,14 +298,13 @@ func (c *Client) getToken(ctx context.Context) error {
 
 
 	for i := 0; i < len(c.cfg.Endpoints); i++ {
 	for i := 0; i < len(c.cfg.Endpoints); i++ {
 		endpoint := c.cfg.Endpoints[i]
 		endpoint := c.cfg.Endpoints[i]
-		host := getHost(endpoint)
 		// use dial options without dopts to avoid reusing the client balancer
 		// use dial options without dopts to avoid reusing the client balancer
 		var dOpts []grpc.DialOption
 		var dOpts []grpc.DialOption
 		dOpts, err = c.dialSetupOpts(c.resolver.Target(endpoint), c.cfg.DialOptions...)
 		dOpts, err = c.dialSetupOpts(c.resolver.Target(endpoint), c.cfg.DialOptions...)
 		if err != nil {
 		if err != nil {
 			continue
 			continue
 		}
 		}
-		auth, err = newAuthenticator(ctx, host, dOpts, c)
+		auth, err = newAuthenticator(ctx, endpoint, dOpts, c)
 		if err != nil {
 		if err != nil {
 			continue
 			continue
 		}
 		}
@@ -327,8 +326,8 @@ func (c *Client) getToken(ctx context.Context) error {
 	return err
 	return err
 }
 }
 
 
-func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
-	opts, err := c.dialSetupOpts(endpoint, dopts...)
+func (c *Client) dial(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
+	opts, err := c.dialSetupOpts(ep, dopts...)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
@@ -367,11 +366,18 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo
 		defer cancel()
 		defer cancel()
 	}
 	}
 
 
-	// TODO: The hosts check doesn't really make sense for a load balanced endpoint url for the new grpc load balancer interface.
-	// Is it safe/sane to use the provided endpoint here?
-	//host := getHost(endpoint)
-	//conn, err := grpc.DialContext(c.ctx, host, opts...)
-	conn, err := grpc.DialContext(dctx, endpoint, opts...)
+	// We pass a target to DialContext of the form: endpoint://<clusterName>/<host-part> that
+	// does not include scheme (http/https/unix/unixs) or path parts.
+	if endpoint.IsTarget(ep) {
+		clusterName, tep, err := endpoint.ParseTarget(ep)
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse endpoint target '%s': %v", ep, err)
+		}
+		_, host, _ := endpoint.ParseEndpoint(tep)
+		ep = endpoint.Target(clusterName, host)
+	}
+
+	conn, err := grpc.DialContext(dctx, ep, opts...)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}

+ 4 - 0
integration/cluster.go

@@ -536,6 +536,7 @@ type member struct {
 	PeerTLSInfo *transport.TLSInfo
 	PeerTLSInfo *transport.TLSInfo
 	// ClientTLSInfo enables client TLS when set
 	// ClientTLSInfo enables client TLS when set
 	ClientTLSInfo *transport.TLSInfo
 	ClientTLSInfo *transport.TLSInfo
+	DialOptions   []grpc.DialOption
 
 
 	raftHandler   *testutil.PauseableHandler
 	raftHandler   *testutil.PauseableHandler
 	s             *etcdserver.EtcdServer
 	s             *etcdserver.EtcdServer
@@ -744,6 +745,9 @@ func NewClientV3(m *member) (*clientv3.Client, error) {
 		}
 		}
 		cfg.TLS = tls
 		cfg.TLS = tls
 	}
 	}
+	if m.DialOptions != nil {
+		cfg.DialOptions = append(cfg.DialOptions, m.DialOptions...)
+	}
 	return newClientV3(cfg)
 	return newClientV3(cfg)
 }
 }
 
 

+ 2 - 0
integration/v3_alarm_test.go

@@ -191,10 +191,12 @@ func TestV3CorruptAlarm(t *testing.T) {
 	}
 	}
 	// Member 0 restarts into split brain.
 	// Member 0 restarts into split brain.
 
 
+	clus.Members[0].WaitStarted(t)
 	resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")
 	resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")
 	if err0 != nil {
 	if err0 != nil {
 		t.Fatal(err0)
 		t.Fatal(err0)
 	}
 	}
+	clus.Members[1].WaitStarted(t)
 	resp1, err1 := clus.Client(1).Get(context.TODO(), "abc")
 	resp1, err1 := clus.Client(1).Get(context.TODO(), "abc")
 	if err1 != nil {
 	if err1 != nil {
 		t.Fatal(err1)
 		t.Fatal(err1)

+ 4 - 2
integration/v3_grpc_inflight_test.go

@@ -25,6 +25,7 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 	"github.com/coreos/etcd/pkg/testutil"
 
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/transport"
 )
 )
 
 
 // TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
 // TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
@@ -81,8 +82,9 @@ func TestV3KVInflightRangeRequests(t *testing.T) {
 			defer wg.Done()
 			defer wg.Done()
 			_, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false))
 			_, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false))
 			if err != nil {
 			if err != nil {
-				if err != nil && rpctypes.ErrorDesc(err) != context.Canceled.Error() {
-					t.Fatalf("inflight request should be canceld with %v, got %v", context.Canceled, err)
+				errDesc := rpctypes.ErrorDesc(err)
+				if err != nil && !(errDesc == context.Canceled.Error() || errDesc == transport.ErrConnClosing.Desc) {
+					t.Fatalf("inflight request should be canceled with '%v' or '%v', got '%v'", context.Canceled.Error(), transport.ErrConnClosing.Desc, errDesc)
 				}
 				}
 			}
 			}
 		}()
 		}()

+ 2 - 0
integration/v3_grpc_test.go

@@ -1570,6 +1570,7 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) {
 	defer clus.Terminate(t)
 	defer clus.Terminate(t)
 
 
 	clus.Members[0].ClientTLSInfo = &testTLSInfo
 	clus.Members[0].ClientTLSInfo = &testTLSInfo
+	clus.Members[0].DialOptions = []grpc.DialOption{grpc.WithBlock()}
 	client, err := NewClientV3(clus.Members[0])
 	client, err := NewClientV3(clus.Members[0])
 	if client != nil || err == nil {
 	if client != nil || err == nil {
 		t.Fatalf("expected no client")
 		t.Fatalf("expected no client")
@@ -1752,6 +1753,7 @@ func testTLSReload(
 				continue
 				continue
 			}
 			}
 			cli, cerr := clientv3.New(clientv3.Config{
 			cli, cerr := clientv3.New(clientv3.Config{
+				DialOptions: []grpc.DialOption{grpc.WithBlock()},
 				Endpoints:   []string{clus.Members[0].GRPCAddr()},
 				Endpoints:   []string{clus.Members[0].GRPCAddr()},
 				DialTimeout: time.Second,
 				DialTimeout: time.Second,
 				TLS:         cc,
 				TLS:         cc,