Browse Source

Merge pull request #6633 from xiang90/fix_rev_inconsistency

mvcc: fix rev inconsistency
Xiang Li 9 years ago
parent
commit
a97866b629
2 changed files with 47 additions and 0 deletions
  1. 7 0
      mvcc/kvstore.go
  2. 40 0
      mvcc/kvstore_compaction_test.go

+ 7 - 0
mvcc/kvstore.go

@@ -415,6 +415,13 @@ func (s *store) restore() error {
 		s.currentRev = rev
 		s.currentRev = rev
 	}
 	}
 
 
+	// keys in the range [compacted revision -N, compaction] might all be deleted due to compaction.
+	// the correct revision should be set to compaction revision in the case, not the largest revision
+	// we have seen.
+	if s.currentRev.main < s.compactMainRev {
+		s.currentRev.main = s.compactMainRev
+	}
+
 	for key, lid := range keyToLease {
 	for key, lid := range keyToLease {
 		if s.le == nil {
 		if s.le == nil {
 			panic("no lessor to attach lease")
 			panic("no lessor to attach lease")

+ 40 - 0
mvcc/kvstore_compaction_test.go

@@ -15,8 +15,10 @@
 package mvcc
 package mvcc
 
 
 import (
 import (
+	"os"
 	"reflect"
 	"reflect"
 	"testing"
 	"testing"
+	"time"
 
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/mvcc/backend"
 	"github.com/coreos/etcd/mvcc/backend"
@@ -93,3 +95,41 @@ func TestScheduleCompaction(t *testing.T) {
 		cleanup(s, b, tmpPath)
 		cleanup(s, b, tmpPath)
 	}
 	}
 }
 }
+
+func TestCompactAllAndRestore(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s0 := NewStore(b, &lease.FakeLessor{}, nil)
+	defer os.Remove(tmpPath)
+
+	s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
+	s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
+	s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
+	s0.DeleteRange([]byte("foo"), nil)
+
+	rev := s0.Rev()
+	// compact all keys
+	done, err := s0.Compact(rev)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case <-done:
+	case <-time.After(10 * time.Second):
+		t.Fatal("timeout waiting for compaction to finish")
+	}
+
+	err = s0.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	s1 := NewStore(b, &lease.FakeLessor{}, nil)
+	if s1.Rev() != rev {
+		t.Errorf("rev = %v, want %v", s1.Rev(), rev)
+	}
+	_, err = s1.Range([]byte("foo"), nil, RangeOptions{})
+	if err != nil {
+		t.Errorf("unexpect range error %v", err)
+	}
+}