
Merge pull request #7443 from fanminshi/fix_balancer_deadlock

clientv3: serialize updating notifych in balancer
Xiang Li · commit 0a692b0524
2 changed files with 167 additions and 7 deletions:
  1. clientv3/balancer.go (+69 -6)
  2. clientv3/balancer_test.go (+98 -1)
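
The deadlock this fixes (issue #7283, referenced by the new test below) comes from the old code sending on the unbuffered notifyCh while holding b.mu: Up() and its returned down() callback both did so, and Close() held b.mu for its entire body. If gRPC was not reading Notify() at that moment, the send blocked with the mutex held and Close() blocked behind it forever. The patch serializes all sends to notifyCh in a single updateNotifyLoop goroutine: Up() and down() now only flip the upc/downc channels under the mutex, and Close() signals stopc, waits on donec, and only then closes notifyCh.

The following is a hypothetical, self-contained Go sketch of that single-sender pattern. The notifier type and the names pin and loop are simplified stand-ins that mirror the shape of the diff below; it illustrates the technique and is not the etcd code.

// Hypothetical sketch of the single-sender pattern used for notifyCh; not the etcd code.
package main
import (
	"fmt"
	"sync"
)

type notifier struct {
	mu       sync.RWMutex
	addrs    []string      // every known endpoint
	pinAddr  string        // pinned endpoint, "" if none
	upc      chan struct{} // closed when an address is pinned
	downc    chan struct{} // closed when the pinned address goes down
	stopc    chan struct{} // closed by Close to stop the loop
	donec    chan struct{} // closed when the loop has exited
	notifyCh chan []string // read by an external consumer (gRPC in the real code)
}

func newNotifier(addrs []string) *notifier {
	n := &notifier{
		addrs:    addrs,
		upc:      make(chan struct{}),
		stopc:    make(chan struct{}),
		donec:    make(chan struct{}),
		notifyCh: make(chan []string),
	}
	go n.loop()
	return n
}

// loop is the only goroutine that sends on notifyCh, so the pin/down callbacks never block on it under n.mu.
func (n *notifier) loop() {
	defer close(n.donec)
	for {
		n.mu.RLock()
		upc := n.upc
		n.mu.RUnlock()
		var downc chan struct{}
		select {
		case <-upc:
			n.mu.RLock()
			addr := n.pinAddr
			downc = n.downc // set together with pinAddr under n.mu
			n.mu.RUnlock()
			if addr != "" {
				select {
				case n.notifyCh <- []string{addr}: // advertise only the pinned address
				case <-n.stopc:
					return
				}
			}
		case <-n.stopc:
			return
		}
		select {
		case <-downc: // pin lost: advertise every endpoint again
			n.mu.RLock()
			addrs := n.addrs
			n.mu.RUnlock()
			select {
			case n.notifyCh <- addrs:
			case <-n.stopc:
				return
			}
		case <-n.stopc:
			return
		}
	}
}

// pin marks addr as up; the returned down callback undoes it. Both only flip channels under the lock.
func (n *notifier) pin(addr string) (down func()) {
	n.mu.Lock()
	n.downc = make(chan struct{})
	n.pinAddr = addr
	close(n.upc)
	n.mu.Unlock()
	return func() {
		n.mu.Lock()
		n.upc = make(chan struct{})
		close(n.downc)
		n.pinAddr = ""
		n.mu.Unlock()
	}
}

// Close stops the sender, waits for it to exit, then closes notifyCh.
func (n *notifier) Close() {
	close(n.stopc)
	<-n.donec
	close(n.notifyCh)
}

func main() {
	n := newNotifier([]string{"a:2379", "b:2379"})
	down := n.pin("a:2379")   // what gRPC's Up() hook would trigger
	fmt.Println(<-n.notifyCh) // only the pinned address: [a:2379]
	down()                    // loop now wants to re-advertise n.addrs...
	n.Close()                 // ...but Close still returns: stopc unblocks the pending send
}

Because loop() is the only sender, Close() can safely do close(stopc), wait on donec, and then close(notifyCh) without racing a send, which is the same ordering the real Close() adopts in this patch.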

clientv3/balancer.go (+69 -6)

@@ -47,6 +47,15 @@ type simpleBalancer struct {
 	// upc closes when upEps transitions from empty to non-zero or the balancer closes.
 	upc chan struct{}
 
+	// downc closes when grpc calls down() on pinAddr
+	downc chan struct{}
+
+	// stopc is closed to signal updateNotifyLoop should stop.
+	stopc chan struct{}
+
+	// donec closes when all goroutines have exited
+	donec chan struct{}
+
 	// grpc issues TLS cert checks using the string passed into dial so
 	// that string must be the host. To recover the full scheme://host URL,
 	// have a map from hosts to the original endpoint.
@@ -71,8 +80,12 @@ func newSimpleBalancer(eps []string) *simpleBalancer {
 		notifyCh: notifyCh,
 		readyc:   make(chan struct{}),
 		upc:      make(chan struct{}),
+		stopc:    make(chan struct{}),
+		downc:    make(chan struct{}),
+		donec:    make(chan struct{}),
 		host2ep:  getHost2ep(eps),
 	}
+	go sb.updateNotifyLoop()
 	return sb
 }
 
@@ -131,6 +144,50 @@ func (b *simpleBalancer) updateAddrs(eps []string) {
 	}
 }
 
+func (b *simpleBalancer) updateNotifyLoop() {
+	defer close(b.donec)
+
+	for {
+		b.mu.RLock()
+		upc := b.upc
+		b.mu.RUnlock()
+		var downc chan struct{}
+		select {
+		case <-upc:
+			var addr string
+			b.mu.RLock()
+			addr = b.pinAddr
+			// Up() sets pinAddr and downc as a pair under b.mu
+			downc = b.downc
+			b.mu.RUnlock()
+			if addr == "" {
+				break
+			}
+			// close opened connections that are not pinAddr
+			// this ensures only one connection is open per client
+			select {
+			case b.notifyCh <- []grpc.Address{{Addr: addr}}:
+			case <-b.stopc:
+				return
+			}
+		}
+		select {
+		case <-downc:
+			b.mu.RLock()
+			addrs := b.addrs
+			b.mu.RUnlock()
+			select {
+			case b.notifyCh <- addrs:
+			case <-b.stopc:
+				return
+			}
+		case <-b.stopc:
+			return
+		}
+
+	}
+}
+
 func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	b.mu.Lock()
 	defer b.mu.Unlock()
@@ -145,20 +202,18 @@ func (b *simpleBalancer) Up(addr grpc.Address) func(error) {
 	if b.pinAddr == "" {
 		// notify waiting Get()s and pin first connected address
 		close(b.upc)
+		b.downc = make(chan struct{})
 		b.pinAddr = addr.Addr
 		// notify client that a connection is up
 		b.readyOnce.Do(func() { close(b.readyc) })
-		// close opened connections that are not pinAddr
-		// this ensures only one connection is open per client
-		b.notifyCh <- []grpc.Address{addr}
 	}
 
 	return func(err error) {
 		b.mu.Lock()
 		if b.pinAddr == addr.Addr {
 			b.upc = make(chan struct{})
+			close(b.downc)
 			b.pinAddr = ""
-			b.notifyCh <- b.addrs
 		}
 		b.mu.Unlock()
 	}
@@ -214,14 +269,15 @@ func (b *simpleBalancer) Notify() <-chan []grpc.Address { return b.notifyCh }
 
 func (b *simpleBalancer) Close() error {
 	b.mu.Lock()
-	defer b.mu.Unlock()
 	// In case gRPC calls close twice. TODO: remove the checking
 	// when we are sure that gRPC wont call close twice.
 	if b.closed {
+		b.mu.Unlock()
+		<-b.donec
 		return nil
 	}
 	b.closed = true
-	close(b.notifyCh)
+	close(b.stopc)
 	b.pinAddr = ""
 
 	// In the case of following scenario:
@@ -236,6 +292,13 @@ func (b *simpleBalancer) Close() error {
 		// terminate all waiting Get()s
 		close(b.upc)
 	}
+
+	b.mu.Unlock()
+
+	// wait for updateNotifyLoop to finish
+	<-b.donec
+	close(b.notifyCh)
+
 	return nil
 }
 

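A note on the Close() ordering above: notifyCh is now closed only after updateNotifyLoop has exited (signalled by donec). Closing a channel while another goroutine may still send on it panics with "send on closed channel", so the safe sequence is: signal stop, wait for the sender goroutine to finish, then close the channel it owned. A toy, hypothetical Go example of that shutdown order:

package main

import "fmt"

func main() {
	out := make(chan int)        // owned by the sender goroutine below
	stopc := make(chan struct{}) // closed to ask the sender to stop
	donec := make(chan struct{}) // closed by the sender once it has exited

	go func() { // the only goroutine that sends on out
		defer close(donec)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-stopc:
				return
			}
		}
	}()

	fmt.Println(<-out, <-out) // consume a couple of values: 0 1

	close(stopc) // 1. ask the sender to stop
	<-donec      // 2. wait until it has actually returned
	close(out)   // 3. closing out can no longer race with a send
}
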
clientv3/balancer_test.go (+98 -1)

@@ -16,9 +16,14 @@ package clientv3
 
 import (
 	"errors"
+	"net"
+	"sync"
 	"testing"
 	"time"
 
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/pkg/testutil"
+
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 )
@@ -29,6 +34,7 @@ var (
 
 func TestBalancerGetUnblocking(t *testing.T) {
 	sb := newSimpleBalancer(endpoints)
+	defer sb.Close()
 	if addrs := <-sb.Notify(); len(addrs) != len(endpoints) {
 		t.Errorf("Initialize newSimpleBalancer should have triggered Notify() chan, but it didn't")
 	}
@@ -72,6 +78,7 @@ func TestBalancerGetUnblocking(t *testing.T) {
 
 func TestBalancerGetBlocking(t *testing.T) {
 	sb := newSimpleBalancer(endpoints)
+	defer sb.Close()
 	if addrs := <-sb.Notify(); len(addrs) != len(endpoints) {
 		t.Errorf("Initialize newSimpleBalancer should have triggered Notify() chan, but it didn't")
 	}
@@ -88,10 +95,11 @@ func TestBalancerGetBlocking(t *testing.T) {
 	go func() {
 		// ensure sb.Up() will be called after sb.Get() to see if Up() releases blocking Get()
 		time.Sleep(time.Millisecond * 100)
-		downC <- sb.Up(grpc.Address{Addr: endpoints[1]})
+		f := sb.Up(grpc.Address{Addr: endpoints[1]})
 		if addrs := <-sb.Notify(); len(addrs) != 1 {
 			t.Errorf("first Up() should have triggered balancer to send the first connected address via Notify chan so that other connections can be closed")
 		}
+		downC <- f
 	}()
 	addrFirst, putFun, err := sb.Get(context.Background(), blockingOpts)
 	if err != nil {
@@ -122,3 +130,92 @@ func TestBalancerGetBlocking(t *testing.T) {
 		t.Errorf("Get() with no up endpoints should timeout, got %v", err)
 	}
 }
+
+// TestBalancerDoNotBlockOnClose ensures that the balancer and grpc don't deadlock each other
+// due to rapidly opening and closing connections. The deadlock causes balancer.Close() to block forever.
+// See issue https://github.com/coreos/etcd/issues/7283 for more detail.
+func TestBalancerDoNotBlockOnClose(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	kcl := newKillConnListener(t, 3)
+	defer kcl.close()
+
+	for i := 0; i < 5; i++ {
+		sb := newSimpleBalancer(kcl.endpoints())
+		conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(sb))
+		if err != nil {
+			t.Fatal(err)
+		}
+		kvc := pb.NewKVClient(conn)
+		<-sb.readyc
+		for j := 0; j < 100; j++ {
+			go kvc.Range(context.TODO(), &pb.RangeRequest{}, grpc.FailFast(false))
+		}
+		// balancer.Close() might block
+		// if balancer and grpc deadlock each other.
+		closec := make(chan struct{})
+		go func() {
+			defer close(closec)
+			sb.Close()
+		}()
+		go conn.Close()
+		select {
+		case <-closec:
+		case <-time.After(3 * time.Second):
+			testutil.FatalStack(t, "balancer close timeout")
+		}
+	}
+}
+
+// killConnListener listens for incoming connections and kills them immediately.
+type killConnListener struct {
+	wg    sync.WaitGroup
+	eps   []string
+	stopc chan struct{}
+	t     *testing.T
+}
+
+func newKillConnListener(t *testing.T, size int) *killConnListener {
+	kcl := &killConnListener{stopc: make(chan struct{}), t: t}
+
+	for i := 0; i < size; i++ {
+		ln, err := net.Listen("tcp", ":0")
+		if err != nil {
+			t.Fatal(err)
+		}
+		kcl.eps = append(kcl.eps, ln.Addr().String())
+		kcl.wg.Add(1)
+		go kcl.listen(ln)
+	}
+	return kcl
+}
+
+func (kcl *killConnListener) endpoints() []string {
+	return kcl.eps
+}
+
+func (kcl *killConnListener) listen(l net.Listener) {
+	go func() {
+		defer kcl.wg.Done()
+		for {
+			conn, err := l.Accept()
+			select {
+			case <-kcl.stopc:
+				return
+			default:
+			}
+			if err != nil {
+				kcl.t.Fatal(err)
+			}
+			time.Sleep(1 * time.Millisecond)
+			conn.Close()
+		}
+	}()
+	<-kcl.stopc
+	l.Close()
+}
+
+func (kcl *killConnListener) close() {
+	close(kcl.stopc)
+	kcl.wg.Wait()
+}
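
To run the new regression test on its own, something along these lines should work from the root of an etcd checkout with the standard Go tooling (the -race flag is optional but useful for a concurrency fix like this one):

    go test -race -run TestBalancerDoNotBlockOnClose ./clientv3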