
Merge pull request #8545 from heyitsanthony/health-balancer

clientv3: Health balancer
Anthony Romano, 8 years ago
parent commit 3cad5e4da1

clientv3/balancer.go  (+94 -28)

@@ -29,11 +29,40 @@ import (
 // This error is returned only when opts.BlockingWait is true.
 var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")
 
+type notifyMsg int
+
+const (
+	notifyReset notifyMsg = iota
+	notifyNext
+)
+
+type balancer interface {
+	grpc.Balancer
+	ConnectNotify() <-chan struct{}
+
+	endpoint(host string) string
+	endpoints() []string
+
+	// up is Up but includes whether the balancer will use the connection.
+	up(addr grpc.Address) (func(error), bool)
+
+	// updateAddrs changes the balancer's endpoints.
+	updateAddrs(endpoints ...string)
+	// ready returns a channel that closes when the balancer first connects.
+	ready() <-chan struct{}
+	// next forces the balancer to switch endpoints.
+	next()
+}
+
 // simpleBalancer does the bare minimum to expose multiple eps
 // to the grpc reconnection code path
 type simpleBalancer struct {
-	// addrs are the client's endpoints for grpc
+	// addrs are the client's endpoint addresses for grpc
 	addrs []grpc.Address
+
+	// eps holds the raw endpoints from the client
+	eps []string
+
 	// notifyCh notifies grpc of the set of addresses for connecting
 	notifyCh chan []grpc.Address
 
@@ -57,7 +86,7 @@ type simpleBalancer struct {
 	donec chan struct{}
 
 	// updateAddrsC notifies updateNotifyLoop to update addrs.
-	updateAddrsC chan struct{}
+	updateAddrsC chan notifyMsg
 
 	// grpc issues TLS cert checks using the string passed into dial so
 	// that string must be the host. To recover the full scheme://host URL,
@@ -72,20 +101,18 @@ type simpleBalancer struct {
 }
 
 func newSimpleBalancer(eps []string) *simpleBalancer {
-	notifyCh := make(chan []grpc.Address, 1)
-	addrs := make([]grpc.Address, len(eps))
-	for i := range eps {
-		addrs[i].Addr = getHost(eps[i])
-	}
+	notifyCh := make(chan []grpc.Address)
+	addrs := eps2addrs(eps)
 	sb := &simpleBalancer{
 		addrs:        addrs,
+		eps:          eps,
 		notifyCh:     notifyCh,
 		readyc:       make(chan struct{}),
 		upc:          make(chan struct{}),
 		stopc:        make(chan struct{}),
 		downc:        make(chan struct{}),
 		donec:        make(chan struct{}),
-		updateAddrsC: make(chan struct{}, 1),
+		updateAddrsC: make(chan notifyMsg),
 		host2ep:      getHost2ep(eps),
 	}
 	close(sb.downc)
@@ -101,12 +128,20 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
 	return b.upc
 }
 
-func (b *simpleBalancer) getEndpoint(host string) string {
+func (b *simpleBalancer) ready() <-chan struct{} { return b.readyc }
+
+func (b *simpleBalancer) endpoint(host string) string {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 	return b.host2ep[host]
 }
 
+func (b *simpleBalancer) endpoints() []string {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+	return b.eps
+}
+
 func getHost2ep(eps []string) map[string]string {
 	hm := make(map[string]string, len(eps))
 	for i := range eps {
@@ -116,7 +151,7 @@ func getHost2ep(eps []string) map[string]string {
 	return hm
 }
 
-func (b *simpleBalancer) updateAddrs(eps []string) {
+func (b *simpleBalancer) updateAddrs(eps ...string) {
 	np := getHost2ep(eps)
 
 	b.mu.Lock()
@@ -135,27 +170,37 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
 	}
 
 	b.host2ep = np
-
-	addrs := make([]grpc.Address, 0, len(eps))
-	for i := range eps {
-		addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
-	}
-	b.addrs = addrs
+	b.addrs, b.eps = eps2addrs(eps), eps
 
 	// updating notifyCh can trigger new connections,
 	// only update addrs if all connections are down
 	// or addrs does not include pinAddr.
-	update := !hasAddr(addrs, b.pinAddr)
+	update := !hasAddr(b.addrs, b.pinAddr)
 	b.mu.Unlock()
 
 	if update {
 		select {
-		case b.updateAddrsC <- struct{}{}:
+		case b.updateAddrsC <- notifyReset:
 		case <-b.stopc:
 		}
 	}
 }
 
+func (b *simpleBalancer) next() {
+	b.mu.RLock()
+	downc := b.downc
+	b.mu.RUnlock()
+	select {
+	case b.updateAddrsC <- notifyNext:
+	case <-b.stopc:
+	}
+	// wait until disconnect so new RPCs are not issued on old connection
+	select {
+	case <-downc:
+	case <-b.stopc:
+	}
+}
+
 func hasAddr(addrs []grpc.Address, targetAddr string) bool {
 	for _, addr := range addrs {
 		if targetAddr == addr.Addr {
@@ -192,11 +237,11 @@ func (b *simpleBalancer) updateNotifyLoop() {
 			default:
 			}
 		case downc == nil:
-			b.notifyAddrs()
+			b.notifyAddrs(notifyReset)
 			select {
 			case <-upc:
-			case <-b.updateAddrsC:
-				b.notifyAddrs()
+			case msg := <-b.updateAddrsC:
+				b.notifyAddrs(msg)
 			case <-b.stopc:
 				return
 			}
@@ -210,16 +255,24 @@ func (b *simpleBalancer) updateNotifyLoop() {
 			}
 			select {
 			case <-downc:
-			case <-b.updateAddrsC:
+				b.notifyAddrs(notifyReset)
+			case msg := <-b.updateAddrsC:
+				b.notifyAddrs(msg)
 			case <-b.stopc:
 				return
 			}
-			b.notifyAddrs()
 		}
 	}
 }
 
-func (b *simpleBalancer) notifyAddrs() {
+func (b *simpleBalancer) notifyAddrs(msg notifyMsg) {
+	if msg == notifyNext {
+		select {
+		case b.notifyCh <- []grpc.Address{}:
+		case <-b.stopc:
+			return
+		}
+	}
 	b.mu.RLock()
 	addrs := b.addrs
 	b.mu.RUnlock()
@@ -230,6 +283,11 @@ func (b *simpleBalancer) notifyAddrs() {
 }
 
 func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
+	f, _ := b.up(addr)
+	return f
+}
+
+func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
 
@@ -237,15 +295,15 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	// to "fix" it up at application layer. Otherwise, will panic
 	// if b.upc is already closed.
 	if b.closed {
-		return func(err error) {}
+		return func(err error) {}, false
 	}
 	// gRPC might call Up on a stale address.
 	// Prevent updating pinAddr with a stale address.
 	if !hasAddr(b.addrs, addr.Addr) {
-		return func(err error) {}
+		return func(err error) {}, false
 	}
 	if b.pinAddr != "" {
-		return func(err error) {}
+		return func(err error) {}, false
 	}
 	// notify waiting Get()s and pin first connected address
 	close(b.upc)
@@ -259,7 +317,7 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 		close(b.downc)
 		b.pinAddr = ""
 		b.mu.Unlock()
-	}
+	}, true
 }
 
 func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
@@ -354,3 +412,11 @@ func getHost(ep string) string {
 	}
 	return url.Host
 }
+
+func eps2addrs(eps []string) []grpc.Address {
+	addrs := make([]grpc.Address, len(eps))
+	for i := range eps {
+		addrs[i].Addr = getHost(eps[i])
+	}
+	return addrs
+}
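
The notable mechanism in balancer.go is the notifyNext path: next() sends notifyNext over updateAddrsC, and notifyAddrs(notifyNext) first pushes an empty address list to the gRPC notify channel so the pinned connection is dropped, then re-sends the full list so gRPC redials, typically landing on a different endpoint. A minimal standalone sketch of that pattern, assuming the legacy grpc.Balancer notify-channel semantics; forceSwitch is a hypothetical helper, not part of this change:

// forceSwitch illustrates the notifyNext pattern used above: an empty
// address set makes gRPC tear down the current connection, and re-sending
// the full set makes it redial, likely pinning a different endpoint.
func forceSwitch(notifyCh chan []grpc.Address, addrs []grpc.Address, stopc <-chan struct{}) {
	select {
	case notifyCh <- []grpc.Address{}: // empty set: drop the pinned connection
	case <-stopc:
		return
	}
	select {
	case notifyCh <- addrs: // full set: reconnect to one of the endpoints
	case <-stopc:
	}
}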

clientv3/balancer_test.go  (+60 -0)

@@ -133,6 +133,66 @@ func TestBalancerGetBlocking(t *testing.T) {
 	}
 }
 
+// TestHealthBalancerGraylist checks one endpoint is tried after the other
+// due to gray listing.
+func TestHealthBalancerGraylist(t *testing.T) {
+	var wg sync.WaitGroup
+	// Use 3 endpoints so gray list doesn't fallback to all connections
+	// after failing on 2 endpoints.
+	lns, eps := make([]net.Listener, 3), make([]string, 3)
+	wg.Add(3)
+	connc := make(chan string, 2)
+	for i := range eps {
+		ln, err := net.Listen("tcp", ":0")
+		testutil.AssertNil(t, err)
+		lns[i], eps[i] = ln, ln.Addr().String()
+		go func() {
+			defer wg.Done()
+			for {
+				conn, err := ln.Accept()
+				if err != nil {
+					return
+				}
+				_, err = conn.Read(make([]byte, 512))
+				conn.Close()
+				if err == nil {
+					select {
+					case connc <- ln.Addr().String():
+						// sleep some so balancer catches up
+						// before attempted next reconnect.
+						time.Sleep(50 * time.Millisecond)
+					default:
+					}
+				}
+			}
+		}()
+	}
+
+	sb := newSimpleBalancer(eps)
+	tf := func(s string) (bool, error) { return false, nil }
+	hb := newHealthBalancer(sb, 5*time.Second, tf)
+
+	conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb))
+	testutil.AssertNil(t, err)
+	defer conn.Close()
+
+	kvc := pb.NewKVClient(conn)
+	<-hb.ready()
+
+	kvc.Range(context.TODO(), &pb.RangeRequest{})
+	ep1 := <-connc
+	kvc.Range(context.TODO(), &pb.RangeRequest{})
+	ep2 := <-connc
+	for _, ln := range lns {
+		ln.Close()
+	}
+	wg.Wait()
+
+	if ep1 == ep2 {
+		t.Fatalf("expected %q != %q", ep1, ep2)
+	}
+}
+
 // TestBalancerDoNotBlockOnClose ensures that balancer and grpc don't deadlock each other
 // due to rapid open/close conn. The deadlock causes balancer.Close() to block forever.
 // See issue: https://github.com/coreos/etcd/issues/7283 for more detail.

clientv3/client.go  (+11 -5)

@@ -55,7 +55,8 @@ type Client struct {
 
 	cfg      Config
 	creds    *credentials.TransportCredentials
-	balancer *simpleBalancer
+	balancer balancer
+	mu       sync.Mutex
 
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -116,8 +117,10 @@ func (c *Client) Endpoints() (eps []string) {
 
 // SetEndpoints updates client's endpoints.
 func (c *Client) SetEndpoints(eps ...string) {
+	c.mu.Lock()
 	c.cfg.Endpoints = eps
-	c.balancer.updateAddrs(eps)
+	c.mu.Unlock()
+	c.balancer.updateAddrs(eps...)
 }
 
 // Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
@@ -227,7 +230,7 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts
 	opts = append(opts, dopts...)
 
 	f := func(host string, t time.Duration) (net.Conn, error) {
-		proto, host, _ := parseEndpoint(c.balancer.getEndpoint(host))
+		proto, host, _ := parseEndpoint(c.balancer.endpoint(host))
 		if host == "" && endpoint != "" {
 			// dialing an endpoint not in the balancer; use
 			// endpoint passed into dial
@@ -375,7 +378,10 @@ func newClient(cfg *Config) (*Client, error) {
 		client.Password = cfg.Password
 	}
 
-	client.balancer = newSimpleBalancer(cfg.Endpoints)
+	sb := newSimpleBalancer(cfg.Endpoints)
+	hc := func(ep string) (bool, error) { return grpcHealthCheck(client, ep) }
+	client.balancer = newHealthBalancer(sb, cfg.DialTimeout, hc)
+
 	// use Endpoints[0] so that for https:// without any tls config given, then
 	// grpc will assume the certificate server name is the endpoint host.
 	conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
@@ -391,7 +397,7 @@ func newClient(cfg *Config) (*Client, error) {
 		hasConn := false
 		waitc := time.After(cfg.DialTimeout)
 		select {
-		case <-client.balancer.readyc:
+		case <-client.balancer.ready():
 			hasConn = true
 		case <-ctx.Done():
 		case <-waitc:
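
From the public API the change is transparent: Client.SetEndpoints still takes a variadic endpoint list and now forwards it through balancer.updateAddrs(eps...) under the new client mutex. A typical caller-side use, with placeholder endpoint addresses:

package main

import (
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// Placeholder endpoints; the health-balancer swap is invisible to callers.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"10.0.0.1:2379", "10.0.0.2:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	// Routed through the balancer's updateAddrs(eps...) shown above.
	cli.SetEndpoints("10.0.0.2:2379", "10.0.0.3:2379")
}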

clientv3/health_balancer.go  (+212 -0)

@@ -0,0 +1,212 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/grpc/status"
+)
+
+const minHealthRetryDuration = 3 * time.Second
+const unknownService = "unknown service grpc.health.v1.Health"
+
+type healthCheckFunc func(ep string) (bool, error)
+
+// healthBalancer wraps a balancer so that it uses health checking
+// to choose its endpoints.
+type healthBalancer struct {
+	balancer
+
+	// healthCheck checks an endpoint's health.
+	healthCheck healthCheckFunc
+
+	// mu protects addrs, eps, unhealthy map, and stopc.
+	mu sync.RWMutex
+
+	// addrs stores all grpc addresses associated with the balancer.
+	addrs []grpc.Address
+
+	// eps stores all client endpoints
+	eps []string
+
+	// unhealthy tracks the last unhealthy time of endpoints.
+	unhealthy map[string]time.Time
+
+	stopc    chan struct{}
+	stopOnce sync.Once
+
+	host2ep map[string]string
+
+	wg sync.WaitGroup
+}
+
+func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
+	hb := &healthBalancer{
+		balancer:    b,
+		healthCheck: hc,
+		eps:         b.endpoints(),
+		addrs:       eps2addrs(b.endpoints()),
+		host2ep:     getHost2ep(b.endpoints()),
+		unhealthy:   make(map[string]time.Time),
+		stopc:       make(chan struct{}),
+	}
+	if timeout < minHealthRetryDuration {
+		timeout = minHealthRetryDuration
+	}
+
+	hb.wg.Add(1)
+	go func() {
+		defer hb.wg.Done()
+		hb.updateUnhealthy(timeout)
+	}()
+
+	return hb
+}
+
+func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
+	f, used := hb.up(addr)
+	if !used {
+		return f
+	}
+	return func(err error) {
+		// If connected to a black hole endpoint or a killed server, the gRPC ping
+		// timeout will induce a network I/O error, and retrying until success;
+		// finding healthy endpoint on retry could take several timeouts and redials.
+		// To avoid wasting retries, gray-list unhealthy endpoints.
+		hb.mu.Lock()
+		hb.unhealthy[addr.Addr] = time.Now()
+		hb.mu.Unlock()
+		f(err)
+	}
+}
+
+func (hb *healthBalancer) up(addr grpc.Address) (func(error), bool) {
+	if !hb.mayPin(addr) {
+		return func(err error) {}, false
+	}
+	return hb.balancer.up(addr)
+}
+
+func (hb *healthBalancer) Close() error {
+	hb.stopOnce.Do(func() { close(hb.stopc) })
+	hb.wg.Wait()
+	return hb.balancer.Close()
+}
+
+func (hb *healthBalancer) updateAddrs(eps ...string) {
+	addrs, host2ep := eps2addrs(eps), getHost2ep(eps)
+	hb.mu.Lock()
+	hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep
+	hb.mu.Unlock()
+	hb.balancer.updateAddrs(eps...)
+}
+
+func (hb *healthBalancer) endpoint(host string) string {
+	hb.mu.RLock()
+	defer hb.mu.RUnlock()
+	return hb.host2ep[host]
+}
+
+func (hb *healthBalancer) endpoints() []string {
+	hb.mu.RLock()
+	defer hb.mu.RUnlock()
+	return hb.eps
+}
+
+func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
+	for {
+		select {
+		case <-time.After(timeout):
+			hb.mu.Lock()
+			for k, v := range hb.unhealthy {
+				if time.Since(v) > timeout {
+					delete(hb.unhealthy, k)
+				}
+			}
+			hb.mu.Unlock()
+			eps := []string{}
+			for _, addr := range hb.liveAddrs() {
+				eps = append(eps, hb.endpoint(addr.Addr))
+			}
+			hb.balancer.updateAddrs(eps...)
+		case <-hb.stopc:
+			return
+		}
+	}
+}
+
+func (hb *healthBalancer) liveAddrs() []grpc.Address {
+	hb.mu.RLock()
+	defer hb.mu.RUnlock()
+	hbAddrs := hb.addrs
+	if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) {
+		return hbAddrs
+	}
+	addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy))
+	for _, addr := range hb.addrs {
+		if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy {
+			addrs = append(addrs, addr)
+		}
+	}
+	return addrs
+}
+
+func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
+	hb.mu.RLock()
+	skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0
+	_, bad := hb.unhealthy[addr.Addr]
+	hb.mu.RUnlock()
+	if skip || !bad {
+		return true
+	}
+	if ok, _ := hb.healthCheck(addr.Addr); ok {
+		hb.mu.Lock()
+		delete(hb.unhealthy, addr.Addr)
+		hb.mu.Unlock()
+		return true
+	}
+	hb.mu.Lock()
+	hb.unhealthy[addr.Addr] = time.Now()
+	hb.mu.Unlock()
+	return false
+}
+
+func grpcHealthCheck(client *Client, ep string) (bool, error) {
+	conn, err := client.dial(ep)
+	if err != nil {
+		return false, err
+	}
+	defer conn.Close()
+	cli := healthpb.NewHealthClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
+	cancel()
+	if err != nil {
+		if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
+			if s.Message() == unknownService {
+				// etcd < v3.3.0
+				return true, nil
+			}
+		}
+		return false, err
+	}
+	return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
+}
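
The graylist flow above: the wrapped Up callback records an endpoint's failure time in unhealthy; mayPin refuses to re-pin a graylisted address unless a fresh health probe succeeds; and updateUnhealthy expires entries after the (at least 3s) timeout and re-notifies the live address set. For reference, an illustrative standalone rendering of the probe grpcHealthCheck performs, with a placeholder plaintext endpoint (the real client reuses its own dial options and credentials, and additionally tolerates the "unknown service" reply from pre-3.3 servers):

// probeHealth is an illustrative, standalone version of the health probe above.
func probeHealth() (bool, error) {
	conn, err := grpc.Dial("127.0.0.1:2379", grpc.WithInsecure()) // placeholder endpoint
	if err != nil {
		return false, err
	}
	defer conn.Close()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{})
	if err != nil {
		return false, err
	}
	return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
}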

clientv3/integration/kv_test.go  (+26 -0)

@@ -933,3 +933,29 @@ func TestKVPutAtMostOnce(t *testing.T) {
 		t.Fatalf("expected version <= 10, got %+v", resp.Kvs[0])
 	}
 }
+
+func TestKVSwitchUnavailable(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	clus.Members[0].InjectPartition(t, clus.Members[1:])
+	// try to connect with dead node in the endpoint list
+	cfg := clientv3.Config{
+		Endpoints: []string{
+			clus.Members[0].GRPCAddr(),
+			clus.Members[1].GRPCAddr(),
+		},
+		DialTimeout: 1 * time.Second}
+	cli, err := clientv3.New(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer cli.Close()
+	timeout := 3 * clus.Members[0].ServerConfig.ReqTimeout()
+	ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+	if _, err := cli.Get(ctx, "abc"); err != nil {
+		t.Fatal(err)
+	}
+	cancel()
+}

clientv3/retry.go  (+10 -2)

@@ -51,11 +51,19 @@ func isWriteStopError(err error) bool {
 func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
 	return func(rpcCtx context.Context, f rpcFunc) error {
 		for {
-			if err := f(rpcCtx); err == nil || isStop(err) {
+			err := f(rpcCtx)
+			if err == nil {
+				return nil
+			}
+			notify := c.balancer.ConnectNotify()
+			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
+				c.balancer.next()
+			}
+			if isStop(err) {
 				return err
 			}
 			select {
-			case <-c.balancer.ConnectNotify():
+			case <-notify:
 			case <-rpcCtx.Done():
 				return rpcCtx.Err()
 			case <-c.ctx.Done():
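
Note the ordering in the retry wrapper: ConnectNotify() is captured before next(), so the wait cannot miss a reconnect that happens while the balancer is switching endpoints. A hypothetical standalone rendering of the same loop, with the isStop/stop-error handling of the real wrapper omitted for brevity:

// retryUnavailable sketches the pattern added above: capture the connect
// notification first, force an endpoint switch on Unavailable, then wait
// for a (re)connect before retrying. Assumes the balancer interface from
// balancer.go; error-classification details are intentionally left out.
func retryUnavailable(ctx context.Context, b balancer, f func(context.Context) error) error {
	for {
		err := f(ctx)
		if err == nil {
			return nil
		}
		notify := b.ConnectNotify() // capture before switching so the signal is not lost
		if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
			b.next() // blocks until the old connection is torn down
		}
		select {
		case <-notify:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}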