
Merge pull request #8333 from fanminshi/retrieve_keep_from_index

mvcc: fix TestHashKVWhenCompacting hash mismatch
Anthony Romano 8 years ago
parent
commit
585b1d7bdc
6 changed files with 126 additions and 55 deletions
  1. mvcc/index.go: +14 -0
  2. mvcc/index_test.go: +11 -5
  3. mvcc/key_index.go: +43 -18
  4. mvcc/key_index_test.go: +48 -4
  5. mvcc/kvstore.go: +5 -27
  6. mvcc/kvstore_test.go: +5 -1

+ 14 - 0
mvcc/index.go

@@ -28,6 +28,7 @@ type index interface {
 	Tombstone(key []byte, rev revision) error
 	RangeSince(key, end []byte, rev int64) []revision
 	Compact(rev int64) map[revision]struct{}
+	Keep(rev int64) map[revision]struct{}
 	Equal(b index) bool
 
 	Insert(ki *keyIndex)
@@ -179,6 +180,19 @@ func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
 	return available
 }
 
+// Keep finds all revisions to be kept for a Compaction at the given rev.
+func (ti *treeIndex) Keep(rev int64) map[revision]struct{} {
+	available := make(map[revision]struct{})
+	ti.RLock()
+	defer ti.RUnlock()
+	ti.tree.Ascend(func(i btree.Item) bool {
+		keyi := i.(*keyIndex)
+		keyi.keep(rev, available)
+		return true
+	})
+	return available
+}
+
 func compactIndex(rev int64, available map[revision]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
 	return func(i btree.Item) bool {
 		keyi := i.(*keyIndex)
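Note: the new Keep walks the tree the same way Compact does but leaves it untouched, so for a given rev both are expected to produce the same availability map (the index tests below assert exactly that). A minimal toy sketch of the relationship, using a hypothetical toyIndex type rather than the etcd implementation:

package main

import (
	"fmt"
	"reflect"
)

type revision struct{ main int64 }

// toyIndex maps a key to its ordered list of revisions (oldest first).
type toyIndex map[string][]revision

// Keep reports, for each key, the newest revision at or before atRev; that is
// the set a compaction at atRev must retain. It does not modify the index.
func (ti toyIndex) Keep(atRev int64) map[revision]struct{} {
	available := make(map[revision]struct{})
	for _, revs := range ti {
		for i := len(revs) - 1; i >= 0; i-- {
			if revs[i].main <= atRev {
				available[revs[i]] = struct{}{}
				break
			}
		}
	}
	return available
}

// Compact prunes revisions older than the kept one and must return the same
// set that Keep reports for the same atRev.
func (ti toyIndex) Compact(atRev int64) map[revision]struct{} {
	available := ti.Keep(atRev)
	for key, revs := range ti {
		for i := len(revs) - 1; i >= 0; i-- {
			if revs[i].main <= atRev {
				ti[key] = revs[i:]
				break
			}
		}
	}
	return available
}

func main() {
	ti := toyIndex{
		"foo": {{2}, {4}, {7}},
		"bar": {{3}, {9}},
	}
	keep := ti.Keep(5)
	am := ti.Compact(5)
	fmt.Println(reflect.DeepEqual(am, keep)) // true: Compact retains exactly what Keep reported
	fmt.Println(ti["foo"])                   // [{4} {7}]: older revisions pruned
}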

+ 11 - 5
mvcc/index_test.go

@@ -193,7 +193,7 @@ func TestIndexRangeSince(t *testing.T) {
 	}
 }
 
-func TestIndexCompact(t *testing.T) {
+func TestIndexCompactAndKeep(t *testing.T) {
 	maxRev := int64(20)
 	tests := []struct {
 		key     []byte
@@ -215,7 +215,7 @@ func TestIndexCompact(t *testing.T) {
 		{[]byte("foo1"), false, revision{10, 1}, revision{10, 1}, 1},
 	}
 
-	// Continuous Compact
+	// Continuous Compact and Keep
 	ti := newTreeIndex()
 	for _, tt := range tests {
 		if tt.remove {
@@ -226,7 +226,10 @@ func TestIndexCompact(t *testing.T) {
 	}
 	for i := int64(1); i < maxRev; i++ {
 		am := ti.Compact(i)
-
+		keep := ti.Keep(i)
+		if !(reflect.DeepEqual(am, keep)) {
+			t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep)
+		}
 		wti := &treeIndex{tree: btree.New(32)}
 		for _, tt := range tests {
 			if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) {
@@ -242,7 +245,7 @@ func TestIndexCompact(t *testing.T) {
 		}
 	}
 
-	// Once Compact
+	// Once Compact and Keep
 	for i := int64(1); i < maxRev; i++ {
 		ti := newTreeIndex()
 		for _, tt := range tests {
@@ -253,7 +256,10 @@ func TestIndexCompact(t *testing.T) {
 			}
 		}
 		am := ti.Compact(i)
-
+		keep := ti.Keep(i)
+		if !(reflect.DeepEqual(am, keep)) {
+			t.Errorf("#%d: compact keep %v != Keep keep %v", i, am, keep)
+		}
 		wti := &treeIndex{tree: btree.New(32)}
 		for _, tt := range tests {
 			if _, ok := am[tt.rev]; ok || tt.rev.GreaterThan(revision{main: i}) {

+ 43 - 18
mvcc/key_index.go

@@ -187,6 +187,42 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
 		plog.Panicf("store.keyindex: unexpected compact on empty keyIndex %s", string(ki.key))
 	}
 
+	genIdx, revIndex := ki.doCompact(atRev, available)
+
+	g := &ki.generations[genIdx]
+	if !g.isEmpty() {
+		// remove the previous contents.
+		if revIndex != -1 {
+			g.revs = g.revs[revIndex:]
+		}
+		// remove any tombstone
+		if len(g.revs) == 1 && genIdx != len(ki.generations)-1 {
+			delete(available, g.revs[0])
+			genIdx++
+		}
+	}
+
+	// remove the previous generations.
+	ki.generations = ki.generations[genIdx:]
+}
+
+// keep finds the revision to be kept if compact is called at given atRev.
+func (ki *keyIndex) keep(atRev int64, available map[revision]struct{}) {
+	if ki.isEmpty() {
+		return
+	}
+
+	genIdx, revIndex := ki.doCompact(atRev, available)
+	g := &ki.generations[genIdx]
+	if !g.isEmpty() {
+		// remove any tombstone
+		if revIndex == len(g.revs)-1 && genIdx != len(ki.generations)-1 {
+			delete(available, g.revs[revIndex])
+		}
+	}
+}
+
+func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
 	// walk until reaching the first revision that has an revision smaller or equal to
 	// the atRev.
 	// add it to the available map
@@ -198,30 +234,19 @@ func (ki *keyIndex) compact(atRev int64, available map[revision]struct{}) {
 		return true
 	}
 
-	i, g := 0, &ki.generations[0]
+	genIdx, g := 0, &ki.generations[0]
 	// find first generation includes atRev or created after atRev
-	for i < len(ki.generations)-1 {
+	for genIdx < len(ki.generations)-1 {
 		if tomb := g.revs[len(g.revs)-1].main; tomb > atRev {
 			break
 		}
-		i++
-		g = &ki.generations[i]
+		genIdx++
+		g = &ki.generations[genIdx]
 	}
 
-	if !g.isEmpty() {
-		n := g.walk(f)
-		// remove the previous contents.
-		if n != -1 {
-			g.revs = g.revs[n:]
-		}
-		// remove any tombstone
-		if len(g.revs) == 1 && i != len(ki.generations)-1 {
-			delete(available, g.revs[0])
-			i++
-		}
-	}
-	// remove the previous generations.
-	ki.generations = ki.generations[i:]
+	revIndex = g.walk(f)
+
+	return genIdx, revIndex
 }
 
 func (ki *keyIndex) isEmpty() bool {
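Note: compact checks len(g.revs) == 1 after slicing the generation, while keep, which must not slice, checks revIndex == len(g.revs)-1 against the untouched slice; the two tombstone conditions are equivalent. A small standalone sketch of that equivalence, with made-up values that are not part of the diff:

package main

import "fmt"

type revision struct{ main, sub int64 }

func main() {
	// A closed generation whose final entry (main=6) is the tombstone.
	revs := []revision{{main: 2}, {main: 4}, {main: 6}}
	revIndex := 2 // the index doCompact would report for atRev >= 6

	// compact slices the generation first, then checks len == 1 ...
	after := revs[revIndex:]
	fmt.Println(len(after) == 1) // true

	// ... keep must not mutate, so it checks the same condition on the
	// original slice instead.
	fmt.Println(revIndex == len(revs)-1) // true
}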

+ 48 - 4
mvcc/key_index_test.go

@@ -205,7 +205,7 @@ func TestKeyIndexTombstone(t *testing.T) {
 	}
 }
 
-func TestKeyIndexCompact(t *testing.T) {
+func TestKeyIndexCompactAndKeep(t *testing.T) {
 	tests := []struct {
 		compact int64
 
@@ -441,10 +441,19 @@ func TestKeyIndexCompact(t *testing.T) {
 		},
 	}
 
-	// Continuous Compaction
+	// Continuous Compaction and finding Keep
 	ki := newTestKeyIndex()
 	for i, tt := range tests {
 		am := make(map[revision]struct{})
+		kiclone := cloneKeyIndex(ki)
+		ki.keep(tt.compact, am)
+		if !reflect.DeepEqual(ki, kiclone) {
+			t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone)
+		}
+		if !reflect.DeepEqual(am, tt.wam) {
+			t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
+		}
+		am = make(map[revision]struct{})
 		ki.compact(tt.compact, am)
 		if !reflect.DeepEqual(ki, tt.wki) {
 			t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
@@ -454,11 +463,20 @@ func TestKeyIndexCompact(t *testing.T) {
 		}
 	}
 
-	// Jump Compaction
+	// Jump Compaction and finding Keep
 	ki = newTestKeyIndex()
 	for i, tt := range tests {
 		if (i%2 == 0 && i < 6) || (i%2 == 1 && i > 6) {
 			am := make(map[revision]struct{})
+			kiclone := cloneKeyIndex(ki)
+			ki.keep(tt.compact, am)
+			if !reflect.DeepEqual(ki, kiclone) {
+				t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiclone)
+			}
+			if !reflect.DeepEqual(am, tt.wam) {
+				t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
+			}
+			am = make(map[revision]struct{})
 			ki.compact(tt.compact, am)
 			if !reflect.DeepEqual(ki, tt.wki) {
 				t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
@@ -469,10 +487,19 @@ func TestKeyIndexCompact(t *testing.T) {
 		}
 	}
 
-	// Once Compaction
+	kiClone := newTestKeyIndex()
+	// Once Compaction and finding Keep
 	for i, tt := range tests {
 		ki := newTestKeyIndex()
 		am := make(map[revision]struct{})
+		ki.keep(tt.compact, am)
+		if !reflect.DeepEqual(ki, kiClone) {
+			t.Errorf("#%d: ki = %+v, want %+v", i, ki, kiClone)
+		}
+		if !reflect.DeepEqual(am, tt.wam) {
+			t.Errorf("#%d: am = %+v, want %+v", i, am, tt.wam)
+		}
+		am = make(map[revision]struct{})
 		ki.compact(tt.compact, am)
 		if !reflect.DeepEqual(ki, tt.wki) {
 			t.Errorf("#%d: ki = %+v, want %+v", i, ki, tt.wki)
@@ -483,6 +510,23 @@ func TestKeyIndexCompact(t *testing.T) {
 	}
 }
 
+func cloneKeyIndex(ki *keyIndex) *keyIndex {
+	generations := make([]generation, len(ki.generations))
+	for i, gen := range ki.generations {
+		generations[i] = *cloneGeneration(&gen)
+	}
+	return &keyIndex{ki.key, ki.modified, generations}
+}
+
+func cloneGeneration(g *generation) *generation {
+	if g.revs == nil {
+		return &generation{g.ver, g.created, nil}
+	}
+	tmp := make([]revision, len(g.revs))
+	copy(tmp, g.revs)
+	return &generation{g.ver, g.created, tmp}
+}
+
 // test that compact on version that higher than last modified version works well
 func TestKeyIndexCompactOnFurtherRev(t *testing.T) {
 	ki := &keyIndex{key: []byte("foo")}

+ 5 - 27
mvcc/kvstore.go

@@ -45,8 +45,6 @@ var (
 	ErrClosed    = errors.New("mvcc: closed")
 
 	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
-
-	emptyKeep = make(map[revision]struct{})
 )
 
 const (
@@ -101,12 +99,6 @@ type store struct {
 	fifoSched schedule.Scheduler
 
 	stopc chan struct{}
-
-	// keepMu protects keep
-	keepMu sync.RWMutex
-	// keep contains all revisions <= compactMainRev to be kept for the
-	// ongoing compaction; nil otherwise.
-	keep map[revision]struct{}
 }
 
 // NewStore returns a new store. It is useful to create a store inside
@@ -170,33 +162,25 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
 }
 
 func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
-	s.mu.Lock()
+	s.mu.RLock()
 	s.revMu.RLock()
 	compactRev, currentRev = s.compactMainRev, s.currentRev
 	s.revMu.RUnlock()
 
 	if rev > 0 && rev <= compactRev {
-		s.mu.Unlock()
+		s.mu.RUnlock()
 		return 0, 0, compactRev, ErrCompacted
 	} else if rev > 0 && rev > currentRev {
-		s.mu.Unlock()
+		s.mu.RUnlock()
 		return 0, currentRev, 0, ErrFutureRev
 	}
 
-	s.keepMu.Lock()
-	if s.keep == nil {
-		// ForceCommit ensures that txnRead begins after backend
-		// has committed all the changes from the prev completed compaction.
-		s.b.ForceCommit()
-		s.keep = emptyKeep
-	}
-	keep := s.keep
-	s.keepMu.Unlock()
+	keep := s.kvindex.Keep(rev)
 
 	tx := s.b.ReadTx()
 	tx.Lock()
 	defer tx.Unlock()
-	s.mu.Unlock()
+	s.mu.RUnlock()
 
 	if rev == 0 {
 		rev = currentRev
@@ -257,9 +241,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 	s.b.ForceCommit()
 
 	keep := s.kvindex.Compact(rev)
-	s.keepMu.Lock()
-	s.keep = keep
-	s.keepMu.Unlock()
 	ch := make(chan struct{})
 	var j = func(ctx context.Context) {
 		if ctx.Err() != nil {
@@ -271,9 +252,6 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {
 			return
 		}
 		close(ch)
-		s.keepMu.Lock()
-		s.keep = nil
-		s.keepMu.Unlock()
 	}
 
 	s.fifoSched.Schedule(j)
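Note: HashByRev now derives the keep set from the index at the compaction revision instead of reading a map cached by Compact, so hashing no longer depends on shared state mutated by an in-flight compaction. A simplified sketch of how such a keep set is consumed, with a hypothetical hashKVs helper and backendKV type rather than the etcd hashing code: revisions at or below compactRev that the index would drop are skipped, everything else is hashed.

package main

import (
	"fmt"
	"hash/crc32"
)

type revision struct{ main, sub int64 }

type backendKV struct {
	rev   revision
	value []byte
}

// hashKVs is a simplified stand-in for the hashing loop: kvs must be in the
// backend's stable key order; keep is what the index reports via Keep(compactRev).
func hashKVs(kvs []backendKV, rev, compactRev int64, keep map[revision]struct{}) uint32 {
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
	for _, kv := range kvs {
		if kv.rev.main > rev {
			continue // newer than the revision being hashed
		}
		if _, ok := keep[kv.rev]; kv.rev.main <= compactRev && !ok {
			continue // a compaction at compactRev drops this revision
		}
		h.Write(kv.value)
	}
	return h.Sum32()
}

func main() {
	keep := map[revision]struct{}{{main: 4}: {}}
	kvs := []backendKV{
		{revision{main: 2}, []byte("bar2")}, // dropped by a compaction at 4
		{revision{main: 4}, []byte("bar4")}, // latest revision <= 4, kept
		{revision{main: 7}, []byte("bar7")}, // above compactRev, always hashed
	}
	fmt.Printf("%08x\n", hashKVs(kvs, 7, 4, keep))
}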

+ 5 - 1
mvcc/kvstore_test.go

@@ -522,7 +522,7 @@ func TestHashKVWhenCompacting(t *testing.T) {
 	s := NewStore(b, &lease.FakeLessor{}, nil)
 	defer os.Remove(tmpPath)
 
-	rev := 1000
+	rev := 10000
 	for i := 2; i <= rev; i++ {
 		s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
 	}
@@ -767,6 +767,10 @@ func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
 	i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
 	return <-i.indexCompactRespc
 }
+func (i *fakeIndex) Keep(rev int64) map[revision]struct{} {
+	i.Recorder.Record(testutil.Action{Name: "keep", Params: []interface{}{rev}})
+	return <-i.indexCompactRespc
+}
 func (i *fakeIndex) Equal(b index) bool { return false }
 
 func (i *fakeIndex) Insert(ki *keyIndex) {