Browse Source

clientv3: try next endpoint on unavailable error

Anthony Romano 8 years ago
parent
commit
efd7800e0f
2 changed files with 52 additions and 12 deletions
  1. 42 10
      clientv3/balancer.go
  2. 10 2
      clientv3/retry.go

+ 42 - 10
clientv3/balancer.go

@@ -29,6 +29,13 @@ import (
 // This error is returned only when opts.BlockingWait is true.
 // This error is returned only when opts.BlockingWait is true.
 var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")
 var ErrNoAddrAvilable = grpc.Errorf(codes.Unavailable, "there is no address available")
 
 
+type notifyMsg int
+
+const (
+	notifyReset notifyMsg = iota
+	notifyNext
+)
+
 type balancer interface {
 type balancer interface {
 	grpc.Balancer
 	grpc.Balancer
 	ConnectNotify() <-chan struct{}
 	ConnectNotify() <-chan struct{}
@@ -43,6 +50,8 @@ type balancer interface {
 	updateAddrs(endpoints ...string)
 	updateAddrs(endpoints ...string)
 	// ready returns a channel that closes when the balancer first connects.
 	// ready returns a channel that closes when the balancer first connects.
 	ready() <-chan struct{}
 	ready() <-chan struct{}
+	// next forces the balancer to switch endpoints.
+	next()
 }
 }
 
 
 // simpleBalancer does the bare minimum to expose multiple eps
 // simpleBalancer does the bare minimum to expose multiple eps
@@ -77,7 +86,7 @@ type simpleBalancer struct {
 	donec chan struct{}
 	donec chan struct{}
 
 
 	// updateAddrsC notifies updateNotifyLoop to update addrs.
 	// updateAddrsC notifies updateNotifyLoop to update addrs.
-	updateAddrsC chan struct{}
+	updateAddrsC chan notifyMsg
 
 
 	// grpc issues TLS cert checks using the string passed into dial so
 	// grpc issues TLS cert checks using the string passed into dial so
 	// that string must be the host. To recover the full scheme://host URL,
 	// that string must be the host. To recover the full scheme://host URL,
@@ -92,7 +101,7 @@ type simpleBalancer struct {
 }
 }
 
 
 func newSimpleBalancer(eps []string) *simpleBalancer {
 func newSimpleBalancer(eps []string) *simpleBalancer {
-	notifyCh := make(chan []grpc.Address, 1)
+	notifyCh := make(chan []grpc.Address)
 	addrs := eps2addrs(eps)
 	addrs := eps2addrs(eps)
 	sb := &simpleBalancer{
 	sb := &simpleBalancer{
 		addrs:        addrs,
 		addrs:        addrs,
@@ -103,7 +112,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
 		stopc:        make(chan struct{}),
 		stopc:        make(chan struct{}),
 		downc:        make(chan struct{}),
 		downc:        make(chan struct{}),
 		donec:        make(chan struct{}),
 		donec:        make(chan struct{}),
-		updateAddrsC: make(chan struct{}, 1),
+		updateAddrsC: make(chan notifyMsg),
 		host2ep:      getHost2ep(eps),
 		host2ep:      getHost2ep(eps),
 	}
 	}
 	close(sb.downc)
 	close(sb.downc)
@@ -171,12 +180,27 @@ func (b *simpleBalancer) updateAddrs(eps ...string) {
 
 
 	if update {
 	if update {
 		select {
 		select {
-		case b.updateAddrsC <- struct{}{}:
+		case b.updateAddrsC <- notifyReset:
 		case <-b.stopc:
 		case <-b.stopc:
 		}
 		}
 	}
 	}
 }
 }
 
 
+func (b *simpleBalancer) next() {
+	b.mu.RLock()
+	downc := b.downc
+	b.mu.RUnlock()
+	select {
+	case b.updateAddrsC <- notifyNext:
+	case <-b.stopc:
+	}
+	// wait until disconnect so new RPCs are not issued on old connection
+	select {
+	case <-downc:
+	case <-b.stopc:
+	}
+}
+
 func hasAddr(addrs []grpc.Address, targetAddr string) bool {
 func hasAddr(addrs []grpc.Address, targetAddr string) bool {
 	for _, addr := range addrs {
 	for _, addr := range addrs {
 		if targetAddr == addr.Addr {
 		if targetAddr == addr.Addr {
@@ -213,11 +237,11 @@ func (b *simpleBalancer) updateNotifyLoop() {
 			default:
 			default:
 			}
 			}
 		case downc == nil:
 		case downc == nil:
-			b.notifyAddrs()
+			b.notifyAddrs(notifyReset)
 			select {
 			select {
 			case <-upc:
 			case <-upc:
-			case <-b.updateAddrsC:
-				b.notifyAddrs()
+			case msg := <-b.updateAddrsC:
+				b.notifyAddrs(msg)
 			case <-b.stopc:
 			case <-b.stopc:
 				return
 				return
 			}
 			}
@@ -231,16 +255,24 @@ func (b *simpleBalancer) updateNotifyLoop() {
 			}
 			}
 			select {
 			select {
 			case <-downc:
 			case <-downc:
-			case <-b.updateAddrsC:
+				b.notifyAddrs(notifyReset)
+			case msg := <-b.updateAddrsC:
+				b.notifyAddrs(msg)
 			case <-b.stopc:
 			case <-b.stopc:
 				return
 				return
 			}
 			}
-			b.notifyAddrs()
 		}
 		}
 	}
 	}
 }
 }
 
 
-func (b *simpleBalancer) notifyAddrs() {
+func (b *simpleBalancer) notifyAddrs(msg notifyMsg) {
+	if msg == notifyNext {
+		select {
+		case b.notifyCh <- []grpc.Address{}:
+		case <-b.stopc:
+			return
+		}
+	}
 	b.mu.RLock()
 	b.mu.RLock()
 	addrs := b.addrs
 	addrs := b.addrs
 	b.mu.RUnlock()
 	b.mu.RUnlock()

+ 10 - 2
clientv3/retry.go

@@ -51,11 +51,19 @@ func isWriteStopError(err error) bool {
 func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
 func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
 	return func(rpcCtx context.Context, f rpcFunc) error {
 	return func(rpcCtx context.Context, f rpcFunc) error {
 		for {
 		for {
-			if err := f(rpcCtx); err == nil || isStop(err) {
+			err := f(rpcCtx)
+			if err == nil {
+				return nil
+			}
+			notify := c.balancer.ConnectNotify()
+			if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
+				c.balancer.next()
+			}
+			if isStop(err) {
 				return err
 				return err
 			}
 			}
 			select {
 			select {
-			case <-c.balancer.ConnectNotify():
+			case <-notify:
 			case <-rpcCtx.Done():
 			case <-rpcCtx.Done():
 				return rpcCtx.Err()
 				return rpcCtx.Err()
 			case <-c.ctx.Done():
 			case <-c.ctx.Done():