Browse Source

mvcc: attach keys to leases after recover all state

The previous logic is wrong. When we have hisotry like Put(foo, bar, lease1),
and Put(foo, bar, lease2), we will end up with attaching foo to two leases 1 and
2. Similar things can happen for deattach by clearing the lease of a key.

Now we try to fix this by starting to attach leases at the end of the recovery.
We use a map to keep the last lease attachment state.
Xiang Li 9 years ago
parent
commit
bd62b0a646
1 changed files with 21 additions and 20 deletions
  1. 21 20
      mvcc/kvstore.go

+ 21 - 20
mvcc/kvstore.go

@@ -367,6 +367,8 @@ func (s *store) restore() error {
 	revToBytes(revision{main: 1}, min)
 	revToBytes(revision{main: 1}, min)
 	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
 	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
 
 
+	keyToLease := make(map[string]lease.LeaseID)
+
 	// restore index
 	// restore index
 	tx := s.b.BatchTx()
 	tx := s.b.BatchTx()
 	tx.Lock()
 	tx.Lock()
@@ -390,26 +392,15 @@ func (s *store) restore() error {
 		switch {
 		switch {
 		case isTombstone(key):
 		case isTombstone(key):
 			s.kvindex.Tombstone(kv.Key, rev)
 			s.kvindex.Tombstone(kv.Key, rev)
-			if lease.LeaseID(kv.Lease) != lease.NoLease {
-				err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
-				if err != nil && err != lease.ErrLeaseNotFound {
-					plog.Fatalf("unexpected Detach error %v", err)
-				}
-			}
+			delete(keyToLease, string(kv.Key))
+
 		default:
 		default:
 			s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
 			s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
-			if lease.LeaseID(kv.Lease) != lease.NoLease {
-				if s.le == nil {
-					panic("no lessor to attach lease")
-				}
-				err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
-				// We are walking through the kv history here. It is possible that we attached a key to
-				// the lease and the lease was revoked later.
-				// Thus attaching an old version of key to a none existing lease is possible here, and
-				// we should just ignore the error.
-				if err != nil && err != lease.ErrLeaseNotFound {
-					panic("unexpected Attach error")
-				}
+
+			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
+				keyToLease[string(kv.Key)] = lid
+			} else {
+				delete(keyToLease, string(kv.Key))
 			}
 			}
 		}
 		}
 
 
@@ -417,6 +408,16 @@ func (s *store) restore() error {
 		s.currentRev = rev
 		s.currentRev = rev
 	}
 	}
 
 
+	for key, lid := range keyToLease {
+		if s.le == nil {
+			panic("no lessor to attach lease")
+		}
+		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
+		if err != nil {
+			plog.Errorf("unexpected Attach error: %v", err)
+		}
+	}
+
 	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
 	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
 	scheduledCompact := int64(0)
 	scheduledCompact := int64(0)
 	if len(scheduledCompactBytes) != 0 {
 	if len(scheduledCompactBytes) != 0 {
@@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {
 
 
 		err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
 		err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
 		if err != nil {
 		if err != nil {
-			panic("unexpected error from lease detach")
+			plog.Errorf("unexpected error from lease detach: %v", err)
 		}
 		}
 	}
 	}
 
 
@@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) {
 	if lease.LeaseID(kv.Lease) != lease.NoLease {
 	if lease.LeaseID(kv.Lease) != lease.NoLease {
 		err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
 		err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
 		if err != nil {
 		if err != nil {
-			plog.Fatalf("cannot detach %v", err)
+			plog.Errorf("cannot detach %v", err)
 		}
 		}
 	}
 	}
 }
 }