
Merge pull request #3003 from yichengq/storage-kvstore-test

storage: add restore test and fix some bug
Yicheng Qin, 10 years ago
commit 789e2f3426
4 changed files with 80 additions and 5 deletions:
  1. storage/kv_test.go (+3 -0)
  2. storage/kvstore.go (+9 -1)
  3. storage/kvstore_compaction.go (+6 -1)
  4. storage/kvstore_test.go (+62 -3)

storage/kv_test.go (+3 -0)

@@ -44,6 +44,9 @@ func TestWorkflow(t *testing.T) {
 		if err != nil {
 			t.Errorf("#%d: range error (%v)", err)
 		}
+		if len(kvs) != len(wkvs) {
+			t.Fatalf("#%d: len(kvs) = %d, want %d", i, len(kvs), len(wkvs))
+		}
 		for j, kv := range kvs {
 			if !reflect.DeepEqual(kv.Key, wkvs[j].k) {
 				t.Errorf("#%d: keys[%d] = %s, want %s", i, j, kv.Key, wkvs[j].k)

storage/kvstore.go (+9 -1)

@@ -39,6 +39,9 @@ type store struct {
 
 	tmu   sync.Mutex // protect the tnxID field
 	tnxID int64      // tracks the current tnxID to verify tnx operations
+
+	wg    sync.WaitGroup
+	stopc chan struct{}
 }
 
 func newStore(path string) *store {
@@ -47,6 +50,7 @@ func newStore(path string) *store {
 		kvindex:        newTreeIndex(),
 		currentRev:     reversion{},
 		compactMainRev: -1,
+		stopc:          make(chan struct{}),
 	}
 
 	tx := s.b.BatchTx()
@@ -161,6 +165,7 @@ func (s *store) Compact(rev int64) error {
 
 	keep := s.kvindex.Compact(rev)
 
+	s.wg.Add(1)
 	go s.scheduleCompaction(rev, keep)
 	return nil
 }
@@ -213,7 +218,7 @@ func (s *store) Restore() error {
 
 	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
 	if len(scheduledCompactBytes) != 0 {
-		scheduledCompact := bytesToRev(finishedCompactBytes[0]).main
+		scheduledCompact := bytesToRev(scheduledCompactBytes[0]).main
 		if scheduledCompact > s.compactMainRev {
 			log.Printf("storage: resume scheduled compaction at %d", scheduledCompact)
 			go s.Compact(scheduledCompact)
@@ -226,6 +231,9 @@ func (s *store) Restore() error {
 }
 
 func (s *store) Close() error {
+	close(s.stopc)
+	s.wg.Wait()
+	s.b.ForceCommit()
 	return s.b.Close()
 }
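
The kvstore.go changes above give the store a stop channel and a WaitGroup so that Close can interrupt an in-flight compaction, wait for it to exit, and flush the backend before closing it. A minimal, runnable sketch of that coordination pattern, using a generic background worker instead of the real store (only the wg and stopc field names come from the diff; everything else is illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

type worker struct {
	wg    sync.WaitGroup
	stopc chan struct{}
}

func newWorker() *worker {
	return &worker{stopc: make(chan struct{})}
}

// start mirrors Compact: register with the WaitGroup before spawning the
// goroutine, so a concurrent stop can never miss it.
func (w *worker) start() {
	w.wg.Add(1)
	go w.run()
}

func (w *worker) run() {
	defer w.wg.Done()
	for {
		select {
		case <-time.After(100 * time.Millisecond):
			fmt.Println("one batch of background work")
		case <-w.stopc:
			return // shut down instead of finishing the remaining batches
		}
	}
}

// stop mirrors Close: signal, then wait for the goroutine to exit before
// releasing shared resources (the backend, in the real store).
func (w *worker) stop() {
	close(w.stopc)
	w.wg.Wait()
}

func main() {
	w := newWorker()
	w.start()
	time.Sleep(250 * time.Millisecond)
	w.stop()
	fmt.Println("stopped cleanly")
}

The ordering is what matters, and it mirrors the diff: wg.Add(1) happens before the goroutine is spawned, and the stop channel is closed before wg.Wait(), so shutdown cannot race with a compaction it does not know about.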

storage/kvstore_compaction.go (+6 -1)

@@ -6,6 +6,7 @@ import (
 )
 
 func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]struct{}) {
+	defer s.wg.Done()
 	end := make([]byte, 8)
 	binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
 
@@ -37,6 +38,10 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]stru
 		revToBytes(reversion{main: rev.main, sub: rev.sub + 1}, last)
 		tx.Unlock()
 
-		time.Sleep(100 * time.Millisecond)
+		select {
+		case <-time.After(100 * time.Millisecond):
+		case <-s.stopc:
+			return
+		}
 	}
 }
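
The loop above is what makes that shutdown prompt: replacing time.Sleep with a select on time.After and stopc lets each 100ms pause be cut short the moment Close closes the channel. A standalone sketch of just that pause (the sleepOrStop helper is illustrative, not part of the diff):

package main

import (
	"fmt"
	"time"
)

// sleepOrStop pauses for d, but returns false immediately if stopc is closed
// first. This is the shape of the select that replaces time.Sleep in
// scheduleCompaction: the pause itself becomes cancellable.
func sleepOrStop(d time.Duration, stopc <-chan struct{}) bool {
	select {
	case <-time.After(d):
		return true // full pause elapsed, keep going
	case <-stopc:
		return false // shutdown requested, caller should return
	}
}

func main() {
	stopc := make(chan struct{})
	go func() {
		time.Sleep(30 * time.Millisecond)
		close(stopc)
	}()

	for i := 0; ; i++ {
		fmt.Println("compaction batch", i)
		if !sleepOrStop(100*time.Millisecond, stopc) {
			fmt.Println("stopped early")
			return
		}
	}
}

One small trade-off: time.After allocates a timer that is not reclaimed until it fires, which is harmless at a 100ms interval but worth swapping for time.NewTimer in much tighter or longer-lived loops.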

storage/kvstore_test.go (+62 -3)

@@ -3,9 +3,11 @@ package storage
 import (
 	"bytes"
 	"crypto/rand"
+	"math"
 	"os"
 	"reflect"
 	"testing"
+	"time"
 
 	"github.com/coreos/etcd/storage/storagepb"
 )
@@ -388,9 +390,6 @@ func TestCompaction(t *testing.T) {
 	}
 }
 
-// TODO: test more complicated cases:
-// with unfinished compaction
-// with removed keys
 func TestRestore(t *testing.T) {
 	s0 := newStore("test")
 	defer os.Remove("test")
@@ -402,6 +401,19 @@ func TestRestore(t *testing.T) {
 	s0.Put([]byte("foo1"), []byte("bar12"))
 	s0.Put([]byte("foo2"), []byte("bar13"))
 	s0.Put([]byte("foo1"), []byte("bar14"))
+	s0.Put([]byte("foo3"), []byte("bar3"))
+	s0.DeleteRange([]byte("foo3"), nil)
+	s0.Put([]byte("foo3"), []byte("bar31"))
+	s0.DeleteRange([]byte("foo3"), nil)
+
+	mink := newRevBytes()
+	revToBytes(reversion{main: 0, sub: 0}, mink)
+	maxk := newRevBytes()
+	revToBytes(reversion{main: math.MaxInt64, sub: math.MaxInt64}, maxk)
+	s0kvs, _, err := s0.rangeKeys(mink, maxk, 0, 0)
+	if err != nil {
+		t.Fatalf("rangeKeys on s0 error (%v)", err)
+	}
 
 	s0.Close()
 
@@ -411,6 +423,53 @@ func TestRestore(t *testing.T) {
 	if !s0.Equal(s1) {
 		t.Errorf("not equal!")
 	}
+	s1kvs, _, err := s1.rangeKeys(mink, maxk, 0, 0)
+	if err != nil {
+		t.Fatalf("rangeKeys on s1 error (%v)", err)
+	}
+	if !reflect.DeepEqual(s1kvs, s0kvs) {
+		t.Errorf("s1kvs = %+v, want %+v", s1kvs, s0kvs)
+	}
+}
+
+func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
+	s0 := newStore("test")
+	defer os.Remove("test")
+
+	s0.Put([]byte("foo"), []byte("bar"))
+	s0.Put([]byte("foo"), []byte("bar1"))
+	s0.Put([]byte("foo"), []byte("bar2"))
+
+	// write scheduled compaction, but not do compaction
+	rbytes := newRevBytes()
+	revToBytes(reversion{main: 2}, rbytes)
+	tx := s0.b.BatchTx()
+	tx.Lock()
+	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
+	tx.Unlock()
+
+	s0.Close()
+
+	s1 := newStore("test")
+	s1.Restore()
+
+	// wait for scheduled compaction to be finished
+	time.Sleep(100 * time.Millisecond)
+
+	if _, _, err := s1.Range([]byte("foo"), nil, 0, 2); err != ErrCompacted {
+		t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
+	}
+	// check the key in backend is deleted
+	revbytes := newRevBytes()
+	// TODO: compact should delete main=2 key too
+	revToBytes(reversion{main: 1}, revbytes)
+	tx = s1.b.BatchTx()
+	tx.Lock()
+	ks, _ := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+	if len(ks) != 0 {
+		t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
+	}
+	tx.Unlock()
+}
 
 func BenchmarkStorePut(b *testing.B) {
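
TestRestoreContinueUnfinishedCompaction gives the resumed compaction a fixed 100ms to finish before asserting on it. If that fixed sleep ever proves flaky on slow machines, a common alternative is to poll the awaited condition with a deadline. The waitFor helper below is purely illustrative (it is not part of this diff or of the etcd code) and shows the shape against a stand-in condition:

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFor polls check every interval until it returns true or the timeout
// passes. It is a generic stand-in for a fixed time.Sleep in tests that wait
// on background work, such as a resumed compaction.
func waitFor(timeout, interval time.Duration, check func() bool) error {
	deadline := time.Now().Add(timeout)
	for {
		if check() {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("condition not met before timeout")
		}
		time.Sleep(interval)
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond) // stand-in for the background compaction
		close(done)
	}()

	err := waitFor(time.Second, 10*time.Millisecond, func() bool {
		select {
		case <-done:
			return true
		default:
			return false
		}
	})
	fmt.Println("waited:", err)
}

In a test like the one above, check would wrap whatever the assertions inspect, for example whether the compacted revision's key has disappeared from the backend.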