Merge pull request #6846 from sinsharat/mvcc_store_restore_timeout_fix

mvcc: store.restore taking too long triggering snapshot cycle fix
Xiang Li · 9 years ago · commit 68b04b7067
4 changed files with 76 additions and 4 deletions
  1. mvcc/index.go (+7 -0)
  2. mvcc/kvstore.go (+21 -2)
  3. mvcc/kvstore_bench_test.go (+37 -0)
  4. mvcc/kvstore_test.go (+11 -2)

+ 7 - 0
mvcc/index.go

@@ -30,6 +30,7 @@ type index interface {
	RangeSince(key, end []byte, rev int64) []revision
	Compact(rev int64) map[revision]struct{}
	Equal(b index) bool
+	Insert(ki *keyIndex)
}

type treeIndex struct {
@@ -215,3 +216,9 @@ func (a *treeIndex) Equal(bi index) bool {

	return equal
}
+
+func (ti *treeIndex) Insert(ki *keyIndex) {
+	ti.Lock()
+	defer ti.Unlock()
+	ti.tree.ReplaceOrInsert(ki)
+}
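
The new Insert method is a thin, mutex-guarded wrapper around the B-tree's ReplaceOrInsert, letting restore hand over a fully built keyIndex in a single tree operation. Below is a minimal standalone sketch of the same pattern against google/btree, with a hypothetical item type standing in for etcd's keyIndex (which satisfies btree.Item the same way):

// illustrative sketch only; "item" and "index" are hypothetical
// stand-ins for etcd's keyIndex and treeIndex.
package main

import (
	"fmt"
	"sync"

	"github.com/google/btree"
)

type item struct{ key string }

// Less orders items by key, satisfying the btree.Item interface.
func (a *item) Less(b btree.Item) bool { return a.key < b.(*item).key }

type index struct {
	sync.Mutex
	tree *btree.BTree
}

// Insert adds (or replaces) a prebuilt item, holding the lock
// only for the duration of the single tree operation.
func (ix *index) Insert(it *item) {
	ix.Lock()
	defer ix.Unlock()
	ix.tree.ReplaceOrInsert(it)
}

func main() {
	ix := &index{tree: btree.New(32)}
	ix.Insert(&item{key: "foo"})
	ix.Insert(&item{key: "foo"}) // replaces, does not duplicate
	fmt.Println(ix.tree.Len())   // 1
}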

+ 21 - 2
mvcc/kvstore.go

@@ -380,6 +380,11 @@ func (s *store) restore() error {

	keyToLease := make(map[string]lease.LeaseID)

+	// use an unordered map to hold the temp index data to speed up
+	// the initial key index recovery.
+	// we will convert this unordered map into the tree index later.
+	unordered := make(map[string]*keyIndex, 100000)
+
	// restore index
	tx := s.b.BatchTx()
	tx.Lock()
@@ -402,11 +407,20 @@ func (s *store) restore() error {
		// restore index
		switch {
		case isTombstone(key):
-			s.kvindex.Tombstone(kv.Key, rev)
+			if ki, ok := unordered[string(kv.Key)]; ok {
+				ki.tombstone(rev.main, rev.sub)
+			}
			delete(keyToLease, string(kv.Key))

		default:
-			s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
+			ki, ok := unordered[string(kv.Key)]
+			if ok {
+				ki.put(rev.main, rev.sub)
+			} else {
+				ki = &keyIndex{key: kv.Key}
+				ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
+				unordered[string(kv.Key)] = ki
+			}

			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
				keyToLease[string(kv.Key)] = lid
@@ -419,6 +433,11 @@ func (s *store) restore() error {
		s.currentRev = rev
	}

+	// restore the tree index from the unordered index.
+	for _, v := range unordered {
+		s.kvindex.Insert(v)
+	}
+
	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
	// the correct revision should be set to compaction revision in the case, not the largest revision
	// we have seen.
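
Taken together, the kvstore.go change turns one ordered-index operation per revision into one map operation per revision plus one tree insert per key. A simplified sketch of that shape, assuming a hypothetical entry type and a plain sorted slice in place of etcd's keyIndex and treeIndex:

package main

import (
	"fmt"
	"sort"
)

// entry is a hypothetical stand-in for keyIndex: it accumulates all
// revisions seen for one key during the restore scan.
type entry struct {
	key  string
	revs []int64
}

// orderedIndex stands in for the tree index; Insert is the expensive
// ordered operation we want to pay once per key, not once per revision.
type orderedIndex struct{ keys []string }

func (ix *orderedIndex) Insert(e *entry) {
	i := sort.SearchStrings(ix.keys, e.key)
	ix.keys = append(ix.keys, "")
	copy(ix.keys[i+1:], ix.keys[i:])
	ix.keys[i] = e.key
}

type kv struct {
	key string
	rev int64
}

func restore(kvs []kv, ix *orderedIndex) {
	// accumulate per-key state with an O(1) map lookup per revision
	unordered := make(map[string]*entry, len(kvs))
	for _, x := range kvs {
		if e, ok := unordered[x.key]; ok {
			e.revs = append(e.revs, x.rev)
		} else {
			unordered[x.key] = &entry{key: x.key, revs: []int64{x.rev}}
		}
	}
	// convert the unordered map into the ordered index, once per key
	for _, e := range unordered {
		ix.Insert(e)
	}
}

func main() {
	ix := &orderedIndex{}
	restore([]kv{{"foo", 2}, {"bar", 3}, {"foo", 4}}, ix)
	fmt.Println(ix.keys) // [bar foo]
}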

+ 37 - 0
mvcc/kvstore_bench_test.go

@@ -85,3 +85,40 @@ func BenchmarkStoreTxnPut(b *testing.B) {
		s.TxnEnd(id)
	}
}
+
+// benchmarkStoreRestore benchmarks the restore operation
+func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
+	var i fakeConsistentIndex
+	be, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(be, &lease.FakeLessor{}, &i)
+	defer cleanup(s, be, tmpPath)
+
+	// arbitrary number of bytes
+	bytesN := 64
+	keys := createBytesSlice(bytesN, b.N)
+	vals := createBytesSlice(bytesN, b.N)
+
+	for i := 0; i < b.N; i++ {
+		for j := 0; j < revsPerKey; j++ {
+			id := s.TxnBegin()
+			if _, err := s.TxnPut(id, keys[i], vals[i], lease.NoLease); err != nil {
+				plog.Fatalf("txn put error: %v", err)
+			}
+			s.TxnEnd(id)
+		}
+	}
+	b.ResetTimer()
+	s = NewStore(be, &lease.FakeLessor{}, &i)
+}
+
+func BenchmarkStoreRestoreRevs1(b *testing.B) {
+	benchmarkStoreRestore(1, b)
+}
+
+func BenchmarkStoreRestoreRevs10(b *testing.B) {
+	benchmarkStoreRestore(10, b)
+}
+
+func BenchmarkStoreRestoreRevs20(b *testing.B) {
+	benchmarkStoreRestore(20, b)
+}
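
The three variants vary revisions per key because restore cost scales with the total number of revisions, not just the number of distinct keys. With the standard Go tooling, the benchmarks could be run with something like:

go test ./mvcc -run NONE -bench BenchmarkStoreRestoreRevs -benchmem

Here -run NONE skips the unit tests so only the benchmarks execute, and -benchmem reports allocation statistics, which is relevant since restore now builds a large temporary map.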

+ 11 - 2
mvcc/kvstore_test.go

@@ -405,9 +405,14 @@ func TestStoreRestore(t *testing.T) {
	if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
		t.Errorf("tx actions = %+v, want %+v", g, wact)
	}
+
+	gens := []generation{
+		{created: revision{4, 0}, ver: 2, revs: []revision{{3, 0}, {5, 0}}},
+		{created: revision{0, 0}, ver: 0, revs: nil},
+	}
+	ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
	wact = []testutil.Action{
-		{"restore", []interface{}{[]byte("foo"), revision{4, 0}, revision{3, 0}, int64(1)}},
-		{"tombstone", []interface{}{[]byte("foo"), revision{5, 0}}},
+		{"insert", []interface{}{ki}},
	}
	if g := fi.Action(); !reflect.DeepEqual(g, wact) {
		t.Errorf("index action = %+v, want %+v", g, wact)
@@ -668,6 +673,10 @@ func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
}
func (i *fakeIndex) Equal(b index) bool { return false }

+func (i *fakeIndex) Insert(ki *keyIndex) {
+	i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}})
+}
+
func createBytesSlice(bytesN, sliceN int) [][]byte {
	rs := [][]byte{}
	for len(rs) != sliceN {