Browse Source

Merge pull request #8422 from heyitsanthony/close-leasing

leasing, integration, etcdmain: closer function for leasing kv
Anthony Romano 8 years ago
parent
commit
3e32cd3877
3 changed files with 195 additions and 191 deletions
  1. 169 184
      clientv3/integration/leasing_test.go
  2. 25 6
      clientv3/leasing/kv.go
  3. 1 1
      etcdmain/grpc_proxy.go

+ 169 - 184
clientv3/integration/leasing_test.go

@@ -35,9 +35,17 @@ func TestLeasingPutGet(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lKV1, err := leasing.NewKV(clus.Client(0), "foo/")
-	lKV2, err := leasing.NewKV(clus.Client(1), "foo/")
-	lKV3, err := leasing.NewKV(clus.Client(2), "foo/")
+	lKV1, closeLKV1, err := leasing.NewKV(clus.Client(0), "foo/")
+	testutil.AssertNil(t, err)
+	defer closeLKV1()
+
+	lKV2, closeLKV2, err := leasing.NewKV(clus.Client(1), "foo/")
+	testutil.AssertNil(t, err)
+	defer closeLKV2()
+
+	lKV3, closeLKV3, err := leasing.NewKV(clus.Client(2), "foo/")
+	testutil.AssertNil(t, err)
+	defer closeLKV3()
 
 	resp, err := lKV1.Get(context.TODO(), "abc")
 	if err != nil {
@@ -85,10 +93,9 @@ func TestLeasingInterval(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	keys := []string{"abc/a", "abc/b", "abc/a/a"}
 	for _, k := range keys {
@@ -125,10 +132,9 @@ func TestLeasingPutInvalidateNew(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	if _, err = lkv.Get(context.TODO(), "k"); err != nil {
 		t.Fatal(err)
@@ -160,10 +166,9 @@ func TestLeasingPutInvalidatExisting(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	if _, err = lkv.Get(context.TODO(), "k"); err != nil {
 		t.Fatal(err)
@@ -192,10 +197,9 @@ func TestLeasingGetSerializable(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	if _, err = clus.Client(0).Put(context.TODO(), "cached", "abc"); err != nil {
 		t.Fatal(err)
@@ -233,10 +237,10 @@ func TestLeasingPrevKey(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil {
 		t.Fatal(err)
 	}
@@ -260,10 +264,9 @@ func TestLeasingRevGet(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	putResp, err := clus.Client(0).Put(context.TODO(), "k", "abc")
 	if err != nil {
@@ -297,10 +300,10 @@ func TestLeasingGetWithOpts(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil {
 		t.Fatal(err)
 	}
@@ -342,10 +345,10 @@ func TestLeasingConcurrentPut(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	// force key into leasing key cache
 	if _, err = lkv.Get(context.TODO(), "k"); err != nil {
 		t.Fatal(err)
@@ -389,10 +392,10 @@ func TestLeasingDisconnectedGet(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	if _, err = clus.Client(0).Put(context.TODO(), "cached", "abc"); err != nil {
 		t.Fatal(err)
 	}
@@ -418,10 +421,10 @@ func TestLeasingDeleteOwner(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil {
 		t.Fatal(err)
 	}
@@ -452,14 +455,13 @@ func TestLeasingDeleteNonOwner(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv1, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
-	lkv2, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv1, closeLKV1, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV1()
+
+	lkv2, closeLKV2, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV2()
 
 	if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil {
 		t.Fatal(err)
@@ -488,10 +490,10 @@ func TestLeasingOverwriteResponse(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil {
 		t.Fatal(err)
 	}
@@ -522,10 +524,10 @@ func TestLeasingOwnerPutResponse(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil {
 		t.Fatal(err)
 	}
@@ -560,10 +562,9 @@ func TestLeasingTxnOwnerGetRange(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	keyCount := rand.Intn(10) + 1
 	for i := 0; i < keyCount; i++ {
@@ -590,10 +591,9 @@ func TestLeasingTxnOwnerGet(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	keyCount := rand.Intn(10) + 1
 	var ops []clientv3.Op
@@ -663,10 +663,10 @@ func TestLeasingTxnOwnerIf(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil {
 		t.Fatal(err)
 	}
@@ -757,14 +757,13 @@ func TestLeasingTxnCancel(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lkv1, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
-	lkv2, err := leasing.NewKV(clus.Client(1), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv1, closeLKV1, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV1()
+
+	lkv2, closeLKV2, err := leasing.NewKV(clus.Client(1), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV2()
 
 	// acquire lease but disconnect so no revoke in time
 	if _, err = lkv1.Get(context.TODO(), "k"); err != nil {
@@ -792,11 +791,13 @@ func TestLeasingTxnNonOwnerPut(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
-	lkv2, err := leasing.NewKV(clus.Client(0), "pfx/")
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
+	lkv2, closeLKV2, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV2()
 
 	if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil {
 		t.Fatal(err)
@@ -867,14 +868,13 @@ func TestLeasingTxnRandIfThenOrElse(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv1, err1 := leasing.NewKV(clus.Client(0), "pfx/")
-	if err1 != nil {
-		t.Fatal(err1)
-	}
-	lkv2, err2 := leasing.NewKV(clus.Client(0), "pfx/")
-	if err2 != nil {
-		t.Fatal(err2)
-	}
+	lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err1)
+	defer closeLKV1()
+
+	lkv2, closeLKV2, err2 := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err2)
+	defer closeLKV2()
 
 	keyCount := 16
 	dat := make([]*clientv3.PutResponse, keyCount)
@@ -974,10 +974,10 @@ func TestLeasingOwnerPutError(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	if _, err = lkv.Get(context.TODO(), "k"); err != nil {
 		t.Fatal(err)
 	}
@@ -995,10 +995,10 @@ func TestLeasingOwnerDeleteError(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	if _, err = lkv.Get(context.TODO(), "k"); err != nil {
 		t.Fatal(err)
 	}
@@ -1016,10 +1016,9 @@ func TestLeasingNonOwnerPutError(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	clus.Members[0].Stop(t)
 	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
@@ -1042,10 +1041,9 @@ func testLeasingOwnerDelete(t *testing.T, del clientv3.Op) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "0/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "0/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	for i := 0; i < 8; i++ {
 		if _, err = clus.Client(0).Put(context.TODO(), fmt.Sprintf("key/%d", i), "123"); err != nil {
@@ -1092,15 +1090,13 @@ func TestLeasingDeleteRangeBounds(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	delkv, err := leasing.NewKV(clus.Client(0), "0/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	delkv, closeDelKV, err := leasing.NewKV(clus.Client(0), "0/")
+	testutil.AssertNil(t, err)
+	defer closeDelKV()
 
-	getkv, err := leasing.NewKV(clus.Client(0), "0/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	getkv, closeGetKv, err := leasing.NewKV(clus.Client(0), "0/")
+	testutil.AssertNil(t, err)
+	defer closeGetKv()
 
 	for _, k := range []string{"j", "m"} {
 		if _, err = clus.Client(0).Put(context.TODO(), k, "123"); err != nil {
@@ -1152,14 +1148,13 @@ func testLeasingDeleteRangeContend(t *testing.T, op clientv3.Op) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	delkv, err := leasing.NewKV(clus.Client(0), "0/")
-	if err != nil {
-		t.Fatal(err)
-	}
-	putkv, err := leasing.NewKV(clus.Client(0), "0/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	delkv, closeDelKV, err := leasing.NewKV(clus.Client(0), "0/")
+	testutil.AssertNil(t, err)
+	defer closeDelKV()
+
+	putkv, closePutKV, err := leasing.NewKV(clus.Client(0), "0/")
+	testutil.AssertNil(t, err)
+	defer closePutKV()
 
 	for i := 0; i < 8; i++ {
 		key := fmt.Sprintf("key/%d", i)
@@ -1213,10 +1208,9 @@ func TestLeasingPutGetDeleteConcurrent(t *testing.T) {
 
 	lkvs := make([]clientv3.KV, 16)
 	for i := range lkvs {
-		lkv, err := leasing.NewKV(clus.Client(0), "pfx/")
-		if err != nil {
-			t.Fatal(err)
-		}
+		lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/")
+		testutil.AssertNil(t, err)
+		defer closeLKV()
 		lkvs[i] = lkv
 	}
 
@@ -1271,14 +1265,13 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lkv1, err1 := leasing.NewKV(clus.Client(0), "foo/")
-	if err1 != nil {
-		t.Fatal(err1)
-	}
-	lkv2, err2 := leasing.NewKV(clus.Client(1), "foo/")
-	if err2 != nil {
-		t.Fatal(err2)
-	}
+	lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/")
+	testutil.AssertNil(t, err1)
+	defer closeLKV1()
+
+	lkv2, closeLKV2, err2 := leasing.NewKV(clus.Client(1), "foo/")
+	testutil.AssertNil(t, err2)
+	defer closeLKV2()
 
 	if _, err := lkv1.Get(context.TODO(), "k"); err != nil {
 		t.Fatal(err)
@@ -1332,14 +1325,13 @@ func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lkv1, err1 := leasing.NewKV(clus.Client(0), "foo/")
-	if err1 != nil {
-		t.Fatal(err1)
-	}
-	lkv2, err2 := leasing.NewKV(clus.Client(1), "foo/")
-	if err2 != nil {
-		t.Fatal(err2)
-	}
+	lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/")
+	testutil.AssertNil(t, err1)
+	defer closeLKV1()
+
+	lkv2, closeLKV2, err2 := leasing.NewKV(clus.Client(1), "foo/")
+	testutil.AssertNil(t, err2)
+	defer closeLKV2()
 
 	if _, err := lkv1.Get(context.TODO(), "k"); err != nil {
 		t.Fatal(err)
@@ -1386,10 +1378,9 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "foo/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
+	defer closeLKV()
+	testutil.AssertNil(t, err)
 
 	if _, err = lkv.Put(context.TODO(), "k", "x"); err != nil {
 		t.Fatal(err)
@@ -1461,10 +1452,9 @@ func TestLeasingTxnAtomicCache(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "foo/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	puts, gets := make([]clientv3.Op, 16), make([]clientv3.Op, 16)
 	for i := range puts {
@@ -1539,10 +1529,10 @@ func TestLeasingReconnectTxn(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "foo/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
 	if _, err = lkv.Get(context.TODO(), "k"); err != nil {
 		t.Fatal(err)
 	}
@@ -1574,10 +1564,9 @@ func TestLeasingReconnectNonOwnerGet(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "foo/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	// populate a few keys so some leasing gets have keys
 	for i := 0; i < 4; i++ {
@@ -1626,10 +1615,9 @@ func TestLeasingTxnRangeCmp(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "foo/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	if _, err = clus.Client(0).Put(context.TODO(), "k", "a"); err != nil {
 		t.Fatal(err)
@@ -1662,10 +1650,9 @@ func TestLeasingDo(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "foo/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	ops := []clientv3.Op{
 		clientv3.OpTxn(nil, nil, nil),
@@ -1705,10 +1692,9 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "foo/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/")
+	testutil.AssertNil(t, err)
+	defer closeLKV()
 
 	n := 0
 	treeOp := makePutTreeOp("tree", &n, 4)
@@ -1800,14 +1786,13 @@ func TestLeasingSessionExpire(t *testing.T) {
 	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
 
-	lkv, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1))
-	if err != nil {
-		t.Fatal(err)
-	}
-	lkv2, err := leasing.NewKV(clus.Client(0), "foo/")
-	if err != nil {
-		t.Fatal(err)
-	}
+	lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1))
+	testutil.AssertNil(t, err)
+	defer closeLKV()
+
+	lkv2, closeLKV2, err := leasing.NewKV(clus.Client(0), "foo/")
+	testutil.AssertNil(t, err)
+	defer closeLKV2()
 
 	// acquire lease on abc
 	if _, err = lkv.Get(context.TODO(), "abc"); err != nil {
@@ -1876,10 +1861,10 @@ func TestLeasingSessionExpireCancel(t *testing.T) {
 		},
 	}
 	for i := range tests {
-		lkv, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1))
-		if err != nil {
-			t.Fatal(err)
-		}
+		lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1))
+		testutil.AssertNil(t, err)
+		defer closeLKV()
+
 		if _, err = lkv.Get(context.TODO(), "abc"); err != nil {
 			t.Fatal(err)
 		}

+ 25 - 6
clientv3/leasing/kv.go

@@ -16,6 +16,7 @@ package leasing
 
 import (
 	"strings"
+	"sync"
 	"time"
 
 	v3 "github.com/coreos/etcd/clientv3"
@@ -33,8 +34,10 @@ type leasingKV struct {
 	kv     v3.KV
 	pfx    string
 	leases leaseCache
+
 	ctx    context.Context
 	cancel context.CancelFunc
+	wg     sync.WaitGroup
 
 	sessionOpts []concurrency.SessionOption
 	session     *concurrency.Session
@@ -49,9 +52,9 @@ func init() {
 }
 
 // NewKV wraps a KV instance so that all requests are wired through a leasing protocol.
-func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, error) {
+func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, func(), error) {
 	cctx, cancel := context.WithCancel(cl.Ctx())
-	lkv := leasingKV{
+	lkv := &leasingKV{
 		cl:          cl,
 		kv:          cl.KV,
 		pfx:         pfx,
@@ -61,9 +64,21 @@ func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV,
 		sessionOpts: opts,
 		sessionc:    make(chan struct{}),
 	}
-	go lkv.monitorSession()
-	go lkv.leases.clearOldRevokes(cctx)
-	return &lkv, lkv.waitSession(cctx)
+	lkv.wg.Add(2)
+	go func() {
+		defer lkv.wg.Done()
+		lkv.monitorSession()
+	}()
+	go func() {
+		defer lkv.wg.Done()
+		lkv.leases.clearOldRevokes(cctx)
+	}()
+	return lkv, lkv.Close, lkv.waitSession(cctx)
+}
+
+func (lkv *leasingKV) Close() {
+	lkv.cancel()
+	lkv.wg.Wait()
 }
 
 func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetResponse, error) {
@@ -301,7 +316,11 @@ func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error
 	getResp.Header = resp.Header
 	if resp.Succeeded {
 		getResp = lkv.leases.Add(key, getResp, op)
-		go lkv.monitorLease(ctx, key, resp.Header.Revision)
+		lkv.wg.Add(1)
+		go func() {
+			defer lkv.wg.Done()
+			lkv.monitorLease(ctx, key, resp.Header.Revision)
+		}()
 	}
 	return getResp, nil
 }

+ 1 - 1
etcdmain/grpc_proxy.go

@@ -285,7 +285,7 @@ func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
 	}
 
 	if len(grpcProxyLeasing) > 0 {
-		client.KV, _ = leasing.NewKV(client, grpcProxyLeasing)
+		client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
 	}
 
 	kvp, _ := grpcproxy.NewKvProxy(client)