Browse Source

mvcc: restore into tree index with one key index

Clobbering the mvcc kvindex with new keyIndexes for each restore
chunk would cause index corruption by dropping historical information.
Anthony Romano 8 years ago
parent
commit
51a568aa81
3 changed files with 88 additions and 44 deletions
  1. 16 5
      mvcc/index.go
  2. 66 39
      mvcc/kvstore.go
  3. 6 0
      mvcc/kvstore_test.go

+ 16 - 5
mvcc/index.go

@@ -29,7 +29,9 @@ type index interface {
 	RangeSince(key, end []byte, rev int64) []revision
 	RangeSince(key, end []byte, rev int64) []revision
 	Compact(rev int64) map[revision]struct{}
 	Compact(rev int64) map[revision]struct{}
 	Equal(b index) bool
 	Equal(b index) bool
+
 	Insert(ki *keyIndex)
 	Insert(ki *keyIndex)
+	KeyIndex(ki *keyIndex) *keyIndex
 }
 }
 
 
 type treeIndex struct {
 type treeIndex struct {
@@ -60,18 +62,27 @@ func (ti *treeIndex) Put(key []byte, rev revision) {
 
 
 func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
 func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) {
 	keyi := &keyIndex{key: key}
 	keyi := &keyIndex{key: key}
-
 	ti.RLock()
 	ti.RLock()
 	defer ti.RUnlock()
 	defer ti.RUnlock()
-	item := ti.tree.Get(keyi)
-	if item == nil {
+	if keyi = ti.keyIndex(keyi); keyi == nil {
 		return revision{}, revision{}, 0, ErrRevisionNotFound
 		return revision{}, revision{}, 0, ErrRevisionNotFound
 	}
 	}
-
-	keyi = item.(*keyIndex)
 	return keyi.get(atRev)
 	return keyi.get(atRev)
 }
 }
 
 
+func (ti *treeIndex) KeyIndex(keyi *keyIndex) *keyIndex {
+	ti.RLock()
+	defer ti.RUnlock()
+	return ti.keyIndex(keyi)
+}
+
+func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
+	if item := ti.tree.Get(keyi); item != nil {
+		return item.(*keyIndex)
+	}
+	return nil
+}
+
 func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
 func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision) {
 	if end == nil {
 	if end == nil {
 		rev, _, _, err := ti.Get(key, atRev)
 		rev, _, _, err := ti.Get(key, atRev)

+ 66 - 39
mvcc/kvstore.go

@@ -275,23 +275,15 @@ func (s *store) restore() error {
 	}
 	}
 
 
 	// index keys concurrently as they're loaded in from tx
 	// index keys concurrently as they're loaded in from tx
-	unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{})
-	go func() {
-		defer close(donec)
-		for unordered := range unorderedc {
-			// restore the tree index from the unordered index.
-			for _, v := range unordered {
-				s.kvindex.Insert(v)
-			}
-		}
-	}()
+	rkvc, revc := restoreIntoIndex(s.kvindex)
 	for {
 	for {
 		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
 		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
 		if len(keys) == 0 {
 		if len(keys) == 0 {
 			break
 			break
 		}
 		}
-		// unbuffered so keys don't pile up in memory
-		unorderedc <- s.restoreChunk(keys, vals, keyToLease)
+		// rkvc blocks if the total pending keys exceeds the restore
+		// chunk size to keep keys from consuming too much memory.
+		restoreChunk(rkvc, keys, vals, keyToLease)
 		if len(keys) < restoreChunkKeys {
 		if len(keys) < restoreChunkKeys {
 			// partial set implies final set
 			// partial set implies final set
 			break
 			break
@@ -301,8 +293,8 @@ func (s *store) restore() error {
 		newMin.sub++
 		newMin.sub++
 		revToBytes(newMin, min)
 		revToBytes(newMin, min)
 	}
 	}
-	close(unorderedc)
-	<-donec
+	close(rkvc)
+	s.currentRev = <-revc
 
 
 	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
 	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
 	// the correct revision should be set to compaction revision in the case, not the largest revision
 	// the correct revision should be set to compaction revision in the case, not the largest revision
@@ -334,38 +326,73 @@ func (s *store) restore() error {
 	return nil
 	return nil
 }
 }
 
 
-func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex {
-	// assume half of keys are overwrites
-	unordered := make(map[string]*keyIndex, len(keys)/2)
+type revKeyValue struct {
+	key  []byte
+	kv   mvccpb.KeyValue
+	kstr string
+}
+
+func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
+	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
+	go func() {
+		currentRev := int64(1)
+		defer func() { revc <- currentRev }()
+		// restore the tree index from streaming the unordered index.
+		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
+		for rkv := range rkvc {
+			ki, ok := kiCache[rkv.kstr]
+			// purge kiCache if many keys but still missing in the cache
+			if !ok && len(kiCache) >= restoreChunkKeys {
+				i := 10
+				for k := range kiCache {
+					delete(kiCache, k)
+					if i--; i == 0 {
+						break
+					}
+				}
+			}
+			// cache miss, fetch from tree index if there
+			if !ok {
+				ki = &keyIndex{key: rkv.kv.Key}
+				if idxKey := idx.KeyIndex(ki); idxKey != nil {
+					kiCache[rkv.kstr], ki = idxKey, idxKey
+					ok = true
+				}
+			}
+			rev := bytesToRev(rkv.key)
+			currentRev = rev.main
+			if ok {
+				if isTombstone(rkv.key) {
+					ki.tombstone(rev.main, rev.sub)
+					continue
+				}
+				ki.put(rev.main, rev.sub)
+			} else if !isTombstone(rkv.key) {
+				ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
+				idx.Insert(ki)
+				kiCache[rkv.kstr] = ki
+			}
+		}
+	}()
+	return rkvc, revc
+}
+
+func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
 	for i, key := range keys {
 	for i, key := range keys {
-		var kv mvccpb.KeyValue
-		if err := kv.Unmarshal(vals[i]); err != nil {
+		rkv := revKeyValue{key: key}
+		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
 			plog.Fatalf("cannot unmarshal event: %v", err)
 			plog.Fatalf("cannot unmarshal event: %v", err)
 		}
 		}
-		rev := bytesToRev(key[:revBytesLen])
-		s.currentRev = rev.main
-		kstr := string(kv.Key)
+		rkv.kstr = string(rkv.kv.Key)
 		if isTombstone(key) {
 		if isTombstone(key) {
-			if ki, ok := unordered[kstr]; ok {
-				ki.tombstone(rev.main, rev.sub)
-			}
-			delete(keyToLease, kstr)
-			continue
-		}
-		if ki, ok := unordered[kstr]; ok {
-			ki.put(rev.main, rev.sub)
-		} else {
-			ki = &keyIndex{key: kv.Key}
-			ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
-			unordered[kstr] = ki
-		}
-		if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
-			keyToLease[kstr] = lid
+			delete(keyToLease, rkv.kstr)
+		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
+			keyToLease[rkv.kstr] = lid
 		} else {
 		} else {
-			delete(keyToLease, kstr)
+			delete(keyToLease, rkv.kstr)
 		}
 		}
+		kvc <- rkv
 	}
 	}
-	return unordered
 }
 }
 
 
 func (s *store) Close() error {
 func (s *store) Close() error {

+ 6 - 0
mvcc/kvstore_test.go

@@ -403,6 +403,7 @@ func TestStoreRestore(t *testing.T) {
 	}
 	}
 	ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
 	ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
 	wact = []testutil.Action{
 	wact = []testutil.Action{
+		{"keyIndex", []interface{}{ki}},
 		{"insert", []interface{}{ki}},
 		{"insert", []interface{}{ki}},
 	}
 	}
 	if g := fi.Action(); !reflect.DeepEqual(g, wact) {
 	if g := fi.Action(); !reflect.DeepEqual(g, wact) {
@@ -698,6 +699,11 @@ func (i *fakeIndex) Insert(ki *keyIndex) {
 	i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}})
 	i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}})
 }
 }
 
 
+func (i *fakeIndex) KeyIndex(ki *keyIndex) *keyIndex {
+	i.Recorder.Record(testutil.Action{Name: "keyIndex", Params: []interface{}{ki}})
+	return nil
+}
+
 func createBytesSlice(bytesN, sliceN int) [][]byte {
 func createBytesSlice(bytesN, sliceN int) [][]byte {
 	rs := [][]byte{}
 	rs := [][]byte{}
 	for len(rs) != sliceN {
 	for len(rs) != sliceN {