Browse Source

Merge pull request #2996 from yichengq/storage-workflow-test

storage: add TestWorkflow
Yicheng Qin 10 years ago
parent
commit
05228729a3
3 changed files with 71 additions and 7 deletions
  1. 62 0
      storage/kv_test.go
  2. 8 6
      storage/kvstore.go
  3. 1 1
      storage/kvstore_compaction.go

+ 62 - 0
storage/kv_test.go

@@ -0,0 +1,62 @@
+package storage
+
+import (
+	"fmt"
+	"os"
+	"reflect"
+	"testing"
+)
+
+type kv struct {
+	k, v []byte
+}
+
+// TestWorkflow simulates the whole workflow that storage is used in normal
+// etcd running, including key changes, compaction and restart.
+func TestWorkflow(t *testing.T) {
+	s := newStore("test")
+	defer os.Remove("test")
+
+	var lastrev int64
+	var wkvs []kv
+	for i := 0; i < 10; i++ {
+		// regular compaction
+		s.Compact(lastrev)
+
+		// put 100 keys into the store in each round
+		for k := 0; k < 100; k++ {
+			key := fmt.Sprintf("bar_%03d_%03d", i, k)
+			val := fmt.Sprintf("foo_%03d_%03d", i, k)
+			s.Put([]byte(key), []byte(val))
+			wkvs = append(wkvs, kv{k: []byte(key), v: []byte(val)})
+		}
+
+		// delete second-half keys in this round
+		key := fmt.Sprintf("bar_%03d_050", i)
+		end := fmt.Sprintf("bar_%03d_100", i)
+		if n, _ := s.DeleteRange([]byte(key), []byte(end)); n != 50 {
+			t.Errorf("#%d: delete number = %d, want 50", i, n)
+		}
+		wkvs = wkvs[:len(wkvs)-50]
+
+		// check existing keys
+		kvs, rev, err := s.Range([]byte("bar"), []byte("bas"), 0, 0)
+		if err != nil {
+			t.Errorf("#%d: range error (%v)", err)
+		}
+		for j, kv := range kvs {
+			if !reflect.DeepEqual(kv.Key, wkvs[j].k) {
+				t.Errorf("#%d: keys[%d] = %s, want %s", i, j, kv.Key, wkvs[j].k)
+			}
+			if !reflect.DeepEqual(kv.Value, wkvs[j].v) {
+				t.Errorf("#%d: vals[%d] = %s, want %s", i, j, kv.Value, wkvs[j].v)
+			}
+		}
+		lastrev = rev
+
+		// the store is restarted and restored from the disk file
+		s.Close()
+		s = newStore("test")
+		s.Restore()
+	}
+}

+ 8 - 6
storage/kvstore.go

@@ -14,9 +14,10 @@ import (
 )
 
 var (
-	batchLimit    = 10000
-	batchInterval = 100 * time.Millisecond
-	keyBucketName = []byte("key")
+	batchLimit     = 10000
+	batchInterval  = 100 * time.Millisecond
+	keyBucketName  = []byte("key")
+	metaBucketName = []byte("meta")
 
 	scheduledCompactKeyName = []byte("scheduledCompactRev")
 	finishedCompactKeyName  = []byte("finishedCompactRev")
@@ -50,6 +51,7 @@ func newStore(path string) *store {
 	tx := s.b.BatchTx()
 	tx.Lock()
 	tx.UnsafeCreateBucket(keyBucketName)
+	tx.UnsafeCreateBucket(metaBucketName)
 	tx.Unlock()
 	s.b.ForceCommit()
 
@@ -153,7 +155,7 @@ func (s *store) Compact(rev int64) error {
 
 	tx := s.b.BatchTx()
 	tx.Lock()
-	tx.UnsafePut(keyBucketName, scheduledCompactKeyName, rbytes)
+	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
 	tx.Unlock()
 
 	keep := s.kvindex.Compact(rev)
@@ -178,7 +180,7 @@ func (s *store) Restore() error {
 	// restore index
 	tx := s.b.BatchTx()
 	tx.Lock()
-	_, finishedCompactBytes := tx.UnsafeRange(keyBucketName, finishedCompactKeyName, nil, 0)
+	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
 	if len(finishedCompactBytes) != 0 {
 		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
 		log.Printf("storage: restore compact to %d", s.compactMainRev)
@@ -208,7 +210,7 @@ func (s *store) Restore() error {
 		s.currentRev = rev
 	}
 
-	_, scheduledCompactBytes := tx.UnsafeRange(keyBucketName, scheduledCompactKeyName, nil, 0)
+	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
 	if len(scheduledCompactBytes) != 0 {
 		scheduledCompact := bytesToRev(finishedCompactBytes[0]).main
 		if scheduledCompact > s.compactMainRev {

+ 1 - 1
storage/kvstore_compaction.go

@@ -28,7 +28,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[reversion]stru
 		if len(keys) == 0 {
 			rbytes := make([]byte, 8+1+8)
 			revToBytes(reversion{main: compactMainRev}, rbytes)
-			tx.UnsafePut(keyBucketName, finishedCompactKeyName, rbytes)
+			tx.UnsafePut(metaBucketName, finishedCompactKeyName, rbytes)
 			tx.Unlock()
 			return
 		}