Browse Source

mvcc: fetch revisions with current revision, not 0, in HashByRev

It was getting revisions with "atRev==0", which makes
"available" from "keep" method always empty since
"walk" on "keyIndex" only returns true.

"available" should be populated with all revisions to be
kept if the compaction happens with the given revision.
But, "available" was being empty when "kvindex.Keep(0)"
since it's always the case that "rev.main > atRev==0".

Fix https://github.com/coreos/etcd/issues/9022.

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 8 years ago
parent
commit
2e95ace82b
2 changed files with 32 additions and 4 deletions
  1. 3 4
      mvcc/kvstore.go
  2. 29 0
      mvcc/kvstore_test.go

+ 3 - 4
mvcc/kvstore.go

@@ -175,6 +175,9 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
 		return 0, currentRev, 0, ErrFutureRev
 	}
 
+	if rev == 0 {
+		rev = currentRev
+	}
 	keep := s.kvindex.Keep(rev)
 
 	tx := s.b.ReadTx()
@@ -182,10 +185,6 @@ func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev
 	defer tx.Unlock()
 	s.mu.RUnlock()
 
-	if rev == 0 {
-		rev = currentRev
-	}
-
 	upper := revision{main: rev + 1}
 	lower := revision{main: compactRev + 1}
 	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

+ 29 - 0
mvcc/kvstore_test.go

@@ -583,6 +583,35 @@ func TestHashKVWhenCompacting(t *testing.T) {
 	}
 }
 
+// TestHashKVZeroRevision ensures that "HashByRev(0)" computes
+// correct hash value with latest revision.
+func TestHashKVZeroRevision(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := NewStore(b, &lease.FakeLessor{}, nil)
+	defer os.Remove(tmpPath)
+
+	rev := 1000
+	for i := 2; i <= rev; i++ {
+		s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
+	}
+	if _, err := s.Compact(int64(rev / 2)); err != nil {
+		t.Fatal(err)
+	}
+
+	hash1, _, _, err := s.HashByRev(int64(rev))
+	if err != nil {
+		t.Fatal(err)
+	}
+	var hash2 uint32
+	hash2, _, _, err = s.HashByRev(0)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if hash1 != hash2 {
+		t.Errorf("hash %d (rev %d) != hash %d (rev 0)", hash1, rev, hash2)
+	}
+}
+
 func TestTxnPut(t *testing.T) {
 	// assign arbitrary size
 	bytesN := 30