Browse Source

Merge pull request #8738 from gyuho/ccc

clientv3: fix balancer notify, stale endpoint handling, retry
Xiang Li 8 years ago
parent
commit
97f0b28bdb
3 changed files with 35 additions and 21 deletions
  1. 19 0
      clientv3/balancer.go
  2. 7 17
      clientv3/health_balancer.go
  3. 9 4
      clientv3/retry.go

+ 19 - 0
clientv3/balancer.go

@@ -263,9 +263,28 @@ func (b *simpleBalancer) notifyAddrs(msg notifyMsg) {
 	}
 	b.mu.RLock()
 	addrs := b.addrs
+	pinAddr := b.pinAddr
+	downc := b.downc
 	b.mu.RUnlock()
+
+	var waitDown bool
+	if pinAddr != "" {
+		waitDown = true
+		for _, a := range addrs {
+			if a.Addr == pinAddr {
+				waitDown = false
+			}
+		}
+	}
+
 	select {
 	case b.notifyCh <- addrs:
+		if waitDown {
+			select {
+			case <-downc:
+			case <-b.stopc:
+			}
+		}
 	case <-b.stopc:
 	}
 }

+ 7 - 17
clientv3/health_balancer.go

@@ -93,13 +93,8 @@ func (hb *healthBalancer) Up(addr grpc.Address) func(error) {
 		// timeout will induce a network I/O error, and retrying until success;
 		// finding healthy endpoint on retry could take several timeouts and redials.
 		// To avoid wasting retries, gray-list unhealthy endpoints.
-		hb.mu.Lock()
-		hb.unhealthy[addr.Addr] = time.Now()
-		hb.mu.Unlock()
+		hb.hostPortError(addr.Addr, err)
 		f(err)
-		if logger.V(4) {
-			logger.Infof("clientv3/health-balancer: %q becomes unhealthy (%q)", addr.Addr, err.Error())
-		}
 	}
 }
 
@@ -143,13 +138,6 @@ func (hb *healthBalancer) updateUnhealthy(timeout time.Duration) {
 		case <-time.After(timeout):
 			hb.mu.Lock()
 			for k, v := range hb.unhealthy {
-				if _, ok := hb.hostPort2ep[k]; !ok {
-					delete(hb.unhealthy, k)
-					if logger.V(4) {
-						logger.Infof("clientv3/health-balancer: removes stale host:port %q from unhealthy", k)
-					}
-					continue
-				}
 				if time.Since(v) > timeout {
 					delete(hb.unhealthy, k)
 					if logger.V(4) {
@@ -187,11 +175,13 @@ func (hb *healthBalancer) liveAddrs() []grpc.Address {
 
 func (hb *healthBalancer) hostPortError(hostPort string, err error) {
 	hb.mu.Lock()
-	hb.unhealthy[hostPort] = time.Now()
-	hb.mu.Unlock()
-	if logger.V(4) {
-		logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", hostPort, err.Error())
+	if _, ok := hb.hostPort2ep[hostPort]; ok {
+		hb.unhealthy[hostPort] = time.Now()
+		if logger.V(4) {
+			logger.Infof("clientv3/health-balancer: marking %q as unhealthy (%q)", hostPort, err.Error())
+		}
 	}
+	hb.mu.Unlock()
 }
 
 func (hb *healthBalancer) mayPin(addr grpc.Address) bool {

+ 9 - 4
clientv3/retry.go

@@ -32,7 +32,7 @@ type retryStopErrFunc func(error) bool
 func isRepeatableStopError(err error) bool {
 	eErr := rpctypes.Error(err)
 	// always stop retry on etcd errors
-	if _, ok := eErr.(rpctypes.EtcdError); ok {
+	if serverErr, ok := eErr.(rpctypes.EtcdError); ok && serverErr.Code() != codes.Unavailable {
 		return true
 	}
 	// only retry if unavailable
@@ -62,11 +62,16 @@ func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRPCFunc {
 			if logger.V(4) {
 				logger.Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned)
 			}
-			// mark this before endpoint switch is triggered
-			c.balancer.hostPortError(pinned, err)
-			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
+
+			if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) {
+				// mark this before endpoint switch is triggered
+				c.balancer.hostPortError(pinned, err)
 				c.balancer.next()
+				if logger.V(4) {
+					logger.Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error())
+				}
 			}
+
 			if isStop(err) {
 				return err
 			}