
Merge pull request #7900 from heyitsanthony/chunk-restore

mvcc: chunk reads for restoring
Anthony Romano, 8 years ago
parent commit 343a018361
3 changed files with 89 additions and 62 deletions
  1. mvcc/kvstore.go (+79, -59)
  2. mvcc/kvstore_bench_test.go (+6, -1)
  3. mvcc/kvstore_test.go (+4, -2)
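
The diff below stops restore() from loading the entire key bucket with a single unbounded UnsafeRange call; it now reads at most restoreChunkKeys (10000) revisions per pass and hands each chunk over an unbuffered channel to a goroutine that builds the tree index, so memory stays bounded while indexing overlaps with reading. The following is a minimal standalone sketch of that chunk-and-pipeline pattern; the slice-backed rangeLimit helper and the chunkSize of 3 are stand-ins for illustration, not the etcd backend API.

package main

import "fmt"

// chunkSize plays the role of restoreChunkKeys (10000 in the patch);
// it is kept tiny here so the demo data spans several chunks.
const chunkSize = 3

// rangeLimit is a slice-backed stand-in for tx.UnsafeRange: it returns
// at most limit entries starting at offset start.
func rangeLimit(data []string, start, limit int) []string {
	if start >= len(data) {
		return nil
	}
	end := start + limit
	if end > len(data) {
		end = len(data)
	}
	return data[start:end]
}

func main() {
	data := []string{"a", "b", "c", "d", "e", "f", "g"}

	// consumer: index chunks concurrently, as the patched restore() does
	// with its unorderedc/donec pair
	chunks, done := make(chan []string), make(chan struct{})
	index := make(map[string]bool)
	go func() {
		defer close(done)
		for chunk := range chunks {
			for _, k := range chunk {
				index[k] = true
			}
		}
	}()

	// producer: read one bounded chunk at a time instead of the whole keyspace
	start := 0
	for {
		chunk := rangeLimit(data, start, chunkSize)
		if len(chunk) == 0 {
			break
		}
		chunks <- chunk // unbuffered, so chunks don't pile up in memory
		if len(chunk) < chunkSize {
			break // a partial chunk implies the final chunk
		}
		start += len(chunk) // next chunk begins after where this one ended
	}
	close(chunks)
	<-done

	fmt.Println("indexed", len(index), "keys")
}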

mvcc/kvstore.go (+79, -59)

@@ -33,13 +33,6 @@ var (
 	keyBucketName  = []byte("key")
 	metaBucketName = []byte("meta")
 
-	// markedRevBytesLen is the byte length of marked revision.
-	// The first `revBytesLen` bytes represents a normal revision. The last
-	// one byte is the mark.
-	markedRevBytesLen      = revBytesLen + 1
-	markBytePosition       = markedRevBytesLen - 1
-	markTombstone     byte = 't'
-
 	consistentIndexKeyName  = []byte("consistent_index")
 	scheduledCompactKeyName = []byte("scheduledCompactRev")
 	finishedCompactKeyName  = []byte("finishedCompactRev")
@@ -52,6 +45,17 @@ var (
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
 )
 
+const (
+	// markedRevBytesLen is the byte length of marked revision.
+	// The first `revBytesLen` bytes represents a normal revision. The last
+	// one byte is the mark.
+	markedRevBytesLen      = revBytesLen + 1
+	markBytePosition       = markedRevBytesLen - 1
+	markTombstone     byte = 't'
+
+	restoreChunkKeys = 10000
+)
+
 // ConsistentIndexGetter is an interface that wraps the Get method.
 // Consistent index is the offset of an entry in a consistent replicated log.
 type ConsistentIndexGetter interface {
@@ -247,11 +251,6 @@ func (s *store) restore() error {
 
 	keyToLease := make(map[string]lease.LeaseID)
 
-	// use an unordered map to hold the temp index data to speed up
-	// the initial key index recovery.
-	// we will convert this unordered map into the tree index later.
-	unordered := make(map[string]*keyIndex, 100000)
-
 	// restore index
 	tx := s.b.BatchTx()
 	tx.Lock()
@@ -260,48 +259,41 @@ func (s *store) restore() error {
 		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
 		plog.Printf("restore compact to %d", s.compactMainRev)
 	}
+	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
+	scheduledCompact := int64(0)
+	if len(scheduledCompactBytes) != 0 {
+		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
+	}
 
-	// TODO: limit N to reduce max memory usage
-	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
-	for i, key := range keys {
-		var kv mvccpb.KeyValue
-		if err := kv.Unmarshal(vals[i]); err != nil {
-			plog.Fatalf("cannot unmarshal event: %v", err)
-		}
-
-		rev := bytesToRev(key[:revBytesLen])
-		s.currentRev = rev.main
-
-		// restore index
-		switch {
-		case isTombstone(key):
-			if ki, ok := unordered[string(kv.Key)]; ok {
-				ki.tombstone(rev.main, rev.sub)
-			}
-			delete(keyToLease, string(kv.Key))
-
-		default:
-			ki, ok := unordered[string(kv.Key)]
-			if ok {
-				ki.put(rev.main, rev.sub)
-			} else {
-				ki = &keyIndex{key: kv.Key}
-				ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
-				unordered[string(kv.Key)] = ki
-			}
-
-			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
-				keyToLease[string(kv.Key)] = lid
-			} else {
-				delete(keyToLease, string(kv.Key))
+	// index keys concurrently as they're loaded in from tx
+	unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{})
+	go func() {
+		defer close(donec)
+		for unordered := range unorderedc {
+			// restore the tree index from the unordered index.
+			for _, v := range unordered {
+				s.kvindex.Insert(v)
 			}
 		}
+	}()
+	for {
+		keys, vals := tx.UnsafeRange(keyBucketName, min, max, restoreChunkKeys)
+		if len(keys) == 0 {
+			break
+		}
+		// unbuffered so keys don't pile up in memory
+		unorderedc <- s.restoreChunk(keys, vals, keyToLease)
+		if len(keys) < restoreChunkKeys {
+			// partial set implies final set
+			break
+		}
+		// next set begins after where this one ended
+		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
+		newMin.sub++
+		revToBytes(newMin, min)
 	}
-
-	// restore the tree index from the unordered index.
-	for _, v := range unordered {
-		s.kvindex.Insert(v)
-	}
+	close(unorderedc)
+	<-donec
 
 	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
 	// the correct revision should be set to compaction revision in the case, not the largest revision
@@ -309,6 +301,9 @@ func (s *store) restore() error {
 	if s.currentRev < s.compactMainRev {
 		s.currentRev = s.compactMainRev
 	}
+	if scheduledCompact <= s.compactMainRev {
+		scheduledCompact = 0
+	}
 
 	for key, lid := range keyToLease {
 		if s.le == nil {
@@ -320,15 +315,6 @@ func (s *store) restore() error {
 		}
 	}
 
-	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
-	scheduledCompact := int64(0)
-	if len(scheduledCompactBytes) != 0 {
-		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
-		if scheduledCompact <= s.compactMainRev {
-			scheduledCompact = 0
-		}
-	}
-
 	tx.Unlock()
 
 	if scheduledCompact != 0 {
@@ -339,6 +325,40 @@ func (s *store) restore() error {
 	return nil
 }
 
+func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex {
+	// assume half of keys are overwrites
+	unordered := make(map[string]*keyIndex, len(keys)/2)
+	for i, key := range keys {
+		var kv mvccpb.KeyValue
+		if err := kv.Unmarshal(vals[i]); err != nil {
+			plog.Fatalf("cannot unmarshal event: %v", err)
+		}
+		rev := bytesToRev(key[:revBytesLen])
+		s.currentRev = rev.main
+		kstr := string(kv.Key)
+		if isTombstone(key) {
+			if ki, ok := unordered[kstr]; ok {
+				ki.tombstone(rev.main, rev.sub)
+			}
+			delete(keyToLease, kstr)
+			continue
+		}
+		if ki, ok := unordered[kstr]; ok {
+			ki.put(rev.main, rev.sub)
+		} else {
+			ki = &keyIndex{key: kv.Key}
+			ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
+			unordered[kstr] = ki
+		}
+		if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
+			keyToLease[kstr] = lid
+		} else {
+			delete(keyToLease, kstr)
+		}
+	}
+	return unordered
+}
+
 func (s *store) Close() error {
 	close(s.stopc)
 	s.fifoSched.Stop()
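
One detail worth noting in the restore loop above: after a full chunk, the lower bound for the next UnsafeRange call is rebuilt from the last key returned, with its sub revision bumped by one so the same revision is never read twice. A standalone sketch of that bump follows; the flat 16-byte big-endian (main, sub) encoding is a simplification for illustration, not etcd's exact revision layout.

package main

import (
	"encoding/binary"
	"fmt"
)

// revision mirrors the (main, sub) pair used by mvcc. The 16-byte
// big-endian encoding below is an assumption for illustration only.
type revision struct{ main, sub int64 }

func revToBytes(r revision, b []byte) {
	binary.BigEndian.PutUint64(b[0:8], uint64(r.main))
	binary.BigEndian.PutUint64(b[8:16], uint64(r.sub))
}

func bytesToRev(b []byte) revision {
	return revision{
		main: int64(binary.BigEndian.Uint64(b[0:8])),
		sub:  int64(binary.BigEndian.Uint64(b[8:16])),
	}
}

func main() {
	// pretend this was the last key returned by the previous chunk
	lastKey := make([]byte, 16)
	revToBytes(revision{main: 42, sub: 7}, lastKey)

	// next chunk begins just after where this one ended
	min := make([]byte, 16)
	newMin := bytesToRev(lastKey)
	newMin.sub++
	revToBytes(newMin, min)

	fmt.Printf("next min revision: %+v\n", bytesToRev(min)) // {main:42 sub:8}
}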

mvcc/kvstore_bench_test.go (+6, -1)

@@ -89,7 +89,8 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 	var i fakeConsistentIndex
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	s := NewStore(be, &lease.FakeLessor{}, &i)
-	defer cleanup(s, be, tmpPath)
+	// use closure to capture 's' to pick up the reassignment
+	defer func() { cleanup(s, be, tmpPath) }()
 
 	// arbitrary number of bytes
 	bytesN := 64
@@ -103,7 +104,11 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) {
 			txn.End()
 		}
 	}
+	s.Close()
+
+	b.ReportAllocs()
 	b.ResetTimer()
+	s = NewStore(be, &lease.FakeLessor{}, &i)
 }
 
 func BenchmarkStoreRestoreRevs1(b *testing.B) {
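
The benchmark tweak above relies on a Go detail: arguments to a plain deferred call are evaluated when the defer statement executes, so `defer cleanup(s, be, tmpPath)` would clean up the original store rather than the one reassigned after ResetTimer. Wrapping the call in a closure defers the read of s until the function returns. A tiny illustration, separate from the etcd code itself:

package main

import "fmt"

func main() {
	s := "old store"
	// the argument is evaluated here, so this prints "old store" on return
	defer fmt.Println("direct defer: ", s)
	// the closure reads s only when it runs, so this prints "new store"
	defer func() { fmt.Println("closure defer:", s) }()
	s = "new store"
}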

mvcc/kvstore_test.go (+4, -2)

@@ -373,9 +373,11 @@ func TestStoreRestore(t *testing.T) {
 		t.Fatal(err)
 	}
 	b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
-	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
 	b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
 
+	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
+	b.tx.rangeRespc <- rangeResp{nil, nil}
+
 	s.restore()
 
 	if s.compactMainRev != 3 {
@@ -386,8 +388,8 @@ func TestStoreRestore(t *testing.T) {
 	}
 	wact := []testutil.Action{
 		{"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
-		{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}},
 		{"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
+		{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
 	}
 	if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 		t.Errorf("tx actions = %+v, want %+v", g, wact)