Browse Source

*: detach keys from leases

1. deatch when a key is removed
2. deatch when the key's lease changes
3. potentially deatch when restroing a tombstone key
Xiang Li 10 years ago
parent
commit
bfa5e310a9
4 changed files with 166 additions and 13 deletions
  1. 28 1
      lease/lessor.go
  2. 39 0
      lease/lessor_test.go
  3. 48 12
      storage/kvstore.go
  4. 51 0
      storage/kvstore_test.go

+ 28 - 1
lease/lessor.go

@@ -72,6 +72,10 @@ type Lessor interface {
 	// If the lease does not exist, an error will be returned.
 	Attach(id LeaseID, items []LeaseItem) error
 
+	// Detach detaches given leaseItem from the lease with given LeaseID.
+	// If the lease does not exist, an error will be returned.
+	Detach(id LeaseID, items []LeaseItem) error
+
 	// Promote promotes the lessor to be the primary lessor. Primary lessor manages
 	// the expiration and renew of leases.
 	Promote()
@@ -194,12 +198,14 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
 
 func (le *lessor) Revoke(id LeaseID) error {
 	le.mu.Lock()
-	defer le.mu.Unlock()
 
 	l := le.leaseMap[id]
 	if l == nil {
+		le.mu.Unlock()
 		return ErrLeaseNotFound
 	}
+	// unlock before doing external work
+	le.mu.Unlock()
 
 	if le.rd != nil {
 		for item := range l.itemSet {
@@ -207,6 +213,8 @@ func (le *lessor) Revoke(id LeaseID) error {
 		}
 	}
 
+	le.mu.Lock()
+	defer le.mu.Unlock()
 	delete(le.leaseMap, l.ID)
 	l.removeFrom(le.b)
 
@@ -284,6 +292,23 @@ func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
 	return nil
 }
 
+// Detach detaches items from the lease with given ID.
+// If the given lease does not exist, an error will be returned.
+func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
+	le.mu.Lock()
+	defer le.mu.Unlock()
+
+	l := le.leaseMap[id]
+	if l == nil {
+		return ErrLeaseNotFound
+	}
+
+	for _, it := range items {
+		delete(l.itemSet, it)
+	}
+	return nil
+}
+
 func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) {
 	le.mu.Lock()
 	defer le.mu.Unlock()
@@ -462,6 +487,8 @@ func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }
 
 func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }
 
+func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil }
+
 func (fl *FakeLessor) Promote() {}
 
 func (fl *FakeLessor) Demote() {}

+ 39 - 0
lease/lessor_test.go

@@ -151,6 +151,45 @@ func TestLessorRenew(t *testing.T) {
 	}
 }
 
