Browse Source

mvcc: store.restore taking too long triggering snapshot cycle fix

sharat 9 years ago
parent
commit
95fb41a923
2 changed files with 28 additions and 3 deletions
  1. 7 0
      mvcc/index.go
  2. 21 3
      mvcc/kvstore.go

+ 7 - 0
mvcc/index.go

@@ -30,6 +30,7 @@ type index interface {
 	RangeSince(key, end []byte, rev int64) []revision
 	Compact(rev int64) map[revision]struct{}
 	Equal(b index) bool
+	Insert(ki *keyIndex)
 }
 
 type treeIndex struct {
@@ -215,3 +216,9 @@ func (a *treeIndex) Equal(bi index) bool {
 
 	return equal
 }
+
+func (ti *treeIndex) Insert(ki *keyIndex) {
+	ti.Lock()
+	defer ti.Unlock()
+	ti.tree.ReplaceOrInsert(ki)
+}

+ 21 - 3
mvcc/kvstore.go

@@ -380,6 +380,11 @@ func (s *store) restore() error {
 
 	keyToLease := make(map[string]lease.LeaseID)
 
+	// use an unordered map to hold the temp index data to speed up
+	// the initial key index recovery.
+	// we will convert this unordered map into the tree index later.
+	unordered := make(map[string]*keyIndex, 100000)
+
 	// restore index
 	tx := s.b.BatchTx()
 	tx.Lock()
@@ -402,11 +407,20 @@ func (s *store) restore() error {
 		// restore index
 		switch {
 		case isTombstone(key):
-			s.kvindex.Tombstone(kv.Key, rev)
+			if ki, ok := unordered[string(kv.Key)]; ok {
+				ki.tombstone(rev.main, rev.sub)
+			}
 			delete(keyToLease, string(kv.Key))
 
 		default:
-			s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
+			ki, ok := unordered[string(kv.Key)]
+			if ok {
+				ki.put(rev.main, rev.sub)
+			} else {
+				ki = &keyIndex{key: kv.Key}
+				ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
+				unordered[string(kv.Key)] = ki
+			}
 
 			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
 				keyToLease[string(kv.Key)] = lid
@@ -419,6 +433,11 @@ func (s *store) restore() error {
 		s.currentRev = rev
 	}
 
+	// restore the tree index from the unordered index.
+	for _, v := range unordered {
+		s.kvindex.Insert(v)
+	}
+
 	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
 	// the correct revision should be set to compaction revision in the case, not the largest revision
 	// we have seen.
@@ -444,7 +463,6 @@ func (s *store) restore() error {
 			scheduledCompact = 0
 		}
 	}
-
 	tx.Unlock()
 
 	if scheduledCompact != 0 {