Browse Source

*: introduce mock server for testing load balancing and add a simple happy-path load balancer test

Author:    Joe Betz <jpbetz@google.com>
Date:      Wed Mar 28 15:51:33 2018 -0700
Joe Betz 7 years ago
parent
commit
657c2e15cc

+ 1 - 0
Gopkg.lock

@@ -353,6 +353,7 @@
     "peer",
     "resolver",
     "resolver/dns",
+    "resolver/manual",
     "resolver/passthrough",
     "stats",
     "status",

+ 105 - 0
clientv3/balancer/balancer_test.go

@@ -0,0 +1,105 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package balancer
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/coreos/etcd/clientv3/balancer/picker"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/mock/mockserver"
+
+	"go.uber.org/zap"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/peer"
+	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/resolver/manual"
+)
+
+// TestRoundRobinBalancedResolvableNoFailover ensures that
+// requests to a resolvable endpoint can be balanced between
+// multiple, if any, nodes. And there needs be no failover.
+func TestRoundRobinBalancedResolvableNoFailover(t *testing.T) {
+	testCases := []struct {
+		name        string
+		serverCount int
+		reqN        int
+	}{
+		{name: "rrBalanced_1", serverCount: 1, reqN: 5},
+		{name: "rrBalanced_3", serverCount: 3, reqN: 7},
+		{name: "rrBalanced_5", serverCount: 5, reqN: 10},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			ms, err := mockserver.StartMockServers(tc.serverCount)
+			if err != nil {
+				t.Fatalf("failed to start mock servers: %v", err)
+			}
+			defer ms.Stop()
+			var resolvedAddrs []resolver.Address
+			for _, svr := range ms {
+				resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: svr.Address})
+			}
+
+			rsv, closeResolver := manual.GenerateAndRegisterManualResolver()
+			defer closeResolver()
+			cfg := Config{
+				Policy:    picker.RoundrobinBalanced,
+				Name:      genName(),
+				Logger:    zap.NewExample(),
+				Endpoints: []string{fmt.Sprintf("%s:///mock.server", rsv.Scheme())},
+			}
+			rrb := New(cfg)
+			conn, err := grpc.Dial(cfg.Endpoints[0], grpc.WithInsecure(), grpc.WithBalancerName(rrb.Name()))
+			if err != nil {
+				t.Fatalf("failed to dial mock server: %s", err)
+			}
+			defer conn.Close()
+			rsv.NewAddress(resolvedAddrs)
+			cli := pb.NewKVClient(conn)
+
+			reqFunc := func(ctx context.Context) (picked string, err error) {
+				var p peer.Peer
+				_, err = cli.Range(ctx, &pb.RangeRequest{Key: []byte("/x")}, grpc.Peer(&p))
+				if p.Addr != nil {
+					picked = p.Addr.String()
+				}
+				return picked, err
+			}
+
+			prev, switches := "", 0
+			for i := 0; i < tc.reqN; i++ {
+				picked, err := reqFunc(context.Background())
+				if err != nil {
+					t.Fatalf("#%d: unexpected failure %v", i, err)
+				}
+				if prev == "" {
+					prev = picked
+					continue
+				}
+				if prev != picked {
+					switches++
+				}
+				prev = picked
+			}
+			if tc.serverCount > 1 && switches < tc.reqN-3 { // -3 for initial resolutions
+				t.Fatalf("expected balanced loads for %d requests, got switches %d", tc.reqN, switches)
+			}
+		})
+	}
+}

+ 9 - 0
clientv3/balancer/utils.go

@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"net/url"
 	"sort"
+	"sync/atomic"
+	"time"
 
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/resolver"
@@ -43,3 +45,10 @@ func epsToAddrs(eps ...string) (addrs []resolver.Address) {
 	}
 	return addrs
 }
+
+var genN = new(uint32)
+
+func genName() string {
+	now := time.Now().UnixNano()
+	return fmt.Sprintf("%X%X", now, atomic.AddUint32(genN, 1))
+}

+ 16 - 0
pkg/mock/mockserver/doc.go

@@ -0,0 +1,16 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package mockserver provides mock implementations for etcdserver's server interface.
+package mockserver

+ 90 - 0
pkg/mock/mockserver/mockserver.go