+func TestLessorDetach(t *testing.T) {
+	dir, be := NewTestBackend(t)
+	defer os.RemoveAll(dir)
+	defer be.Close()
+
+	fd := &fakeDeleter{}
+
+	le := newLessor(be)
+	le.SetRangeDeleter(fd)
+
+	// grant a lease with long term (100 seconds) to
+	// avoid early termination during the test.
+	l, err := le.Grant(1, 100)
+	if err != nil {
+		t.Fatalf("could not grant lease for 100s ttl (%v)", err)
+	}
+
+	items := []LeaseItem{
+		{"foo"},
+		{"bar"},
+	}
+
+	if err := le.Attach(l.ID, items); err != nil {
+		t.Fatalf("failed to attach items to the lease: %v", err)
+	}
+
+	if err := le.Detach(l.ID, items[0:1]); err != nil {
+		t.Fatalf("failed to de-attach items to the lease: %v", err)
+	}
+
+	l = le.Lookup(l.ID)
+	if len(l.itemSet) != 1 {
+		t.Fatalf("len(l.itemSet) = %d, failed to de-attach items", len(l.itemSet))
+	}
+	if _, ok := l.itemSet[LeaseItem{"bar"}]; !ok {
+		t.Fatalf("de-attached wrong item, want %q exists", "bar")
+	}
+}
+
 // TestLessorRecover ensures Lessor recovers leases from
 // persist backend.
 func TestLessorRecover(t *testing.T) {

+ 48 - 12
storage/kvstore.go

@@ -312,8 +312,13 @@ func (s *store) restore() error {
 		// restore index
 		switch {
 		case isTombstone(key):
-			// TODO: De-attach keys from lease if necessary
 			s.kvindex.Tombstone(kv.Key, rev)
+			if lease.LeaseID(kv.Lease) != lease.NoLease {
+				err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
+				if err != nil && err != lease.ErrLeaseNotFound {
+					log.Fatalf("storage: unexpected Detach error %v", err)
+				}
+			}
 		default:
 			s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
 			if lease.LeaseID(kv.Lease) != lease.NoLease {
@@ -413,11 +418,21 @@ func (s *store) rangeKeys(key, end []byte, limit, rangeRev int64) (kvs []storage
 func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
 	rev := s.currentRev.main + 1
 	c := rev
+	oldLease := lease.NoLease
 
-	// if the key exists before, use its previous created
-	_, created, ver, err := s.kvindex.Get(key, rev)
+	// if the key exists before, use its previous created and
+	// get its previous leaseID
+	grev, created, ver, err := s.kvindex.Get(key, rev)
 	if err == nil {
 		c = created.main
+		ibytes := newRevBytes()
+		revToBytes(grev, ibytes)
+		_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
+		var kv storagepb.KeyValue
+		if err := kv.Unmarshal(vs[0]); err != nil {
+			log.Fatalf("storage: cannot unmarshal value: %v", err)
+		}
+		oldLease = lease.LeaseID(kv.Lease)
 	}
 
 	ibytes := newRevBytes()
@@ -443,15 +458,22 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
 	s.changes = append(s.changes, kv)
 	s.currentRev.sub += 1
 
+	if oldLease != lease.NoLease {
+		if s.le == nil {
+			panic("no lessor to detach lease")
+		}
+
+		err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
+		if err != nil {
+			panic("unexpected error from lease detach")
+		}
+	}
+
 	if leaseID != lease.NoLease {
 		if s.le == nil {
 			panic("no lessor to attach lease")
 		}
 
-		// TODO: validate the existence of lease before call Attach.
-		// We need to ensure put always successful since we do not want
-		// to handle abortion for txn request. We need to ensure all requests
-		// inside the txn can execute without error before executing them.
 		err = s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
 		if err != nil {
 			panic("unexpected error from lease Attach")
@@ -464,19 +486,19 @@ func (s *store) deleteRange(key, end []byte) int64 {
 	if s.currentRev.sub > 0 {
 		rrev += 1
 	}
-	keys, _ := s.kvindex.Range(key, end, rrev)
+	keys, revs := s.kvindex.Range(key, end, rrev)
 
 	if len(keys) == 0 {
 		return 0
 	}
 
-	for _, key := range keys {
-		s.delete(key)
+	for i, key := range keys {
+		s.delete(key, revs[i])
 	}
 	return int64(len(keys))
 }
 
-func (s *store) delete(key []byte) {
+func (s *store) delete(key []byte, rev revision) {
 	mainrev := s.currentRev.main + 1
 
 	ibytes := newRevBytes()
@@ -500,7 +522,21 @@ func (s *store) delete(key []byte) {
 	s.changes = append(s.changes, kv)
 	s.currentRev.sub += 1
 
-	// TODO: De-attach keys from lease if necessary
+	ibytes = newRevBytes()
+	revToBytes(rev, ibytes)
+	_, vs := s.tx.UnsafeRange(keyBucketName, ibytes, nil, 0)
+
+	kv.Reset()
+	if err := kv.Unmarshal(vs[0]); err != nil {
+		log.Fatalf("storage: cannot unmarshal value: %v", err)
+	}
+
+	if lease.LeaseID(kv.Lease) != lease.NoLease {
+		err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
+		if err != nil {
+			log.Fatalf("storage: cannot detach %v", err)
+		}
+	}
 }
 
 func (s *store) getChanges() []storagepb.KeyValue {

+ 51 - 0
storage/kvstore_test.go

@@ -45,9 +45,22 @@ func TestStoreRev(t *testing.T) {
 }
 
 func TestStorePut(t *testing.T) {
+	kv := storagepb.KeyValue{
+		Key:            []byte("foo"),
+		Value:          []byte("bar"),
+		CreateRevision: 1,
+		ModRevision:    2,
+		Version:        1,
+	}
+	kvb, err := kv.Marshal()
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	tests := []struct {
 		rev revision
 		r   indexGetResp
+		rr  *rangeResp
 
 		wrev    revision
 		wkey    []byte
@@ -57,6 +70,8 @@ func TestStorePut(t *testing.T) {
 		{
 			revision{1, 0},
 			indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
+			nil,
+
 			revision{1, 1},
 			newTestKeyBytes(revision{2, 0}, false),
 			storagepb.KeyValue{
@@ -72,6 +87,8 @@ func TestStorePut(t *testing.T) {
 		{
 			revision{1, 1},
 			indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
+			&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
+
 			revision{1, 2},
 			newTestKeyBytes(revision{2, 1}, false),
 			storagepb.KeyValue{
@@ -87,6 +104,8 @@ func TestStorePut(t *testing.T) {
 		{
 			revision{2, 0},
 			indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
+			&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
+
 			revision{2, 1},
 			newTestKeyBytes(revision{3, 0}, false),
 			storagepb.KeyValue{
@@ -108,6 +127,9 @@ func TestStorePut(t *testing.T) {
 		s.currentRev = tt.rev
 		s.tx = b.BatchTx()
 		fi.indexGetRespc <- tt.r
+		if tt.rr != nil {
+			b.tx.rangeRespc <- *tt.rr
+		}
 
 		s.put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
 
@@ -115,9 +137,18 @@ func TestStorePut(t *testing.T) {
 		if err != nil {
 			t.Errorf("#%d: marshal err = %v, want nil", i, err)
 		}
+
 		wact := []testutil.Action{
 			{"put", []interface{}{keyBucketName, tt.wkey, data}},
 		}
+
+		if tt.rr != nil {
+			wact = []testutil.Action{
+				{"range", []interface{}{keyBucketName, newTestKeyBytes(tt.r.rev, false), []byte(nil), int64(0)}},
+				{"put", []interface{}{keyBucketName, tt.wkey, data}},
+			}
+		}
+
 		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
 		}
@@ -208,9 +239,23 @@ func TestStoreRange(t *testing.T) {
 }
 
 func TestStoreDeleteRange(t *testing.T) {
+	key := newTestKeyBytes(revision{2, 0}, false)
+	kv := storagepb.KeyValue{
+		Key:            []byte("foo"),
+		Value:          []byte("bar"),
+		CreateRevision: 1,
+		ModRevision:    2,
+		Version:        1,
+	}
+	kvb, err := kv.Marshal()
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	tests := []struct {
 		rev revision
 		r   indexRangeResp
+		rr  rangeResp
 
 		wkey    []byte
 		wrev    revision
@@ -220,6 +265,8 @@ func TestStoreDeleteRange(t *testing.T) {
 		{
 			revision{2, 0},
 			indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
+			rangeResp{[][]byte{key}, [][]byte{kvb}},
+
 			newTestKeyBytes(revision{3, 0}, true),
 			revision{2, 1},
 			2,
@@ -228,6 +275,8 @@ func TestStoreDeleteRange(t *testing.T) {
 		{
 			revision{2, 1},
 			indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
+			rangeResp{[][]byte{key}, [][]byte{kvb}},
+
 			newTestKeyBytes(revision{3, 1}, true),
 			revision{2, 2},
 			3,
@@ -242,6 +291,7 @@ func TestStoreDeleteRange(t *testing.T) {
 		s.currentRev = tt.rev
 		s.tx = b.BatchTx()
 		fi.indexRangeRespc <- tt.r
+		b.tx.rangeRespc <- tt.rr
 
 		n := s.deleteRange([]byte("foo"), []byte("goo"))
 		if n != 1 {
@@ -256,6 +306,7 @@ func TestStoreDeleteRange(t *testing.T) {
 		}
 		wact := []testutil.Action{
 			{"put", []interface{}{keyBucketName, tt.wkey, data}},
+			{"range", []interface{}{keyBucketName, newTestKeyBytes(revision{2, 0}, false), []byte(nil), int64(0)}},
 		}
 		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)