Procházet zdrojové kódy

Merge pull request #7842 from heyitsanthony/fix-switch-race

clientv3: don't race on upc/downc/switch endpoints in balancer
Anthony Romano před 8 roky
rodič
revize
9fee35b02d
2 změnil soubory, kde provedl 57 přidání a 38 odebrání
  1. 49 37
      clientv3/balancer.go
  2. 8 1
      integration/bridge.go

+ 49 - 37
clientv3/balancer.go

@@ -77,7 +77,6 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
 	for i := range eps {
 		addrs[i].Addr = getHost(eps[i])
 	}
-	notifyCh <- addrs
 	sb := &simpleBalancer{
 		addrs:        addrs,
 		notifyCh:     notifyCh,
@@ -89,6 +88,7 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
 		updateAddrsC: make(chan struct{}, 1),
 		host2ep:      getHost2ep(eps),
 	}
+	close(sb.downc)
 	go sb.updateNotifyLoop()
 	return sb
 }
@@ -170,38 +170,51 @@ func (b *simpleBalancer) updateNotifyLoop() {
 
 	for {
 		b.mu.RLock()
-		upc := b.upc
+		upc, downc, addr := b.upc, b.downc, b.pinAddr
 		b.mu.RUnlock()
-		var downc chan struct{}
+		// downc or upc should be closed
+		select {
+		case <-downc:
+			downc = nil
+		default:
+		}
 		select {
 		case <-upc:
-			var addr string
-			b.mu.RLock()
-			addr = b.pinAddr
-			// Up() sets pinAddr and downc as a pair under b.mu
-			downc = b.downc
-			b.mu.RUnlock()
-			if addr == "" {
-				break
+			upc = nil
+		default:
+		}
+		switch {
+		case downc == nil && upc == nil:
+			// stale
+			select {
+			case <-b.stopc:
+				return
+			default:
 			}
-			// close opened connections that are not pinAddr
-			// this ensures only one connection is open per client
+		case downc == nil:
+			b.notifyAddrs()
 			select {
+			case <-upc:
+			case <-b.updateAddrsC:
+				b.notifyAddrs()
+			case <-b.stopc:
+				return
+			}
+		case upc == nil:
+			select {
+			// close connections that are not the pinned address
 			case b.notifyCh <- []grpc.Address{{Addr: addr}}:
+			case <-downc:
+			case <-b.stopc:
+				return
+			}
+			select {
+			case <-downc:
+			case <-b.updateAddrsC:
 			case <-b.stopc:
 				return
 			}
-		case <-b.updateAddrsC:
-			b.notifyAddrs()
-			continue
-		}
-		select {
-		case <-downc:
-			b.notifyAddrs()
-		case <-b.updateAddrsC:
 			b.notifyAddrs()
-		case <-b.stopc:
-			return
 		}
 	}
 }
@@ -231,23 +244,20 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	if !hasAddr(b.addrs, addr.Addr) {
 		return func(err error) {}
 	}
-
-	if b.pinAddr == "" {
-		// notify waiting Get()s and pin first connected address
-		close(b.upc)
-		b.downc = make(chan struct{})
-		b.pinAddr = addr.Addr
-		// notify client that a connection is up
-		b.readyOnce.Do(func() { close(b.readyc) })
+	if b.pinAddr != "" {
+		return func(err error) {}
 	}
-
+	// notify waiting Get()s and pin first connected address
+	close(b.upc)
+	b.downc = make(chan struct{})
+	b.pinAddr = addr.Addr
+	// notify client that a connection is up
+	b.readyOnce.Do(func() { close(b.readyc) })
 	return func(err error) {
 		b.mu.Lock()
-		if b.pinAddr == addr.Addr {
-			b.upc = make(chan struct{})
-			close(b.downc)
-			b.pinAddr = ""
-		}
+		b.upc = make(chan struct{})
+		close(b.downc)
+		b.pinAddr = ""
 		b.mu.Unlock()
 	}
 }
@@ -280,6 +290,8 @@ func (b *simpleBalancer) Get(ctx context.Context, opts grpc.BalancerGetOptions)
 		b.mu.RUnlock()
 		select {
 		case <-ch:
+		case <-b.donec:
+			return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
 		case <-ctx.Done():
 			return grpc.Address{Addr: ""}, nil, ctx.Err()
 		}

+ 8 - 1
integration/bridge.go

@@ -119,6 +119,7 @@ func (b *bridge) serveListen() {
 		b.mu.Unlock()
 		select {
 		case <-b.stopc:
+			inc.Close()
 			return
 		case <-pausec:
 		}
@@ -152,10 +153,12 @@ func (b *bridge) serveConn(bc *bridgeConn) {
 	wg.Add(2)
 	go func() {
 		io.Copy(bc.out, bc.in)
+		bc.close()
 		wg.Done()
 	}()
 	go func() {
 		io.Copy(bc.in, bc.out)
+		bc.close()
 		wg.Done()
 	}()
 	wg.Wait()
@@ -168,7 +171,11 @@ type bridgeConn struct {
 }
 
 func (bc *bridgeConn) Close() {
+	bc.close()
+	<-bc.donec
+}
+
+func (bc *bridgeConn) close() {
 	bc.in.Close()
 	bc.out.Close()
-	<-bc.donec
 }