瀏覽代碼

clientv3: don't race on upc/downc/switch endpoints in balancer

If the balancer update notification loop starts with a downed
connection and endpoints are switched while the old connection is up,
the balancer can potentially wait forever for an up connection without
refreshing the connections to reflect the current endpoints.

Instead, fetch upc/downc together, only caring about a single transition
either from down->up or up->down for each iteration

Simple way to reproduce failures: add time.Sleep(time.Second) to the
beginning of the update notification loop.
Anthony Romano 8 年之前
父節點
當前提交
43e5f892f6
共有 1 個文件被更改,包括 49 次插入37 次删除
  1. 49 37
      clientv3/balancer.go

+ 49 - 37
clientv3/balancer.go

@@ -77,7 +77,6 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
 	for i := range eps {
 		addrs[i].Addr = getHost(eps[i])
 	}
-	notifyCh <- addrs
 	sb := &simpleBalancer{
 		addrs:        addrs,
 		notifyCh:     notifyCh,
@@ -89,6 +88,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
 		updateAddrsC: make(chan struct{}, 1),
 		host2ep:      getHost2ep(eps),
 	}
+	close(sb.downc)
 	go sb.updateNotifyLoop()
 	return sb
 }
@@ -170,38 +170,51 @@ func (b *simpleBalancer) updateNotifyLoop() {
 
 	for {
 		b.mu.RLock()
-		upc := b.upc
+		upc, downc, addr := b.upc, b.downc, b.pinAddr
 		b.mu.RUnlock()
-		var downc chan struct{}
+		// downc or upc should be closed
+		select {
+		case <-downc:
+			downc = nil
+		default:
+		}
 		select {
 		case <-upc:
-			var addr string
-			b.mu.RLock()
-			addr = b.pinAddr
-			// Up() sets pinAddr and downc as a pair under b.mu
-			downc = b.downc
-			b.mu.RUnlock()
-			if addr == "" {
-				break
+			upc = nil
+		default:
+		}
+		switch {
+		case downc == nil && upc == nil:
+			// stale
+			select {
+			case <-b.stopc:
+				return
+			default:
 			}
-			// close opened connections that are not pinAddr
-			// this ensures only one connection is open per client
+		case downc == nil:
+			b.notifyAddrs()
 			select {
+			case <-upc:
+			case <-b.updateAddrsC:
+				b.notifyAddrs()
+			case <-b.stopc:
+				return
+			}
+		case upc == nil:
+			select {
+			// close connections that are not the pinned address
 			case b.notifyCh <- []grpc.Address{{Addr: addr}}:
+			case <-downc:
+			case <-b.stopc:
+				return
+			}
+			select {
+			case <-downc:
+			case <-b.updateAddrsC:
 			case <-b.stopc:
 				return
 			}
-		case <-b.updateAddrsC:
-			b.notifyAddrs()
-			continue
-		}
-		select {
-		case <-downc:
-			b.notifyAddrs()
-		case <-b.updateAddrsC:
 			b.notifyAddrs()
-		case <-b.stopc:
-			return
 		}
 	}
 }
@@ -231,23 +244,20 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	if !hasAddr(b.addrs, addr.Addr) {
 		return func(err error) {}
 	}
-
-	if b.pinAddr == "" {
-		// notify waiting Get()s and pin first connected address
-		close(b.upc)
-		b.downc = make(chan struct{})
-		b.pinAddr = addr.Addr
-		// notify client that a connection is up
-		b.readyOnce.Do(func() { close(b.readyc) })
+	if b.pinAddr != "" {
+		return func(err error) {}
 	}
-
+	// notify waiting Get()s and pin first connected address
+	close(b.upc)
+	b.downc = make(chan struct{})
+	b.pinAddr = addr.Addr
+	// notify client that a connection is up
+	b.readyOnce.Do(func() { close(b.readyc) })
 	return func(err error) {
 		b.mu.Lock()
-		if b.pinAddr == addr.Addr {
-			b.upc = make(chan struct{})
-			close(b.downc)
-			b.pinAddr = ""
-		}
+		b.upc = make(chan struct{})
+		close(b.downc)
+		b.pinAddr = ""
 		b.mu.Unlock()
 	}
 }
@@ -280,6 +290,8 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions)
 		b.mu.RUnlock()
 		select {
 		case <-ch:
+		case <-b.donec:
+			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
 		case <-ctx.Done():
 			return grpc.Address{Addr: ""}, nil, ctx.Err()
 		}