@@ -0,0 +1,90 @@
+// Copyright 2018 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package mockserver
+
+import (
+	"context"
+	"fmt"
+	"net"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+	"google.golang.org/grpc"
+)
+
+// MockServer provides a mocked out grpc server of the etcdserver interface.
+type MockServer struct {
+	GrpcServer *grpc.Server
+	Address    string
+}
+
+// MockServers provides a cluster of mocket out gprc servers of the etcdserver interface.
+type MockServers []*MockServer
+
+// StartMockServers creates the desired count of mock servers
+// and starts them.
+func StartMockServers(count int) (svrs MockServers, err error) {
+	svrs = make(MockServers, count)
+	defer func() {
+		if err != nil {
+			svrs.Stop()
+		}
+	}()
+
+	for i := 0; i < count; i++ {
+		listener, err := net.Listen("tcp", "localhost:0")
+		if err != nil {
+			return nil, fmt.Errorf("failed to listen %v", err)
+		}
+
+		svr := grpc.NewServer()
+		pb.RegisterKVServer(svr, &mockKVServer{})
+		svrs[i] = &MockServer{GrpcServer: svr, Address: listener.Addr().String()}
+		go func(svr *grpc.Server, l net.Listener) {
+			svr.Serve(l)
+		}(svr, listener)
+	}
+
+	return svrs, nil
+}
+
+// Stop stops the mock server, immediately closing all open connections and listeners.
+func (svrs MockServers) Stop() {
+	for _, svr := range svrs {
+		svr.GrpcServer.Stop()
+	}
+}
+
+type mockKVServer struct{}
+
+func (m *mockKVServer) Range(context.Context, *pb.RangeRequest) (*pb.RangeResponse, error) {
+	return &pb.RangeResponse{}, nil
+}
+
+func (m *mockKVServer) Put(context.Context, *pb.PutRequest) (*pb.PutResponse, error) {
+	return &pb.PutResponse{}, nil
+}
+
+func (m *mockKVServer) DeleteRange(context.Context, *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
+	return &pb.DeleteRangeResponse{}, nil
+}
+
+func (m *mockKVServer) Txn(context.Context, *pb.TxnRequest) (*pb.TxnResponse, error) {
+	return &pb.TxnResponse{}, nil
+}
+
+func (m *mockKVServer) Compact(context.Context, *pb.CompactionRequest) (*pb.CompactionResponse, error) {
+	return &pb.CompactionResponse{}, nil
+}

+ 91 - 0
vendor/google.golang.org/grpc/resolver/manual/manual.go

@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package manual defines a resolver that can be used to manually send resolved
+// addresses to ClientConn.
+package manual
+
+import (
+	"strconv"
+	"time"
+
+	"google.golang.org/grpc/resolver"
+)
+
+// NewBuilderWithScheme creates a new test resolver builder with the given scheme.
+func NewBuilderWithScheme(scheme string) *Resolver {
+	return &Resolver{
+		scheme: scheme,
+	}
+}
+
+// Resolver is also a resolver builder.
+// It's build() function always returns itself.
+type Resolver struct {
+	scheme string
+
+	// Fields actually belong to the resolver.
+	cc             resolver.ClientConn
+	bootstrapAddrs []resolver.Address
+}
+
+// InitialAddrs adds resolved addresses to the resolver so that
+// NewAddress doesn't need to be explicitly called after Dial.
+func (r *Resolver) InitialAddrs(addrs []resolver.Address) {
+	r.bootstrapAddrs = addrs
+}
+
+// Build returns itself for Resolver, because it's both a builder and a resolver.
+func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {
+	r.cc = cc
+	if r.bootstrapAddrs != nil {
+		r.NewAddress(r.bootstrapAddrs)
+	}
+	return r, nil
+}
+
+// Scheme returns the test scheme.
+func (r *Resolver) Scheme() string {
+	return r.scheme
+}
+
+// ResolveNow is a noop for Resolver.
+func (*Resolver) ResolveNow(o resolver.ResolveNowOption) {}
+
+// Close is a noop for Resolver.
+func (*Resolver) Close() {}
+
+// NewAddress calls cc.NewAddress.
+func (r *Resolver) NewAddress(addrs []resolver.Address) {
+	r.cc.NewAddress(addrs)
+}
+
+// NewServiceConfig calls cc.NewServiceConfig.
+func (r *Resolver) NewServiceConfig(sc string) {
+	r.cc.NewServiceConfig(sc)
+}
+
+// GenerateAndRegisterManualResolver generates a random scheme and a Resolver
+// with it. It also regieter this Resolver.
+// It returns the Resolver and a cleanup function to unregister it.
+func GenerateAndRegisterManualResolver() (*Resolver, func()) {
+	scheme := strconv.FormatInt(time.Now().UnixNano(), 36)
+	r := NewBuilderWithScheme(scheme)
+	resolver.Register(r)
+	return r, func() { resolver.UnregisterForTesting(scheme) }
+}