Jelajahi Sumber

Merge pull request #6117 from gyuho/lease-test

integration: add more lease tests
Gyu-Ho Lee 9 tahun lalu
induk
melakukan
f505bcb91a
1 mengubah file dengan 228 tambahan dan 0 penghapusan
  1. 228 0
      integration/v3_lease_test.go

+ 228 - 0
integration/v3_lease_test.go

@@ -366,6 +366,234 @@ func TestV3LeaseFailover(t *testing.T) {
 	}
 }
 
+const fiveMinTTL int64 = 300
+
+// TestV3LeaseRecoverAndRevoke ensures that revoking a lease after restart deletes the attached key.
+func TestV3LeaseRecoverAndRevoke(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	kvc := toGRPC(clus.Client(0)).KV
+	lsc := toGRPC(clus.Client(0)).Lease
+
+	lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if lresp.Error != "" {
+		t.Fatal(lresp.Error)
+	}
+	_, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// restart server and ensure lease still exists
+	clus.Members[0].Stop(t)
+	clus.Members[0].Restart(t)
+	clus.waitLeader(t, clus.Members)
+
+	// overwrite old client with newly dialed connection
+	// otherwise, error with "grpc: RPC failed fast due to transport failure"
+	nc, err := NewClientV3(clus.Members[0])
+	if err != nil {
+		t.Fatal(err)
+	}
+	kvc = toGRPC(nc).KV
+	lsc = toGRPC(nc).Lease
+	defer nc.Close()
+
+	// revoke should delete the key
+	_, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
+	if err != nil {
+		t.Fatal(err)
+	}
+	rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(rresp.Kvs) != 0 {
+		t.Fatalf("lease removed but key remains")
+	}
+}
+
+// TestV3LeaseRevokeAndRecover ensures that revoked key stays deleted after restart.
+func TestV3LeaseRevokeAndRecover(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	kvc := toGRPC(clus.Client(0)).KV
+	lsc := toGRPC(clus.Client(0)).Lease
+
+	lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if lresp.Error != "" {
+		t.Fatal(lresp.Error)
+	}
+	_, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// revoke should delete the key
+	_, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// restart server and ensure revoked key doesn't exist
+	clus.Members[0].Stop(t)
+	clus.Members[0].Restart(t)
+	clus.waitLeader(t, clus.Members)
+
+	// overwrite old client with newly dialed connection
+	// otherwise, error with "grpc: RPC failed fast due to transport failure"
+	nc, err := NewClientV3(clus.Members[0])
+	if err != nil {
+		t.Fatal(err)
+	}
+	kvc = toGRPC(nc).KV
+	defer nc.Close()
+
+	rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(rresp.Kvs) != 0 {
+		t.Fatalf("lease removed but key remains")
+	}
+}
+
+// TestV3LeaseRecoverKeyWithDetachedLease ensures that revoking a detached lease after restart
+// does not delete the key.
+func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	kvc := toGRPC(clus.Client(0)).KV
+	lsc := toGRPC(clus.Client(0)).Lease
+
+	lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if lresp.Error != "" {
+		t.Fatal(lresp.Error)
+	}
+	_, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// overwrite lease with none
+	_, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// restart server and ensure lease still exists
+	clus.Members[0].Stop(t)
+	clus.Members[0].Restart(t)
+	clus.waitLeader(t, clus.Members)
+
+	// overwrite old client with newly dialed connection
+	// otherwise, error with "grpc: RPC failed fast due to transport failure"
+	nc, err := NewClientV3(clus.Members[0])
+	if err != nil {
+		t.Fatal(err)
+	}
+	kvc = toGRPC(nc).KV
+	lsc = toGRPC(nc).Lease
+	defer nc.Close()
+
+	// revoke the detached lease
+	_, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
+	if err != nil {
+		t.Fatal(err)
+	}
+	rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(rresp.Kvs) != 1 {
+		t.Fatalf("only detached lease removed, key should remain")
+	}
+}
+
+func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	kvc := toGRPC(clus.Client(0)).KV
+	lsc := toGRPC(clus.Client(0)).Lease
+
+	var leaseIDs []int64
+	for i := 0; i < 2; i++ {
+		lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if lresp.Error != "" {
+			t.Fatal(lresp.Error)
+		}
+		leaseIDs = append(leaseIDs, lresp.ID)
+
+		_, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// restart server and ensure lease still exists
+	clus.Members[0].Stop(t)
+	clus.Members[0].Restart(t)
+	clus.waitLeader(t, clus.Members)
+	for i, leaseID := range leaseIDs {
+		if !leaseExist(t, clus, leaseID) {
+			t.Errorf("#%d: unexpected lease not exists", i)
+		}
+	}
+
+	// overwrite old client with newly dialed connection
+	// otherwise, error with "grpc: RPC failed fast due to transport failure"
+	nc, err := NewClientV3(clus.Members[0])
+	if err != nil {
+		t.Fatal(err)
+	}
+	kvc = toGRPC(nc).KV
+	lsc = toGRPC(nc).Lease
+	defer nc.Close()
+
+	// revoke the old lease
+	_, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseIDs[0]})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// key should still exist
+	rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(rresp.Kvs) != 1 {
+		t.Fatalf("only detached lease removed, key should remain")
+	}
+
+	// revoke the latest lease
+	_, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseIDs[1]})
+	if err != nil {
+		t.Fatal(err)
+	}
+	rresp, err = kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(rresp.Kvs) != 0 {
+		t.Fatalf("lease removed but key remains")
+	}
+}
+
 // acquireLeaseAndKey creates a new lease and creates an attached key.
 func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
 	// create lease