Browse Source

mvcc: test restore and deletes with small chunk sizes

Anthony Romano 8 years ago
parent
commit
02164874d9
2 changed files with 55 additions and 3 deletions
  1. 3 3
      mvcc/kvstore.go
  2. 52 0
      mvcc/kvstore_test.go

+ 3 - 3
mvcc/kvstore.go

@@ -53,10 +53,10 @@ const (
 	markedRevBytesLen      = revBytesLen + 1
 	markBytePosition       = markedRevBytesLen - 1
 	markTombstone     byte = 't'
-
-	restoreChunkKeys = 10000
 )
 
+var restoreChunkKeys = 10000 // non-const for testing
+
 // ConsistentIndexGetter is an interface that wraps the Get method.
 // Consistent index is the offset of an entry in a consistent replicated log.
 type ConsistentIndexGetter interface {
@@ -286,7 +286,7 @@ func (s *store) restore() error {
 		}
 	}()
 	for {
-		keys, vals := tx.UnsafeRange(keyBucketName, min, max, restoreChunkKeys)
+		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
 		if len(keys) == 0 {
 			break
 		}

+ 52 - 0
mvcc/kvstore_test.go

@@ -17,7 +17,9 @@ package mvcc
 import (
 	"crypto/rand"
 	"encoding/binary"
+	"fmt"
 	"math"
+	mrand "math/rand"
 	"os"
 	"reflect"
 	"testing"
@@ -408,6 +410,56 @@ func TestStoreRestore(t *testing.T) {
 	}
 }
 
+func TestRestoreDelete(t *testing.T) {
+	oldChunk := restoreChunkKeys
+	restoreChunkKeys = mrand.Intn(3) + 2
+	defer func() { restoreChunkKeys = oldChunk }()
+
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b, &lease.FakeLessor{}, nil)
+	defer os.Remove(tmpPath)
+
+	keys := make(map[string]struct{})
+	for i := 0; i < 20; i++ {
+		ks := fmt.Sprintf("foo-%d", i)
+		k := []byte(ks)
+		s.Put(k, []byte("bar"), lease.NoLease)
+		keys[ks] = struct{}{}
+		switch mrand.Intn(3) {
+		case 0:
+			// put random key from past via random range on map
+			ks = fmt.Sprintf("foo-%d", mrand.Intn(i+1))
+			s.Put([]byte(ks), []byte("baz"), lease.NoLease)
+			keys[ks] = struct{}{}
+		case 1:
+			// delete random key via random range on map
+			for k := range keys {
+				s.DeleteRange([]byte(k), nil)
+				delete(keys, k)
+				break
+			}
+		}
+	}
+	s.Close()
+
+	s = NewStore(b, &lease.FakeLessor{}, nil)
+	defer s.Close()
+	for i := 0; i < 20; i++ {
+		ks := fmt.Sprintf("foo-%d", i)
+		r, err := s.Range([]byte(ks), nil, RangeOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		if _, ok := keys[ks]; ok {
+			if len(r.KVs) == 0 {
+				t.Errorf("#%d: expected %q, got deleted", i, ks)
+			}
+		} else if len(r.KVs) != 0 {
+			t.Errorf("#%d: expected deleted, got %q", i, ks)
+		}
+	}
+}
+
 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
 	b, tmpPath := backend.NewDefaultTmpBackend()
 	s0 := NewStore(b, &lease.FakeLessor{}, nil)