Browse Source

clientv3/balancer: add endpoints resolver

Joe Betz 7 years ago
parent
commit
4d2a25b056

+ 9 - 2
clientv3/balancer/balancer.go

@@ -16,6 +16,7 @@ package balancer
 
 import (
 	"fmt"
+	"strings"
 	"sync"
 
 	"github.com/coreos/etcd/clientv3/balancer/picker"
@@ -69,7 +70,13 @@ type baseBalancer struct {
 }
 
 // New returns a new balancer from specified picker policy.
-func New(cfg Config) Balancer {
+func New(cfg Config) (Balancer, error) {
+	for _, ep := range cfg.Endpoints {
+		if !strings.HasPrefix(ep, "etcd://") {
+			return nil, fmt.Errorf("'etcd' target schema required for etcd load balancer endpoints but got '%s'", ep)
+		}
+	}
+
 	bb := &baseBalancer{
 		policy: cfg.Policy,
 		name:   cfg.Policy.String(),
@@ -100,7 +107,7 @@ func New(cfg Config) Balancer {
 		zap.String("policy", bb.policy.String()),
 		zap.String("name", bb.name),
 	)
-	return bb
+	return bb, nil
 }
 
 // Name implements "grpc/balancer.Builder" interface.

+ 28 - 76
clientv3/balancer/balancer_test.go

@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/clientv3/balancer/picker"
+	"github.com/coreos/etcd/clientv3/balancer/resolver/endpoint"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/mock/mockserver"
 
@@ -30,7 +31,6 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/peer"
 	"google.golang.org/grpc/resolver"
-	"google.golang.org/grpc/resolver/manual"
 	"google.golang.org/grpc/status"
 )
 
@@ -60,21 +60,25 @@ func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) {
 				resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: svr.Address})
 			}
 
-			rsv, closeResolver := manual.GenerateAndRegisterManualResolver()
-			defer closeResolver()
+			rsv := endpoint.EndpointResolver("nofailover")
+			defer rsv.Close()
+			rsv.InitialAddrs(resolvedAddrs)
+
 			cfg := Config{
 				Policy:    picker.RoundrobinBalanced,
 				Name:      genName(),
 				Logger:    zap.NewExample(),
-				Endpoints: []string{fmt.Sprintf("%s:///mock.server", rsv.Scheme())},
+				Endpoints: []string{fmt.Sprintf("etcd://nofailover/mock.server")},
+			}
+			rrb, err := New(cfg)
+			if err != nil {
+				t.Fatalf("failed to create builder: %v", err)
 			}
-			rrb := New(cfg)
 			conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
 			if err != nil {
-				t.Fatalf("failed to dial mock server: %s", err)
+				t.Fatalf("failed to dial mock server: %v", err)
 			}
 			defer conn.Close()
-			rsv.NewAddress(resolvedAddrs)
 			cli := pb.NewKVClient(conn)
 
 			reqFunc := func(ctx context.Context) (picked string, err error) {
@@ -122,21 +126,25 @@ func TestRoundRobinBalancedResolvableFailoverFromServerFail(t *testing.T) {
 		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: svr.Address})
 	}
 
-	rsv, closeResolver := manual.GenerateAndRegisterManualResolver()
-	defer closeResolver()
+	rsv := endpoint.EndpointResolver("serverfail")
+	defer rsv.Close()
+	rsv.InitialAddrs(resolvedAddrs)
+
 	cfg := Config{
 		Policy:    picker.RoundrobinBalanced,
 		Name:      genName(),
 		Logger:    zap.NewExample(),
-		Endpoints: []string{fmt.Sprintf("%s:///mock.server", rsv.Scheme())},
+		Endpoints: []string{fmt.Sprintf("etcd://serverfail/mock.server")},
+	}
+	rrb, err := New(cfg)
+	if err != nil {
+		t.Fatalf("failed to create builder: %v", err)
 	}
-	rrb := New(cfg)
 	conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
 	if err != nil {
 		t.Fatalf("failed to dial mock server: %s", err)
 	}
 	defer conn.Close()
-	rsv.NewAddress(resolvedAddrs)
 	cli := pb.NewKVClient(conn)
 
 	reqFunc := func(ctx context.Context) (picked string, err error) {
@@ -235,22 +243,25 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
 		resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: svr.Address})
 		available[svr.Address] = struct{}{}
 	}
+	rsv := endpoint.EndpointResolver("requestfail")
+	defer rsv.Close()
+	rsv.InitialAddrs(resolvedAddrs)
 
-	rsv, closeResolver := manual.GenerateAndRegisterManualResolver()
-	defer closeResolver()
 	cfg := Config{
 		Policy:    picker.RoundrobinBalanced,
 		Name:      genName(),
 		Logger:    zap.NewExample(),
-		Endpoints: []string{fmt.Sprintf("%s:///mock.server", rsv.Scheme())},
+		Endpoints: []string{fmt.Sprintf("etcd://requestfail/mock.server")},
+	}
+	rrb, err := New(cfg)
+	if err != nil {
+		t.Fatalf("failed to create builder: %v", err)
 	}
-	rrb := New(cfg)
 	conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
 	if err != nil {
 		t.Fatalf("failed to dial mock server: %s", err)
 	}
 	defer conn.Close()
