소스 검색

Merge pull request #5509 from heyitsanthony/clientv3-fix-concurrent-close

clientv3: fix deadlock on Get with concurrent Close
Xiang Li 9 년 전
부모
커밋
9ed3b446ca
2개의 변경된 파일38개의 추가작업 그리고 15개의 파일을 삭제
  1. 31 10
      clientv3/integration/kv_test.go
  2. 7 5
      clientv3/remote_client.go

+ 31 - 10
clientv3/integration/kv_test.go

@@ -288,25 +288,19 @@ func TestKVGetErrConnClosed(t *testing.T) {
 	cli := clus.Client(0)
 	kv := clientv3.NewKV(cli)
 
-	closed, donec := make(chan struct{}), make(chan struct{})
+	donec := make(chan struct{})
 	go func() {
-		select {
-		case <-time.After(3 * time.Second):
-			t.Fatal("cli.Close took too long")
-		case <-closed:
-		}
-
-		if _, err := kv.Get(context.TODO(), "foo"); err != rpctypes.ErrConnClosed {
+		defer close(donec)
+		_, err := kv.Get(context.TODO(), "foo")
+		if err != nil && err != rpctypes.ErrConnClosed {
 			t.Fatalf("expected %v, got %v", rpctypes.ErrConnClosed, err)
 		}
-		close(donec)
 	}()
 
 	if err := cli.Close(); err != nil {
 		t.Fatal(err)
 	}
 	clus.TakeClient(0)
-	close(closed)
 
 	select {
 	case <-time.After(3 * time.Second):
@@ -315,6 +309,33 @@ func TestKVGetErrConnClosed(t *testing.T) {
 	}
 }
 
+func TestKVNewAfterClose(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+	clus.TakeClient(0)
+	if err := cli.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	donec := make(chan struct{})
+	go func() {
+		kv := clientv3.NewKV(cli)
+		if _, err := kv.Get(context.TODO(), "foo"); err != rpctypes.ErrConnClosed {
+			t.Fatalf("expected %v, got %v", rpctypes.ErrConnClosed, err)
+		}
+		close(donec)
+	}()
+	select {
+	case <-time.After(3 * time.Second):
+		t.Fatal("kv.Get took too long")
+	case <-donec:
+	}
+}
+
 func TestKVDeleteRange(t *testing.T) {
 	defer testutil.AfterTest(t)
 

+ 7 - 5
clientv3/remote_client.go

@@ -80,21 +80,23 @@ func (r *remoteClient) tryUpdate() bool {
 	return true
 }
 
+// acquire gets the client read lock on an established connection or
+// returns an error without holding the lock.
 func (r *remoteClient) acquire(ctx context.Context) error {
 	for {
+		r.mu.Lock()
 		r.client.mu.RLock()
 		closed := r.client.cancel == nil
 		c := r.client.conn
-		r.mu.Lock()
 		match := r.conn == c
 		r.mu.Unlock()
-		if closed {
-			return rpctypes.ErrConnClosed
-		}
-		if match {
+		if c != nil && match {
 			return nil
 		}
 		r.client.mu.RUnlock()
+		if closed {
+			return rpctypes.ErrConnClosed
+		}
 		if err := r.reconnectWait(ctx, nil); err != nil {
 			return err
 		}