Browse Source

clientv3: Fix new load balancer integration issues

Joe Betz 7 years ago
parent
commit
f20a1173d8

+ 2 - 25
clientv3/balancer/balancer.go

@@ -16,7 +16,6 @@ package balancer
 
 import (
 	"fmt"
-	"strings"
 	"sync"
 
 	"github.com/coreos/etcd/clientv3/balancer/picker"
@@ -44,9 +43,6 @@ type Balancer interface {
 
 	// Picker calls "Pick" for every client request.
 	picker.Picker
-
-	// SetEndpoints updates client's endpoints.
-	SetEndpoints(eps ...string)
 }
 
 type baseBalancer struct {
@@ -56,8 +52,6 @@ type baseBalancer struct {
 
 	mu sync.RWMutex
 
-	eps []string
-
 	addrToSc map[resolver.Address]balancer.SubConn
 	scToAddr map[balancer.SubConn]resolver.Address
 	scToSt   map[balancer.SubConn]connectivity.State
@@ -70,20 +64,12 @@ type baseBalancer struct {
 }
 
 // New returns a new balancer from specified picker policy.
-func New(cfg Config) (Balancer, error) {
-	for _, ep := range cfg.Endpoints {
-		if !strings.HasPrefix(ep, "endpoint://") {
-			return nil, fmt.Errorf("'endpoint' target schema required for etcd load balancer endpoints but got '%s'", ep)
-		}
-	}
-
+func New(cfg Config) Balancer {
 	bb := &baseBalancer{
 		policy: cfg.Policy,
 		name:   cfg.Policy.String(),
 		lg:     cfg.Logger,
 
-		eps: cfg.Endpoints,
-
 		addrToSc: make(map[resolver.Address]balancer.SubConn),
 		scToAddr: make(map[balancer.SubConn]resolver.Address),
 		scToSt:   make(map[balancer.SubConn]connectivity.State),
@@ -107,7 +93,7 @@ func New(cfg Config) (Balancer, error) {
 		zap.String("policy", bb.policy.String()),
 		zap.String("name", bb.name),
 	)
-	return bb, nil
+	return bb
 }
 
 // Name implements "grpc/balancer.Builder" interface.
@@ -265,15 +251,6 @@ func (bb *baseBalancer) regeneratePicker() {
 	)
 }
 
-// SetEndpoints updates client's endpoints.
-// TODO: implement this
-func (bb *baseBalancer) SetEndpoints(eps ...string) {
-	addrs := epsToAddrs(eps...)
-	bb.mu.Lock()
-	bb.Picker.UpdateAddrs(addrs)
-	bb.mu.Unlock()
-}
-
 // Close implements "grpc/balancer.Balancer" interface.
 // Close is a nop because base balancer doesn't have internal state to clean up,
 // and it doesn't need to call RemoveSubConn for the SubConns.

+ 15 - 27
clientv3/balancer/balancer_test.go

@@ -68,16 +68,12 @@ func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) {
 			rsv.InitialAddrs(resolvedAddrs)
 
 			cfg := Config{
-				Policy:    picker.RoundrobinBalanced,
-				Name:      genName(),
-				Logger:    zap.NewExample(),
-				Endpoints: []string{fmt.Sprintf("endpoint://nofailover/*")},
+				Policy: picker.RoundrobinBalanced,
+				Name:   genName(),
+				Logger: zap.NewExample(),
 			}
-			rrb, err := New(cfg)
-			if err != nil {
-				t.Fatalf("failed to create builder: %v", err)
-			}
-			conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
+			rrb := New(cfg)
+			conn, err := grpc.Dial(fmt.Sprintf("endpoint://nofailover/*"), grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
 			if err != nil {
 				t.Fatalf("failed to dial mock server: %v", err)
 			}
@@ -134,16 +130,12 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
 	rsv.InitialAddrs(resolvedAddrs)
 
 	cfg := Config{
-		Policy:    picker.RoundrobinBalanced,
-		Name:      genName(),
-		Logger:    zap.NewExample(),
-		Endpoints: []string{fmt.Sprintf("endpoint://serverfail/mock.server")},
-	}
-	rrb, err := New(cfg)
-	if err != nil {
-		t.Fatalf("failed to create builder: %v", err)
+		Policy: picker.RoundrobinBalanced,
+		Name:   genName(),
+		Logger: zap.NewExample(),
 	}
-	conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
+	rrb := New(cfg)
+	conn, err := grpc.Dial(fmt.Sprintf("endpoint://serverfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
 	if err != nil {
 		t.Fatalf("failed to dial mock server: %s", err)
 	}
@@ -251,16 +243,12 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
 	rsv.InitialAddrs(resolvedAddrs)
 
 	cfg := Config{
-		Policy:    picker.RoundrobinBalanced,
-		Name:      genName(),
-		Logger:    zap.NewExample(),
-		Endpoints: []string{fmt.Sprintf("endpoint://requestfail/mock.server")},
-	}
-	rrb, err := New(cfg)
-	if err != nil {
-		t.Fatalf("failed to create builder: %v", err)
+		Policy: picker.RoundrobinBalanced,
+		Name:   genName(),
+		Logger: zap.NewExample(),
 	}
-	conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
+	rrb := New(cfg)
+	conn, err := grpc.Dial(fmt.Sprintf("endpoint://requestfail/mock.server"), grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
 	if err != nil {
 		t.Fatalf("failed to dial mock server: %s", err)
 	}

+ 0 - 3
clientv3/balancer/config.go

@@ -33,7 +33,4 @@ type Config struct {
 	// Logger configures balancer logging.
 	// If nil, logs are discarded.
 	Logger *zap.Logger
-
-	// Endpoints is a list of server endpoints.
-	Endpoints []string
 }

+ 0 - 5
clientv3/balancer/picker/err.go

@@ -18,7 +18,6 @@ import (
 	"context"
 
 	"google.golang.org/grpc/balancer"
-	"google.golang.org/grpc/resolver"
 )
 
 // NewErr returns a picker that always returns err on "Pick".
@@ -33,7 +32,3 @@ type errPicker struct {
 func (p *errPicker) Pick(context.Context, balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
 	return nil, nil, p.err
 }
-
-func (p *errPicker) UpdateAddrs(addrs []resolver.Address) {
-	return
-}

+ 0 - 7
clientv3/balancer/picker/picker.go

@@ -16,16 +16,9 @@ package picker
 
 import (
 	"google.golang.org/grpc/balancer"
-	"google.golang.org/grpc/resolver"
 )
 
 // Picker defines balancer Picker methods.
 type Picker interface {
 	balancer.Picker
-
-	// UpdateAddrs updates current endpoints in picker.
-	// Used when endpoints are updated manually.
-	// TODO: handle resolver target change
-	// TODO: handle resolved addresses change
-	UpdateAddrs(addrs []resolver.Address)
 }

+ 0 - 13
clientv3/balancer/picker/roundrobin_balanced.go

@@ -48,8 +48,6 @@ type rrBalanced struct {
 
 	addrToSc map[resolver.Address]balancer.SubConn
 	scToAddr map[balancer.SubConn]resolver.Address
-
-	updateAddrs func(addrs []resolver.Address)
 }
 
 // Pick is called for every client request.
@@ -92,14 +90,3 @@ func (rb *rrBalanced) Pick(ctx context.Context, opts balancer.PickOptions) (bala
 	}
 	return sc, doneFunc, nil
 }
-
-// UpdateAddrs
-// TODO: implement this
-func (rb *rrBalanced) UpdateAddrs(addrs []resolver.Address) {
-	rb.mu.Lock()
-	// close all resolved sub-connections first
-	for _, sc := range rb.scs {
-		sc.UpdateAddresses([]resolver.Address{})
-	}
-	rb.mu.Unlock()
-}

+ 4 - 0
clientv3/balancer/resolver/endpoint/endpoint.go

@@ -165,6 +165,10 @@ func (r *Resolver) Close() {
 	bldr.removeResolver(r)
 }
 
+func (r *Resolver) Target(endpoint string) string {
+	return fmt.Sprintf("%s://%s/%s", scheme, r.clusterName, endpoint)
+}
+
 // Parse endpoint parses a endpoint of the form (http|https)://<host>*|(unix|unixs)://<path>) and returns a
 // protocol ('tcp' or 'unix'), host (or filepath if a unix socket) and scheme (http, https, unix, unixs).
 func ParseEndpoint(endpoint string) (proto string, host string, scheme string) {

+ 25 - 17
clientv3/client.go

@@ -44,8 +44,18 @@ import (
 var (
 	ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
 	ErrOldCluster           = errors.New("etcdclient: old cluster version")
+
+	defaultBalancer balancer.Balancer
 )
 
+func init() {
+	defaultBalancer = balancer.New(balancer.Config{
+		Policy: picker.RoundrobinBalanced,
+		Name:   fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String()),
+		Logger: zap.NewNop(), // zap.NewExample(),
+	})
+}
+
 // Client provides and manages an etcd v3 client session.
 type Client struct {
 	Cluster
@@ -112,6 +122,9 @@ func (c *Client) Close() error {
 	if c.conn != nil {
 		return toErr(c.ctx, c.conn.Close())
 	}
+	if c.resolver != nil {
+		c.resolver.Close()
+	}
 	return c.ctx.Err()
 }
 
@@ -281,8 +294,8 @@ func (c *Client) dialSetupOpts(target string, dopts ...grpc.DialOption) (opts []
 }
 
 // Dial connects to a single endpoint using the client's config.
-func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
-	return c.dial(ep)
+func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) {
+	return c.dial(endpoint)
 }
 
 func (c *Client) getToken(ctx context.Context) error {
@@ -294,7 +307,7 @@ func (c *Client) getToken(ctx context.Context) error {
 		host := getHost(endpoint)
 		// use dial options without dopts to avoid reusing the client balancer
 		var dOpts []grpc.DialOption
-		dOpts, err = c.dialSetupOpts(endpoint)
+		dOpts, err = c.dialSetupOpts(c.resolver.Target(endpoint))
 		if err != nil {
 			continue
 		}
@@ -420,28 +433,23 @@ func newClient(cfg *Config) (*Client, error) {
 		client.callOpts = callOpts
 	}
 
-	rsv := endpoint.EndpointResolver("default")
+	clientId := fmt.Sprintf("client-%s", strconv.FormatInt(time.Now().UnixNano(), 36))
+	rsv := endpoint.EndpointResolver(clientId)
 	rsv.InitialEndpoints(cfg.Endpoints)
-	bCfg := balancer.Config{
-		Policy: picker.RoundrobinBalanced,
-		Name:   "rrbalancer",
-		Logger: zap.NewExample(), // zap.NewNop(),
-		// TODO: these are really "targets", not "endpoints"
-		Endpoints: []string{fmt.Sprintf("endpoint://default/%s", cfg.Endpoints[0])},
-	}
-	rrb, err := balancer.New(bCfg)
-	if err != nil {
-		return nil, err
+
+	targets := []string{}
+	for _, ep := range cfg.Endpoints {
+		targets = append(targets, fmt.Sprintf("endpoint://%s/%s", clientId, ep))
 	}
+
 	client.resolver = rsv
-	client.balancer = rrb
+	client.balancer = defaultBalancer // TODO: allow alternate balancers to be passed in via config?
 
 	// use Endpoints[0] so that for https:// without any tls config given, then
 	// grpc will assume the certificate server name is the endpoint host.
-	conn, err := client.dial(bCfg.Endpoints[0], grpc.WithBalancerName(rrb.Name()))
+	conn, err := client.dial(targets[0], grpc.WithBalancerName(client.balancer.Name()))
 	if err != nil {
 		client.cancel()
-		client.balancer.Close()
 		rsv.Close()
 		return nil, err
 	}