Browse Source

clientv3: check if KV.Client is closed

For https://github.com/coreos/etcd/issues/5495.
Gyu-Ho Lee 9 năm trước cách đây
mục cha
commit
7b5657cf1a
3 tập tin đã thay đổi với 43 bổ sung1 xóa
  1. 1 1
      clientv3/client.go
  2. 36 0
      clientv3/integration/kv_test.go
  3. 6 0
      clientv3/remote_client.go

+ 1 - 1
clientv3/client.go

@@ -384,5 +384,5 @@ func dialEndpointList(c *Client) (*grpc.ClientConn, error) {
 // progress can be made, even after reconnecting.
 func isHaltErr(ctx context.Context, err error) bool {
 	isRPCError := strings.HasPrefix(grpc.ErrorDesc(err), "etcdserver: ")
-	return isRPCError || ctx.Err() != nil
+	return isRPCError || ctx.Err() != nil || err == rpctypes.ErrConnClosed
 }

+ 36 - 0
clientv3/integration/kv_test.go

@@ -279,6 +279,42 @@ func TestKVRange(t *testing.T) {
 	}
 }
 
+func TestKVGetErrConnClosed(t *testing.T) {
+	defer testutil.AfterTest(t)
+
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+	kv := clientv3.NewKV(cli)
+
+	closed, donec := make(chan struct{}), make(chan struct{})
+	go func() {
+		select {
+		case <-time.After(3 * time.Second):
+			t.Fatal("cli.Close took too long")
+		case <-closed:
+		}
+
+		if _, err := kv.Get(context.TODO(), "foo"); err != rpctypes.ErrConnClosed {
+			t.Fatalf("expected %v, got %v", rpctypes.ErrConnClosed, err)
+		}
+		close(donec)
+	}()
+
+	if err := cli.Close(); err != nil {
+		t.Fatal(err)
+	}
+	clus.TakeClient(0)
+	close(closed)
+
+	select {
+	case <-time.After(3 * time.Second):
+		t.Fatal("kv.Get took too long")
+	case <-donec:
+	}
+}
+
 func TestKVDeleteRange(t *testing.T) {
 	defer testutil.AfterTest(t)
 

+ 6 - 0
clientv3/remote_client.go

@@ -17,6 +17,8 @@ package clientv3
 import (
 	"sync"
 
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
 )
@@ -81,10 +83,14 @@ func (r *remoteClient) tryUpdate() bool {
 func (r *remoteClient) acquire(ctx context.Context) error {
 	for {
 		r.client.mu.RLock()
+		closed := r.client.cancel == nil
 		c := r.client.conn
 		r.mu.Lock()
 		match := r.conn == c
 		r.mu.Unlock()
+		if closed {
+			return rpctypes.ErrConnClosed
+		}
 		if match {
 			return nil
 		}