Browse Source

clientv3: health check balancer

Anthony Romano 8 years ago
parent
commit
84db8fdaea
3 changed files with 275 additions and 23 deletions
  1. 52 18
      clientv3/balancer.go
  2. 11 5
      clientv3/client.go
  3. 212 0
      clientv3/health_balancer.go

+ 52 - 18
clientv3/balancer.go

@@ -29,11 +29,31 @@ import (
 // This error is returned only when opts.BlockingWait is true.
 // This error is returned only when opts.BlockingWait is true.
 var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")
 var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")
 
 
+type balancer interface {
+	grpc.Balancer
+	ConnectNotify() <-chan struct{}
+
+	endpoint(host string) string
+	endpoints() []string
+
+	// up is Up but includes whether the balancer will use the connection.
+	up(addr grpc.Address) (func(error), bool)
+
+	// updateAddrs changes the balancer's endpoints.
+	updateAddrs(endpoints ...string)
+	// ready returns a channel that closes when the balancer first connects.
+	ready() <-chan struct{}
+}
+
 // simpleBalancer does the bare minimum to expose multiple eps
 // simpleBalancer does the bare minimum to expose multiple eps
 // to the grpc reconnection code path
 // to the grpc reconnection code path
 type simpleBalancer struct {
 type simpleBalancer struct {
-	// addrs are the client's endpoints for grpc
+	// addrs are the client's endpoint addresses for grpc
 	addrs []grpc.Address
 	addrs []grpc.Address
+
+	// eps holds the raw endpoints from the client
+	eps []string
+
 	// notifyCh notifies grpc of the set of addresses for connecting
 	// notifyCh notifies grpc of the set of addresses for connecting
 	notifyCh chan []grpc.Address
 	notifyCh chan []grpc.Address
 
 
@@ -73,12 +93,10 @@ type simpleBalancer struct {
 
 
 func newSimpleBalancer(eps []string) *simpleBalancer {
 func newSimpleBalancer(eps []string) *simpleBalancer {
 	notifyCh := make(chan []grpc.Address, 1)
 	notifyCh := make(chan []grpc.Address, 1)
-	addrs := make([]grpc.Address, len(eps))
-	for i := range eps {
-		addrs[i].Addr = getHost(eps[i])
-	}
+	addrs := eps2addrs(eps)
 	sb := &simpleBalancer{
 	sb := &simpleBalancer{
 		addrs:        addrs,
 		addrs:        addrs,
+		eps:          eps,
 		notifyCh:     notifyCh,
 		notifyCh:     notifyCh,
 		readyc:       make(chan struct{}),
 		readyc:       make(chan struct{}),
 		upc:          make(chan struct{}),
 		upc:          make(chan struct{}),
@@ -101,12 +119,20 @@ func (b *simpleBalancer) ConnectNotify() <-chan struct{} {
 	return b.upc
 	return b.upc
 }
 }
 
 
-func (b *simpleBalancer) getEndpoint(host string) string {
+func (b *simpleBalancer) ready() <-chan struct{} { return b.readyc }
+
+func (b *simpleBalancer) endpoint(host string) string {
 	b.mu.Lock()
 	b.mu.Lock()
 	defer b.mu.Unlock()
 	defer b.mu.Unlock()
 	return b.host2ep[host]
 	return b.host2ep[host]
 }
 }
 
 
+func (b *simpleBalancer) endpoints() []string {
+	b.mu.RLock()
+	defer b.mu.RUnlock()
+	return b.eps
+}
+
 func getHost2ep(eps []string) map[string]string {
 func getHost2ep(eps []string) map[string]string {
 	hm := make(map[string]string, len(eps))
 	hm := make(map[string]string, len(eps))
 	for i := range eps {
 	for i := range eps {
@@ -116,7 +142,7 @@ func getHost2ep(eps []string) map[string]string {
 	return hm
 	return hm
 }
 }
 
 
-func (b *simpleBalancer) updateAddrs(eps []string) {
+func (b *simpleBalancer) updateAddrs(eps ...string) {
 	np := getHost2ep(eps)
 	np := getHost2ep(eps)
 
 
 	b.mu.Lock()
 	b.mu.Lock()
@@ -135,17 +161,12 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
 	}
 	}
 
 
 	b.host2ep = np
 	b.host2ep = np
-
-	addrs := make([]grpc.Address, 0, len(eps))
-	for i := range eps {
-		addrs = append(addrs, grpc.Address{Addr: getHost(eps[i])})
-	}
-	b.addrs = addrs
+	b.addrs, b.eps = eps2addrs(eps), eps
 
 
 	// updating notifyCh can trigger new connections,
 	// updating notifyCh can trigger new connections,
 	// only update addrs if all connections are down
 	// only update addrs if all connections are down
 	// or addrs does not include pinAddr.
 	// or addrs does not include pinAddr.
-	update := !hasAddr(addrs, b.pinAddr)
+	update := !hasAddr(b.addrs, b.pinAddr)
 	b.mu.Unlock()
 	b.mu.Unlock()
 
 
 	if update {
 	if update {
@@ -230,6 +251,11 @@ func (b *simpleBalancer) notifyAddrs() {
 }
 }
 
 
 func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
+	f, _ := b.up(addr)
+	return f
+}
+
+func (b *simpleBalancer) up(addr grpc.Address) (func(error), bool) {
 	b.mu.Lock()
 	b.mu.Lock()
 	defer b.mu.Unlock()
 	defer b.mu.Unlock()
 
 
@@ -237,15 +263,15 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	// to "fix" it up at application layer. Otherwise, will panic
 	// to "fix" it up at application layer. Otherwise, will panic
 	// if b.upc is already closed.
 	// if b.upc is already closed.
 	if b.closed {
 	if b.closed {
-		return func(err error) {}
+		return func(err error) {}, false
 	}
 	}
 	// gRPC might call Up on a stale address.
 	// gRPC might call Up on a stale address.
 	// Prevent updating pinAddr with a stale address.
 	// Prevent updating pinAddr with a stale address.
 	if !hasAddr(b.addrs, addr.Addr) {
 	if !hasAddr(b.addrs, addr.Addr) {
-		return func(err error) {}
+		return func(err error) {}, false
 	}
 	}
 	if b.pinAddr != "" {
 	if b.pinAddr != "" {
-		return func(err error) {}
+		return func(err error) {}, false
 	}
 	}
 	// notify waiting Get()s and pin first connected address
 	// notify waiting Get()s and pin first connected address
 	close(b.upc)
 	close(b.upc)
@@ -259,7 +285,7 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 		close(b.downc)
 		close(b.downc)
 		b.pinAddr = ""
 		b.pinAddr = ""
 		b.mu.Unlock()
 		b.mu.Unlock()
-	}
+	}, true
 }
 }
 
 
 func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
 func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (grpc.Address, func(), error) {
@@ -354,3 +380,11 @@ func getHost(ep string) string {
 	}
 	}
 	return url.Host
 	return url.Host
 }
 }
+
+func eps2addrs(eps []string) []grpc.Address {
+	addrs := make([]grpc.Address, len(eps))
+	for i := range eps {
+		addrs[i].Addr = getHost(eps[i])
+	}
+	return addrs
+}

+ 11 - 5
clientv3/client.go

@@ -55,7 +55,8 @@ type Client struct {
 
 
 	cfg      Config
 	cfg      Config
 	creds    *credentials.TransportCredentials
 	creds    *credentials.TransportCredentials
-	balancer *simpleBalancer
+	balancer balancer
+	mu       sync.Mutex
 
 
 	ctx    context.Context
 	ctx    context.Context
 	cancel context.CancelFunc
 	cancel context.CancelFunc
@@ -116,8 +117,10 @@ func (c *Client) Endpoints() (eps []string) {
 
 
 // SetEndpoints updates client's endpoints.
 // SetEndpoints updates client's endpoints.
 func (c *Client) SetEndpoints(eps ...string) {
 func (c *Client) SetEndpoints(eps ...string) {
+	c.mu.Lock()
 	c.cfg.Endpoints = eps
 	c.cfg.Endpoints = eps
-	c.balancer.updateAddrs(eps)
+	c.mu.Unlock()
+	c.balancer.updateAddrs(eps...)
 }
 }
 
 
 // Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
 // Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
@@ -227,7 +230,7 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts
 	opts = append(opts, dopts...)
 	opts = append(opts, dopts...)
 
 
 	f := func(host string, t time.Duration) (net.Conn, error) {
 	f := func(host string, t time.Duration) (net.Conn, error) {
-		proto, host, _ := parseEndpoint(c.balancer.getEndpoint(host))
+		proto, host, _ := parseEndpoint(c.balancer.endpoint(host))
 		if host == "" && endpoint != "" {
 		if host == "" && endpoint != "" {
 			// dialing an endpoint not in the balancer; use
 			// dialing an endpoint not in the balancer; use
 			// endpoint passed into dial
 			// endpoint passed into dial
@@ -375,7 +378,10 @@ func newClient(cfg *Config) (*Client, error) {
 		client.Password = cfg.Password
 		client.Password = cfg.Password
 	}
 	}
 
 
-	client.balancer = newSimpleBalancer(cfg.Endpoints)
+	sb := newSimpleBalancer(cfg.Endpoints)
+	hc := func(ep string) (bool, error) { return grpcHealthCheck(client, ep) }
+	client.balancer = newHealthBalancer(sb, cfg.DialTimeout, hc)
+
 	// use Endpoints[0] so that for https:// without any tls config given, then
 	// use Endpoints[0] so that for https:// without any tls config given, then
 	// grpc will assume the certificate server name is the endpoint host.
 	// grpc will assume the certificate server name is the endpoint host.
 	conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
 	conn, err := client.dial(cfg.Endpoints[0], grpc.WithBalancer(client.balancer))
@@ -391,7 +397,7 @@ func newClient(cfg *Config) (*Client, error) {
 		hasConn := false
 		hasConn := false
 		waitc := time.After(cfg.DialTimeout)
 		waitc := time.After(cfg.DialTimeout)
 		select {
 		select {
-		case <-client.balancer.readyc:
+		case <-client.balancer.ready():
 			hasConn = true
 			hasConn = true
 		case <-ctx.Done():
 		case <-ctx.Done():
 		case <-waitc:
 		case <-waitc:

+ 212 - 0
clientv3/health_balancer.go

@@ -0,0 +1,212 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package clientv3
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	healthpb "google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/grpc/status"
+)
+
+const minHealthRetryDuration = 3 * time.Second
+const unknownService = "unknown service grpc.health.v1.Health"
+
+type healthCheckFunc func(ep string) (bool, error)
+
+// healthBalancer wraps a balancer so that it uses health checking
+// to choose its endpoints.
+type healthBalancer struct {
+	balancer
+
+	// healthCheck checks an endpoint's health.
+	healthCheck healthCheckFunc
+
+	// mu protects addrs, eps, unhealthy map, and stopc.
+	mu sync.RWMutex
+
+	// addrs stores all grpc addresses associated with the balancer.
+	addrs []grpc.Address
+
+	// eps stores all client endpoints
+	eps []string
+
+	// unhealthy tracks the last unhealthy time of endpoints.
+	unhealthy map[string]time.Time
+
+	stopc    chan struct{}
+	stopOnce sync.Once
+
+	host2ep map[string]string
+
+	wg sync.WaitGroup
+}
+
+func newHealthBalancer(b balancer, timeout time.Duration, hc healthCheckFunc) *healthBalancer {
+	hb := &healthBalancer{
+		balancer:    b,
+		healthCheck: hc,
+		eps:         b.endpoints(),
+		addrs:       eps2addrs(b.endpoints()),
+		host2ep:     getHost2ep(b.endpoints()),
+		unhealthy:   make(map[string]time.Time),
+		stopc:       make(chan struct{}),
+	}
+	if timeout < minHealthRetryDuration {
+		timeout = minHealthRetryDuration
+	}
+
+	hb.wg.Add(1)
+	go func() {
+		defer hb.wg.Done()
+		hb.updateUnhealthy(timeout)
+	}()
+
+	return hb
+}
+
+func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
+	f, used := hb.up(addr)
+	if !used {
+		return f
+	}
+	return func(err error) {
+		// If connected to a black hole endpoint or a killed server, the gRPC ping
+		// timeout will induce a network I/O error, and retrying until success;
+		// finding healthy endpoint on retry could take several timeouts and redials.
+		// To avoid wasting retries, gray-list unhealthy endpoints.
+		hb.mu.Lock()
+		hb.unhealthy[addr.Addr] = time.Now()
+		hb.mu.Unlock()
+		f(err)
+	}
+}
+
+func (hb *healthBalancer) up(addr grpc.Address) (func(error), bool) {
+	if !hb.mayPin(addr) {
+		return func(err error) {}, false
+	}
+	return hb.balancer.up(addr)
+}
+
+func (hb *healthBalancer) Close() error {
+	hb.stopOnce.Do(func() { close(hb.stopc) })
+	hb.wg.Wait()
+	return hb.balancer.Close()
+}
+
+func (hb *healthBalancer) updateAddrs(eps ...string) {
+	addrs, host2ep := eps2addrs(eps), getHost2ep(eps)
+	hb.mu.Lock()
+	hb.addrs, hb.eps, hb.host2ep = addrs, eps, host2ep
+	hb.mu.Unlock()
+	hb.balancer.updateAddrs(eps...)
+}
+
+func (hb *healthBalancer) endpoint(host string) string {
+	hb.mu.RLock()
+	defer hb.mu.RUnlock()
+	return hb.host2ep[host]
+}
+
+func (hb *healthBalancer) endpoints() []string {
+	hb.mu.RLock()
+	defer hb.mu.RUnlock()
+	return hb.eps
+}
+
+func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
+	for {
+		select {
+		case <-time.After(timeout):
+			hb.mu.Lock()
+			for k, v := range hb.unhealthy {
+				if time.Since(v) > timeout {
+					delete(hb.unhealthy, k)
+				}
+			}
+			hb.mu.Unlock()
+			eps := []string{}
+			for _, addr := range hb.liveAddrs() {
+				eps = append(eps, hb.endpoint(addr.Addr))
+			}
+			hb.balancer.updateAddrs(eps...)
+		case <-hb.stopc:
+			return
+		}
+	}
+}
+
+func (hb *healthBalancer) liveAddrs() []grpc.Address {
+	hb.mu.RLock()
+	defer hb.mu.RUnlock()
+	hbAddrs := hb.addrs
+	if len(hb.addrs) == 1 || len(hb.unhealthy) == 0 || len(hb.unhealthy) == len(hb.addrs) {
+		return hbAddrs
+	}
+	addrs := make([]grpc.Address, 0, len(hb.addrs)-len(hb.unhealthy))
+	for _, addr := range hb.addrs {
+		if _, unhealthy := hb.unhealthy[addr.Addr]; !unhealthy {
+			addrs = append(addrs, addr)
+		}
+	}
+	return addrs
+}
+
+func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
+	hb.mu.RLock()
+	skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0
+	_, bad := hb.unhealthy[addr.Addr]
+	hb.mu.RUnlock()
+	if skip || !bad {
+		return true
+	}
+	if ok, _ := hb.healthCheck(addr.Addr); ok {
+		hb.mu.Lock()
+		delete(hb.unhealthy, addr.Addr)
+		hb.mu.Unlock()
+		return true
+	}
+	hb.mu.Lock()
+	hb.unhealthy[addr.Addr] = time.Now()
+	hb.mu.Unlock()
+	return false
+}
+
+func grpcHealthCheck(client *Client, ep string) (bool, error) {
+	conn, err := client.dial(ep)
+	if err != nil {
+		return false, err
+	}
+	defer conn.Close()
+	cli := healthpb.NewHealthClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
+	cancel()
+	if err != nil {
+		if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
+			if s.Message() == unknownService {
+				// etcd < v3.3.0
+				return true, nil
+			}
+		}
+		return false, err
+	}
+	return resp.Status == healthpb.HealthCheckResponse_SERVING, nil
+}