
storage: fix scheduled compaction bug in recovery process

Previously it resumed the scheduled compaction from the wrong reversion, decoding the finished-compaction bytes instead of the scheduled-compaction bytes.
Yicheng Qin · 10 years ago · commit 148394f66f
2 changed files with 42 additions and 3 deletions:
  1. storage/kvstore.go (+1 -1)
  2. storage/kvstore_test.go (+41 -2)
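
For context, here is a sketch of how these meta keys get written, inferred from the helpers used in this diff; the exact ordering inside Compact is an assumption. A compaction first persists its target reversion under scheduledCompactKeyName, runs in the background, and only records completion afterwards. If the process dies in between, Restore sees a scheduled reversion newer than the last finished one and must resume it.

	// Sketch (assumption): how Compact makes its target reversion durable
	// before doing any work, so a crash mid-compaction can be resumed later.
	rbytes := newRevBytes()
	revToBytes(reversion{main: rev}, rbytes) // rev: the reversion to compact at

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()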

+ 1 - 1
storage/kvstore.go

@@ -213,7 +213,7 @@ func (s *store) Restore() error {
 
 	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
 	if len(scheduledCompactBytes) != 0 {
-		scheduledCompact := bytesToRev(finishedCompactBytes[0]).main
+		scheduledCompact := bytesToRev(scheduledCompactBytes[0]).main
 		if scheduledCompact > s.compactMainRev {
 			log.Printf("storage: resume scheduled compaction at %d", scheduledCompact)
 			go s.Compact(scheduledCompact)
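
The fix is one line: the restore path decoded finishedCompactBytes (the reversion of the last completed compaction, read a few lines earlier) where it should have decoded scheduledCompactBytes. A hedged reconstruction of the surrounding logic follows; the finishedCompact handling and any names outside the hunk above are assumptions about the nearby code.

	// Reconstruction of the corrected Restore logic; lines outside the
	// hunk above are assumptions, not part of this commit.
	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
	}
	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	if len(scheduledCompactBytes) != 0 {
		// Previously this read finishedCompactBytes[0], so a compaction that
		// was scheduled but never finished resumed at the wrong reversion.
		scheduledCompact := bytesToRev(scheduledCompactBytes[0]).main
		if scheduledCompact > s.compactMainRev {
			log.Printf("storage: resume scheduled compaction at %d", scheduledCompact)
			go s.Compact(scheduledCompact)
		}
	}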

+ 41 - 2
storage/kvstore_test.go

@@ -7,6 +7,7 @@ import (
 	"os"
 	"reflect"
 	"testing"
+	"time"
 
 	"github.com/coreos/etcd/storage/storagepb"
 )
@@ -389,8 +390,6 @@ func TestCompaction(t *testing.T) {
 	}
 }
 
-// TODO: test more complicated cases:
-// with unfinished compaction
 func TestRestore(t *testing.T) {
 	s0 := newStore("test")
 	defer os.Remove("test")
@@ -433,6 +432,46 @@ func TestRestore(t *testing.T) {
 	}
 }
 
+func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
+	s0 := newStore("test")
+	defer os.Remove("test")
+
+	s0.Put([]byte("foo"), []byte("bar"))
+	s0.Put([]byte("foo"), []byte("bar1"))
+	s0.Put([]byte("foo"), []byte("bar2"))
+
+	// write a scheduled compaction, but do not run it
+	rbytes := newRevBytes()
+	revToBytes(reversion{main: 2}, rbytes)
+	tx := s0.b.BatchTx()
+	tx.Lock()
+	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
+	tx.Unlock()
+
+	s0.Close()
+
+	s1 := newStore("test")
+	s1.Restore()
+
+	// wait for the scheduled compaction to finish
+	time.Sleep(100 * time.Millisecond)
+
+	if _, _, err := s1.Range([]byte("foo"), nil, 0, 2); err != ErrCompacted {
+		t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
+	}
+	// check the key in backend is deleted
+	revbytes := newRevBytes()
+	// TODO: compact should delete main=2 key too
+	revToBytes(reversion{main: 1}, revbytes)
+	tx = s1.b.BatchTx()
+	tx.Lock()
+	ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+	if len(ks) != 0 {
+		t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
+	}
+	tx.Unlock()
+}
+
 func BenchmarkStorePut(b *testing.B) {
 	s := newStore("test")
 	defer os.Remove("test")
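
One note on the new test: it waits a fixed 100ms for the background compaction, which can flake on a slow machine. A sketch of a polling alternative (not part of this commit) using only what the test already imports:

	// Sketch (not part of this commit): poll for the compaction result
	// with a deadline instead of a single fixed sleep.
	deadline := time.Now().Add(5 * time.Second)
	for {
		if _, _, err := s1.Range([]byte("foo"), nil, 0, 2); err == ErrCompacted {
			break
		}
		if time.Now().After(deadline) {
			t.Fatal("scheduled compaction did not finish in time")
		}
		time.Sleep(10 * time.Millisecond)
	}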