
clientv3/balancer: mark partitioned member as unhealthy

Previously, when the server returned errors, the retry wrapper
did nothing and passively expected the balancer to gray-list the
isolated endpoint. This is problematic when multiple endpoints
are passed and a network partition happens.

This patch adds an 'endpointError' method to the 'balancer' interface
to actively handle RPC errors (possibly even before the health-check
API gets called) and gray-list endpoints for the time being, thus
speeding up the endpoint switch.
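
Conceptually, the change lets the retry path push a failure into the
health balancer's gray-list instead of waiting for the health-check
loop to notice it. The following is a minimal sketch of that idea,
using simplified stand-in types and names rather than the actual
clientv3 balancer:

package graylist

import (
	"sync"
	"time"
)

// grayList is a stand-in for the unhealthy-endpoint bookkeeping in
// healthBalancer; field and method names here are illustrative only.
type grayList struct {
	mu        sync.Mutex
	unhealthy map[string]time.Time // addr -> time of last reported error
	expiry    time.Duration        // how long an endpoint stays gray-listed
}

// endpointError records a server-side error for addr so that the next
// pin decision can avoid it, even before any health-check RPC runs.
func (g *grayList) endpointError(addr string, err error) {
	g.mu.Lock()
	g.unhealthy[addr] = time.Now()
	g.mu.Unlock()
}

// usable reports whether addr may be pinned. With a single endpoint
// there is nothing to switch to, so the mark is ignored; that is why
// the change is safe in the single-endpoint case.
func (g *grayList) usable(addr string, totalEndpoints int) bool {
	if totalEndpoints == 1 {
		return true
	}
	g.mu.Lock()
	defer g.mu.Unlock()
	failedAt, marked := g.unhealthy[addr]
	if !marked {
		return true
	}
	if time.Since(failedAt) > g.expiry {
		delete(g.unhealthy, addr) // gray-listed only "for the time being"
		return true
	}
	return false
}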

This is safe in the single-endpoint case, because the balancer
will retry no matter what in that case.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
Gyu-Ho Lee, 8 years ago
commit fbed568b6a
3 changed files with 15 additions and 0 deletions:
  1. clientv3/balancer.go (+4, -0)
  2. clientv3/health_balancer.go (+9, -0)
  3. clientv3/retry.go (+2, -0)

clientv3/balancer.go (+4, -0)

@@ -44,6 +44,8 @@ type balancer interface {
 	endpoints() []string
 	// pinned returns the current pinned endpoint.
 	pinned() string
+	// endpointError handles error from server-side.
+	endpointError(addr string, err error)
 
 	// up is Up but includes whether the balancer will use the connection.
 	up(addr grpc.Address) (func(error), bool)
@@ -150,6 +152,8 @@ func (b *simpleBalancer) pinned() string {
 	return b.pinAddr
 }
 
+func (b *simpleBalancer) endpointError(addr string, err error) { return }
+
 func getHost2ep(eps []string) map[string]string {
 	hm := make(map[string]string, len(eps))
 	for i := range eps {
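
The no-op above is all simpleBalancer needs, since it keeps no health
state. Not part of this commit, but a small guard one could keep
inside package clientv3 (assuming it sits next to the types shown in
these diffs) to ensure both implementations keep satisfying the
extended interface:

// Hypothetical compile-time checks; they reference the package's own
// balancer, simpleBalancer, and healthBalancer types.
var (
	_ balancer = (*simpleBalancer)(nil)
	_ balancer = (*healthBalancer)(nil)
)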

clientv3/health_balancer.go (+9, -0)

@@ -177,6 +177,15 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
 	return addrs
 }
 
+func (hb *healthBalancer) endpointError(addr string, err error) {
+	hb.mu.Lock()
+	hb.unhealthy[addr] = time.Now()
+	hb.mu.Unlock()
+	if logger.V(4) {
+		logger.Infof("clientv3/health-balancer: marking %s as unhealthy (%v)", addr, err)
+	}
+}
+
 func (hb *healthBalancer) mayPin(addr grpc.Address) bool {
 	hb.mu.RLock()
 	skip := len(hb.addrs) == 1 || len(hb.unhealthy) == 0

clientv3/retry.go (+2, -0)

@@ -66,6 +66,8 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
 			if logger.V(4) {
 				logger.Infof("clientv3/retry: error %v on pinned endpoint %s", err, pinned)
 			}
+			// mark this before endpoint switch is triggered
+			c.balancer.endpointError(pinned, err)
 			notify := c.balancer.ConnectNotify()
 			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
 				c.balancer.next()
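
For the retry.go change, the ordering matters: the failing endpoint is
reported before the switch to another endpoint is triggered. A rough,
self-contained illustration of that flow with a fake balancer (none of
these types or names are the real clientv3 ones):

package main

import (
	"errors"
	"fmt"
	"time"
)

// fakeBalancer only records which endpoints were reported as failing.
type fakeBalancer struct {
	pinAddr   string
	unhealthy map[string]time.Time
}

func (b *fakeBalancer) pinned() string { return b.pinAddr }

func (b *fakeBalancer) endpointError(addr string, err error) {
	b.unhealthy[addr] = time.Now()
}

// withRetry mirrors the shape of the retry wrapper: on error, report
// the pinned endpoint to the balancer, then retry.
func withRetry(b *fakeBalancer, rpc func() error) error {
	var lastErr error
	for attempt := 0; attempt < 2; attempt++ {
		if lastErr = rpc(); lastErr == nil {
			return nil
		}
		// mark the pinned endpoint before any endpoint switch happens,
		// mirroring the ordering in the retry.go hunk above
		b.endpointError(b.pinned(), lastErr)
		// a real wrapper would now wait for the balancer to reconnect
	}
	return lastErr
}

func main() {
	b := &fakeBalancer{
		pinAddr:   "127.0.0.1:2379",
		unhealthy: make(map[string]time.Time),
	}
	err := withRetry(b, func() error { return errors.New("etcdserver: request timed out") })
	fmt.Println(err, b.unhealthy) // the pinned endpoint is now gray-listed
}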