Browse Source

Merge pull request #3395 from yichengq/backend-test

storage/backend: add unit tests for backend and batchTx
Yicheng Qin 10 years ago
parent
commit
1eaf169057
5 changed files with 301 additions and 29 deletions
  1. 9 12
      storage/backend/backend.go
  2. 98 14
      storage/backend/backend_test.go
  3. 11 2
      storage/backend/batch_tx.go
  4. 182 0
      storage/backend/batch_tx_test.go
  5. 1 1
      test

+ 9 - 12
storage/backend/backend.go

@@ -22,12 +22,15 @@ type backend struct {
 	batchLimit    int
 	batchLimit    int
 	batchTx       *batchTx
 	batchTx       *batchTx
 
 
-	stopc  chan struct{}
-	startc chan struct{}
-	donec  chan struct{}
+	stopc chan struct{}
+	donec chan struct{}
 }
 }
 
 
 func New(path string, d time.Duration, limit int) Backend {
 func New(path string, d time.Duration, limit int) Backend {
+	return newBackend(path, d, limit)
+}
+
+func newBackend(path string, d time.Duration, limit int) *backend {
 	db, err := bolt.Open(path, 0600, nil)
 	db, err := bolt.Open(path, 0600, nil)
 	if err != nil {
 	if err != nil {
 		log.Panicf("backend: cannot open database at %s (%v)", path, err)
 		log.Panicf("backend: cannot open database at %s (%v)", path, err)
@@ -38,15 +41,12 @@ func New(path string, d time.Duration, limit int) Backend {
 
 
 		batchInterval: d,
 		batchInterval: d,
 		batchLimit:    limit,
 		batchLimit:    limit,
-		batchTx:       &batchTx{},
 
 
-		stopc:  make(chan struct{}),
-		startc: make(chan struct{}),
-		donec:  make(chan struct{}),
+		stopc: make(chan struct{}),
+		donec: make(chan struct{}),
 	}
 	}
-	b.batchTx.backend = b
+	b.batchTx = newBatchTx(b)
 	go b.run()
 	go b.run()
-	<-b.startc
 	return b
 	return b
 }
 }
 
 
@@ -73,9 +73,6 @@ func (b *backend) Snapshot(w io.Writer) (n int64, err error) {
 func (b *backend) run() {
 func (b *backend) run() {
 	defer close(b.donec)
 	defer close(b.donec)
 
 
-	b.batchTx.Commit()
-	b.startc <- struct{}{}
-
 	for {
 	for {
 		select {
 		select {
 		case <-time.After(b.batchInterval):
 		case <-time.After(b.batchInterval):

+ 98 - 14
storage/backend/backend_test.go

@@ -1,29 +1,113 @@
 package backend
 package backend
 
 
 import (
 import (
+	"io/ioutil"
+	"log"
 	"os"
 	"os"
-	"reflect"
+	"path"
 	"testing"
 	"testing"
 	"time"
 	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
+	"github.com/coreos/etcd/pkg/testutil"
 )
 )
 
 
-func TestBackendPut(t *testing.T) {
-	backend := New("test", 10*time.Second, 10000)
-	defer backend.Close()
-	defer os.Remove("test")
+var tmpPath string
 
 
-	v := []byte("foo")
+func init() {
+	dir, err := ioutil.TempDir(os.TempDir(), "etcd_backend_test")
+	if err != nil {
+		log.Fatal(err)
+	}
+	tmpPath = path.Join(dir, "database")
+}
 
 
-	batchTx := backend.BatchTx()
-	batchTx.Lock()
+func TestBackendClose(t *testing.T) {
+	b := newBackend(tmpPath, time.Hour, 10000)
+	defer os.Remove(tmpPath)
 
 
-	batchTx.UnsafeCreateBucket([]byte("test"))
+	// check close could work
+	done := make(chan struct{})
+	go func() {
+		err := b.Close()
+		if err != nil {
+			t.Errorf("close error = %v, want nil", err)
+		}
+		done <- struct{}{}
+	}()
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Errorf("failed to close database in 1s")
+	}
+}
 
 
-	batchTx.UnsafePut([]byte("test"), []byte("foo"), v)
-	_, gv := batchTx.UnsafeRange([]byte("test"), v, nil, -1)
-	if !reflect.DeepEqual(gv[0], v) {
-		t.Errorf("v = %s, want %s", string(gv[0]), string(v))
+func TestBackendSnapshot(t *testing.T) {
+	b := New(tmpPath, time.Hour, 10000)
+	defer cleanup(b, tmpPath)
+
+	tx := b.BatchTx()
+	tx.Lock()
+	tx.UnsafeCreateBucket([]byte("test"))
+	tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
+	tx.Unlock()
+	b.ForceCommit()
+
+	// write snapshot to a new file
+	f, err := ioutil.TempFile(os.TempDir(), "etcd_backend_test")
+	if err != nil {
+		t.Fatal(err)
 	}
 	}
+	_, err = b.Snapshot(f)
+	if err != nil {
+		t.Fatal(err)
+	}
+	f.Close()
+
+	// bootstrap new backend from the snapshot
+	nb := New(f.Name(), time.Hour, 10000)
+	defer cleanup(nb, f.Name())
+
+	newTx := b.BatchTx()
+	newTx.Lock()
+	ks, _ := newTx.UnsafeRange([]byte("test"), []byte("foo"), []byte("goo"), 0)
+	if len(ks) != 1 {
+		t.Errorf("len(kvs) = %d, want 1", len(ks))
+	}
+	newTx.Unlock()
+}
+
+func TestBackendBatchIntervalCommit(t *testing.T) {
+	// start backend with super short batch interval
+	b := newBackend(tmpPath, time.Nanosecond, 10000)
+	defer cleanup(b, tmpPath)
+
+	tx := b.BatchTx()
+	tx.Lock()
+	tx.UnsafeCreateBucket([]byte("test"))
+	tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
+	tx.Unlock()
+
+	// give time for batch interval commit to happen
+	time.Sleep(time.Nanosecond)
+	testutil.WaitSchedule()
+
+	// check whether put happens via db view
+	b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte("test"))
+		if bucket == nil {
+			t.Errorf("bucket test does not exit")
+			return nil
+		}
+		v := bucket.Get([]byte("foo"))
+		if v == nil {
+			t.Errorf("foo key failed to written in backend")
+		}
+		return nil
+	})
+}
 
 
-	batchTx.Unlock()
+func cleanup(b Backend, path string) {
+	b.Close()
+	os.Remove(path)
 }
 }

+ 11 - 2
storage/backend/batch_tx.go

@@ -26,6 +26,12 @@ type batchTx struct {
 	pending int
 	pending int
 }
 }
 
 
+func newBatchTx(backend *backend) *batchTx {
+	tx := &batchTx{backend: backend}
+	tx.Commit()
+	return tx
+}
+
 func (t *batchTx) UnsafeCreateBucket(name []byte) {
 func (t *batchTx) UnsafeCreateBucket(name []byte) {
 	_, err := t.tx.CreateBucket(name)
 	_, err := t.tx.CreateBucket(name)
 	if err != nil && err != bolt.ErrBucketExists {
 	if err != nil && err != bolt.ErrBucketExists {
@@ -43,7 +49,7 @@ func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
 		log.Fatalf("storage: cannot put key into bucket (%v)", err)
 		log.Fatalf("storage: cannot put key into bucket (%v)", err)
 	}
 	}
 	t.pending++
 	t.pending++
-	if t.pending > t.backend.batchLimit {
+	if t.pending >= t.backend.batchLimit {
 		t.commit(false)
 		t.commit(false)
 		t.pending = 0
 		t.pending = 0
 	}
 	}
@@ -68,6 +74,9 @@ func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64
 	for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
 	for ck, cv := c.Seek(key); ck != nil && bytes.Compare(ck, endKey) < 0; ck, cv = c.Next() {
 		vs = append(vs, cv)
 		vs = append(vs, cv)
 		keys = append(keys, ck)
 		keys = append(keys, ck)
+		if limit > 0 && limit == int64(len(keys)) {
+			break
+		}
 	}
 	}
 
 
 	return keys, vs
 	return keys, vs
@@ -84,7 +93,7 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
 		log.Fatalf("storage: cannot delete key from bucket (%v)", err)
 		log.Fatalf("storage: cannot delete key from bucket (%v)", err)
 	}
 	}
 	t.pending++
 	t.pending++
-	if t.pending > t.backend.batchLimit {
+	if t.pending >= t.backend.batchLimit {
 		t.commit(false)
 		t.commit(false)
 		t.pending = 0
 		t.pending = 0
 	}
 	}

+ 182 - 0
storage/backend/batch_tx_test.go

@@ -0,0 +1,182 @@
+package backend
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
+)
+
+func TestBatchTxPut(t *testing.T) {
+	b := newBackend(tmpPath, time.Hour, 10000)
+	defer cleanup(b, tmpPath)
+
+	tx := b.batchTx
+	tx.Lock()
+	defer tx.Unlock()
+
+	// create bucket
+	tx.UnsafeCreateBucket([]byte("test"))
+
+	// put
+	v := []byte("bar")
+	tx.UnsafePut([]byte("test"), []byte("foo"), v)
+
+	// check put result before and after tx is committed
+	for k := 0; k < 2; k++ {
+		_, gv := tx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0)
+		if !reflect.DeepEqual(gv[0], v) {
+			t.Errorf("v = %s, want %s", string(gv[0]), string(v))
+		}
+		tx.commit(false)
+	}
+}
+
+func TestBatchTxRange(t *testing.T) {
+	b := newBackend(tmpPath, time.Hour, 10000)
+	defer cleanup(b, tmpPath)
+
+	tx := b.batchTx
+	tx.Lock()
+	defer tx.Unlock()
+
+	tx.UnsafeCreateBucket([]byte("test"))
+	// put keys
+	allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")}
+	allVals := [][]byte{[]byte("bar"), []byte("bar1"), []byte("bar2")}
+	for i := range allKeys {
+		tx.UnsafePut([]byte("test"), allKeys[i], allVals[i])
+	}
+
+	tests := []struct {
+		key    []byte
+		endKey []byte
+		limit  int64
+
+		wkeys [][]byte
+		wvals [][]byte
+	}{
+		// single key
+		{
+			[]byte("foo"), nil, 0,
+			allKeys[:1], allVals[:1],
+		},
+		// single key, bad
+		{
+			[]byte("doo"), nil, 0,
+			nil, nil,
+		},
+		// key range
+		{
+			[]byte("foo"), []byte("foo1"), 0,
+			allKeys[:1], allVals[:1],
+		},
+		// key range, get all keys
+		{
+			[]byte("foo"), []byte("foo3"), 0,
+			allKeys, allVals,
+		},
+		// key range, bad
+		{
+			[]byte("goo"), []byte("goo3"), 0,
+			nil, nil,
+		},
+		// key range with effective limit
+		{
+			[]byte("foo"), []byte("foo3"), 1,
+			allKeys[:1], allVals[:1],
+		},
+		// key range with limit
+		{
+			[]byte("foo"), []byte("foo3"), 4,
+			allKeys, allVals,
+		},
+	}
+	for i, tt := range tests {
+		keys, vals := tx.UnsafeRange([]byte("test"), tt.key, tt.endKey, tt.limit)
+		if !reflect.DeepEqual(keys, tt.wkeys) {
+			t.Errorf("#%d: keys = %+v, want %+v", i, keys, tt.wkeys)
+		}
+		if !reflect.DeepEqual(vals, tt.wvals) {
+			t.Errorf("#%d: vals = %+v, want %+v", i, vals, tt.wvals)
+		}
+	}
+}
+
+func TestBatchTxDelete(t *testing.T) {
+	b := newBackend(tmpPath, time.Hour, 10000)
+	defer cleanup(b, tmpPath)
+
+	tx := b.batchTx
+	tx.Lock()
+	defer tx.Unlock()
+
+	tx.UnsafeCreateBucket([]byte("test"))
+	tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
+
+	tx.UnsafeDelete([]byte("test"), []byte("foo"))
+
+	// check put result before and after tx is committed
+	for k := 0; k < 2; k++ {
+		ks, _ := tx.UnsafeRange([]byte("test"), []byte("foo"), nil, 0)
+		if len(ks) != 0 {
+			t.Errorf("keys on foo = %v, want nil", ks)
+		}
+		tx.commit(false)
+	}
+}
+
+func TestBatchTxCommit(t *testing.T) {
+	b := newBackend(tmpPath, time.Hour, 10000)
+	defer cleanup(b, tmpPath)
+
+	tx := b.batchTx
+	tx.Lock()
+	tx.UnsafeCreateBucket([]byte("test"))
+	tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
+	tx.Unlock()
+
+	tx.Commit()
+
+	// check whether put happens via db view
+	b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte("test"))
+		if bucket == nil {
+			t.Errorf("bucket test does not exit")
+			return nil
+		}
+		v := bucket.Get([]byte("foo"))
+		if v == nil {
+			t.Errorf("foo key failed to written in backend")
+		}
+		return nil
+	})
+}
+
+func TestBatchTxBatchLimitCommit(t *testing.T) {
+	// start backend with batch limit 1
+	b := newBackend(tmpPath, time.Hour, 1)
+	defer cleanup(b, tmpPath)
+
+	tx := b.batchTx
+	tx.Lock()
+	tx.UnsafeCreateBucket([]byte("test"))
+	tx.UnsafePut([]byte("test"), []byte("foo"), []byte("bar"))
+	tx.Unlock()
+
+	// batch limit commit should have been triggered
+	// check whether put happens via db view
+	b.db.View(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte("test"))
+		if bucket == nil {
+			t.Errorf("bucket test does not exit")
+			return nil
+		}
+		v := bucket.Get([]byte("foo"))
+		if v == nil {
+			t.Errorf("foo key failed to written in backend")
+		}
+		return nil
+	})
+}

+ 1 - 1
test

@@ -16,7 +16,7 @@ COVER=${COVER:-"-cover"}
 source ./build
 source ./build
 
 
 # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
 # Hack: gofmt ./ will recursively check the .git directory. So use *.go for gofmt.
-TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/auth etcdserver/etcdhttp etcdserver/etcdhttp/httptypes pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft snap storage store version wal"
+TESTABLE_AND_FORMATTABLE="client discovery error etcdctl/command etcdmain etcdserver etcdserver/auth etcdserver/etcdhttp etcdserver/etcdhttp/httptypes pkg/fileutil pkg/flags pkg/idutil pkg/ioutil pkg/netutil pkg/osutil pkg/pbutil pkg/types pkg/transport pkg/wait proxy raft snap storage storage/backend store version wal"
 # TODO: add it to race testing when the issue is resolved
 # TODO: add it to race testing when the issue is resolved
 # https://github.com/golang/go/issues/9946
 # https://github.com/golang/go/issues/9946
 NO_RACE_TESTABLE="rafthttp"
 NO_RACE_TESTABLE="rafthttp"