-	rsv.NewAddress(resolvedAddrs)
 	cli := pb.NewKVClient(conn)
 
 	reqFunc := func(ctx context.Context) (picked string, err error) {
@@ -293,62 +304,3 @@ func TestRoundRobinBalancedResolvableFailoverFromRequestFail(t *testing.T) {
 		t.Fatalf("expected balanced loads for %d requests, got switches %d", reqN, switches)
 	}
 }
-
-// TestRoundRobinBalancedPassthrough ensures that requests with
-// passthrough resolver be balanced with roundrobin picker.
-// TODO: this is not working right now...
-// Maintain multiple client connections?
-func TestRoundRobinBalancedPassthrough(t *testing.T) {
-	ms, err := mockserver.StartMockServers(3)
-	if err != nil {
-		t.Fatalf("failed to start mock servers: %s", err)
-	}
-	defer ms.Stop()
-	eps := make([]string, len(ms.Servers))
-	for i, svr := range ms.Servers {
-		eps[i] = fmt.Sprintf("passthrough:///%s", svr.Address)
-	}
-
-	cfg := Config{
-		Policy:    picker.RoundrobinBalanced,
-		Name:      genName(),
-		Logger:    zap.NewExample(),
-		Endpoints: eps,
-	}
-	rrb := New(cfg)
-	conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
-	if err != nil {
-		t.Fatalf("Failed to dial mock server: %s", err)
-	}
-	defer conn.Close()
-	cli := pb.NewKVClient(conn)
-
-	reqFunc := func(ctx context.Context) (picked string, err error) {
-		var p peer.Peer
-		_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
-		if p.Addr != nil {
-			picked = p.Addr.String()
-		}
-		return picked, err
-	}
-
-	reqN := 20
-	prev, switches := "", 0
-	for i := 0; i < reqN; i++ {
-		picked, err := reqFunc(context.Background())
-		if err != nil {
-			t.Fatal(err)
-		}
-		if prev == "" && picked != "" {
-			prev = picked
-			continue
-		}
-		if prev != picked {
-			switches++
-		}
-		prev = picked
-	}
-	if switches < reqN/2-3 { // -3 for initial resolutions + failover
-		t.Logf("expected balanced loads for %d requests, got switches %d", reqN, switches)
-	}
-}

+ 119 - 0
clientv3/balancer/resolver/endpoint/endpoint.go

@@ -0,0 +1,119 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// resolves to etcd entpoints for grpc targets of the form 'etcd://<cluster-name>/<endpoint>'.
+package endpoint
+
+import (
+	"fmt"
+	"sync"
+
+	"google.golang.org/grpc/resolver"
+)
+
+const (
+	scheme = "etcd"
+)
+
+var (
+	bldr *builder
+)
+
+func init() {
+	bldr = &builder{
+		clusterResolvers: make(map[string]*Resolver),
+	}
+	resolver.Register(bldr)
+}
+
+type builder struct {
+	clusterResolvers map[string]*Resolver
+	sync.RWMutex
+}
+
+// Build creates or reuses an etcd resolver for the etcd cluster name identified by the authority part of the target.
+func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
+	if len(target.Authority) < 1 {
+		return nil, fmt.Errorf("'etcd' target scheme requires non-empty authority identifying etcd cluster being routed to")
+	}
+	r := b.getResolver(target.Authority)
+	r.cc = cc
+	if r.bootstrapAddrs != nil {
+		r.NewAddress(r.bootstrapAddrs)
+	}
+	return r, nil
+}
+
+func (b *builder) getResolver(clusterName string) *Resolver {
+	b.RLock()
+	r, ok := b.clusterResolvers[clusterName]
+	b.RUnlock()
+	if !ok {
+		r = &Resolver{
+			clusterName: clusterName,
+		}
+		b.Lock()
+		b.clusterResolvers[clusterName] = r
+		b.Unlock()
+	}
+	return r
+}
+
+func (b *builder) addResolver(r *Resolver) {
+	bldr.Lock()
+	bldr.clusterResolvers[r.clusterName] = r
+	bldr.Unlock()
+}
+
+func (b *builder) removeResolver(r *Resolver) {
+	bldr.Lock()
+	delete(bldr.clusterResolvers, r.clusterName)
+	bldr.Unlock()
+}
+
+func (r *builder) Scheme() string {
+	return scheme
+}
+
+// EndpointResolver gets the resolver for  given etcd cluster name.
+func EndpointResolver(clusterName string) *Resolver {
+	return bldr.getResolver(clusterName)
+}
+
+// Resolver provides a resolver for a single etcd cluster, identified by name.
+type Resolver struct {
+	clusterName    string
+	cc             resolver.ClientConn
+	bootstrapAddrs []resolver.Address
+}
+
+// InitialAddrs sets the initial endpoint addresses for the resolver.
+func (r *Resolver) InitialAddrs(addrs []resolver.Address) {
+	r.bootstrapAddrs = addrs
+}
+
+// NewAddress updates the addresses of the resolver.
+func (r *Resolver) NewAddress(addrs []resolver.Address) error {
+	if r.cc == nil {
+		return fmt.Errorf("resolver not yet built, use InitialAddrs to provide initialization endpoints")
+	}
+	r.cc.NewAddress(addrs)
+	return nil
+}
+
+func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
+
+func (r *Resolver) Close() {
+	bldr.removeResolver(r)
+}