
mvcc: chunk reads for restoring

Loading all keys at once would cause etcd to use twice as much
memory as it would need to serve the keys, causing RSS to spike on
boot. Instead, load the keys into the mvcc store in chunks, using
pipelining for some concurrency.

Fixes #7822
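
The pipelining here is a plain producer/consumer arrangement: the restoring goroutine ranges the backend one fixed-size chunk at a time and hands each chunk over an unbuffered channel to a single background goroutine that folds it into the tree index, so reading the next chunk overlaps with indexing the previous one. A minimal, self-contained sketch of that shape, assuming nothing from etcd (chunkSize, readChunk, and the map index below are illustrative stand-ins, not etcd APIs):

package main

import "fmt"

const chunkSize = 3 // stand-in for restoreChunkKeys (10000 in the patch)

// readChunk stands in for tx.UnsafeRange: it returns at most chunkSize
// items from a sorted source, starting at offset.
func readChunk(src []string, offset int) []string {
	if offset >= len(src) {
		return nil
	}
	end := offset + chunkSize
	if end > len(src) {
		end = len(src)
	}
	return src[offset:end]
}

func main() {
	src := []string{"a", "b", "c", "d", "e", "f", "g"}

	// Unbuffered channel: the sender blocks until the indexer takes the
	// chunk, so at most one chunk is in flight while the next read
	// overlaps with indexing of the current chunk.
	chunkc := make(chan []string)
	donec := make(chan struct{})

	index := make(map[string]bool) // stand-in for the tree index
	go func() {
		defer close(donec)
		for chunk := range chunkc {
			for _, k := range chunk {
				index[k] = true
			}
		}
	}()

	for offset := 0; ; offset += chunkSize {
		chunk := readChunk(src, offset)
		if len(chunk) == 0 {
			break
		}
		chunkc <- chunk
		if len(chunk) < chunkSize {
			// partial chunk implies the final chunk
			break
		}
	}
	close(chunkc)
	<-donec

	fmt.Println("indexed", len(index), "keys")
}

In the patch itself the next chunk does not start from an integer offset; it starts just past the revision of the last key returned (newMin.sub++), and a short read (fewer than restoreChunkKeys keys) marks the final chunk.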
Anthony Romano
commit 163fd2d76b
2 changed files with 83 additions and 61 deletions
  1. mvcc/kvstore.go (+79 -59)
  2. mvcc/kvstore_test.go (+4 -2)

mvcc/kvstore.go (+79 -59)

@@ -33,13 +33,6 @@ var (
 	keyBucketName  = []byte("key")
 	metaBucketName = []byte("meta")

-	// markedRevBytesLen is the byte length of marked revision.
-	// The first `revBytesLen` bytes represents a normal revision. The last
-	// one byte is the mark.
-	markedRevBytesLen      = revBytesLen + 1
-	markBytePosition       = markedRevBytesLen - 1
-	markTombstone     byte = 't'
-
 	consistentIndexKeyName  = []byte("consistent_index")
 	scheduledCompactKeyName = []byte("scheduledCompactRev")
 	finishedCompactKeyName  = []byte("finishedCompactRev")
@@ -52,6 +45,17 @@ var (
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
 )

+const (
+	// markedRevBytesLen is the byte length of marked revision.
+	// The first `revBytesLen` bytes represents a normal revision. The last
+	// one byte is the mark.
+	markedRevBytesLen      = revBytesLen + 1
+	markBytePosition       = markedRevBytesLen - 1
+	markTombstone     byte = 't'
+
+	restoreChunkKeys = 10000
+)
+
 // ConsistentIndexGetter is an interface that wraps the Get method.
 // Consistent index is the offset of an entry in a consistent replicated log.
 type ConsistentIndexGetter interface {
@@ -247,11 +251,6 @@ func (s *store) restore() error {

 	keyToLease := make(map[string]lease.LeaseID)

-	// use an unordered map to hold the temp index data to speed up
-	// the initial key index recovery.
-	// we will convert this unordered map into the tree index later.
-	unordered := make(map[string]*keyIndex, 100000)
-
 	// restore index
 	tx := s.b.BatchTx()
 	tx.Lock()
@@ -260,48 +259,41 @@ func (s *store) restore() error {
 		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
 		plog.Printf("restore compact to %d", s.compactMainRev)
 	}
+	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
+	scheduledCompact := int64(0)
+	if len(scheduledCompactBytes) != 0 {
+		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
+	}
 
-	// TODO: limit N to reduce max memory usage
-	keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
-	for i, key := range keys {
-		var kv mvccpb.KeyValue
-		if err := kv.Unmarshal(vals[i]); err != nil {
-			plog.Fatalf("cannot unmarshal event: %v", err)
-		}
-
-		rev := bytesToRev(key[:revBytesLen])
-		s.currentRev = rev.main
-
-		// restore index
-		switch {
-		case isTombstone(key):
-			if ki, ok := unordered[string(kv.Key)]; ok {
-				ki.tombstone(rev.main, rev.sub)
-			}
-			delete(keyToLease, string(kv.Key))
-
-		default:
-			ki, ok := unordered[string(kv.Key)]
-			if ok {
-				ki.put(rev.main, rev.sub)
-			} else {
-				ki = &keyIndex{key: kv.Key}
-				ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
-				unordered[string(kv.Key)] = ki
-			}
-
-			if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
-				keyToLease[string(kv.Key)] = lid
-			} else {
-				delete(keyToLease, string(kv.Key))
+	// index keys concurrently as they're loaded in from tx
+	unorderedc, donec := make(chan map[string]*keyIndex), make(chan struct{})
+	go func() {
+		defer close(donec)
+		for unordered := range unorderedc {
+			// restore the tree index from the unordered index.
+			for _, v := range unordered {
+				s.kvindex.Insert(v)
 			}
 		}
+	}()
+	for {
+		keys, vals := tx.UnsafeRange(keyBucketName, min, max, restoreChunkKeys)
+		if len(keys) == 0 {
+			break
+		}
+		// unbuffered so keys don't pile up in memory
+		unorderedc <- s.restoreChunk(keys, vals, keyToLease)
+		if len(keys) < restoreChunkKeys {
+			// partial set implies final set
+			break
+		}
+		// next set begins after where this one ended
+		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
+		newMin.sub++
+		revToBytes(newMin, min)
 	}
-
-	// restore the tree index from the unordered index.
-	for _, v := range unordered {
-		s.kvindex.Insert(v)
-	}
+	close(unorderedc)
+	<-donec

 	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
 	// the correct revision should be set to compaction revision in the case, not the largest revision
@@ -309,6 +301,9 @@ func (s *store) restore() error {
 	if s.currentRev < s.compactMainRev {
 		s.currentRev = s.compactMainRev
 	}
+	if scheduledCompact <= s.compactMainRev {
+		scheduledCompact = 0
+	}

 	for key, lid := range keyToLease {
 		if s.le == nil {
@@ -320,15 +315,6 @@ func (s *store) restore() error {
 		}
 	}

-	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
-	scheduledCompact := int64(0)
-	if len(scheduledCompactBytes) != 0 {
-		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
-		if scheduledCompact <= s.compactMainRev {
-			scheduledCompact = 0
-		}
-	}
-
 	tx.Unlock()

 	if scheduledCompact != 0 {
@@ -339,6 +325,40 @@ func (s *store) restore() error {
 	return nil
 }

+func (s *store) restoreChunk(keys, vals [][]byte, keyToLease map[string]lease.LeaseID) map[string]*keyIndex {
+	// assume half of keys are overwrites
+	unordered := make(map[string]*keyIndex, len(keys)/2)
+	for i, key := range keys {
+		var kv mvccpb.KeyValue
+		if err := kv.Unmarshal(vals[i]); err != nil {
+			plog.Fatalf("cannot unmarshal event: %v", err)
+		}
+		rev := bytesToRev(key[:revBytesLen])
+		s.currentRev = rev.main
+		kstr := string(kv.Key)
+		if isTombstone(key) {
+			if ki, ok := unordered[kstr]; ok {
+				ki.tombstone(rev.main, rev.sub)
+			}
+			delete(keyToLease, kstr)
+			continue
+		}
+		if ki, ok := unordered[kstr]; ok {
+			ki.put(rev.main, rev.sub)
+		} else {
+			ki = &keyIndex{key: kv.Key}
+			ki.restore(revision{kv.CreateRevision, 0}, rev, kv.Version)
+			unordered[kstr] = ki
+		}
+		if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
+			keyToLease[kstr] = lid
+		} else {
+			delete(keyToLease, kstr)
+		}
+	}
+	return unordered
+}
+
 func (s *store) Close() error {
 	close(s.stopc)
 	s.fifoSched.Stop()

mvcc/kvstore_test.go (+4 -2)

@@ -373,9 +373,11 @@ func TestStoreRestore(t *testing.T) {
 		t.Fatal(err)
 	}
 	b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
-	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
 	b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}

+	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
+	b.tx.rangeRespc <- rangeResp{nil, nil}
+
 	s.restore()

 	if s.compactMainRev != 3 {
@@ -386,8 +388,8 @@ func TestStoreRestore(t *testing.T) {
 	}
 	wact := []testutil.Action{
 		{"range", []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
-		{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}},
 		{"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
 		{"range", []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
+		{"range", []interface{}{keyBucketName, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
 	}
 	if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
 		t.Errorf("tx actions = %+v, want %+v", g, wact)