Browse Source

pkg/testutil: make Recorder an interface

Provides two implementations of Recorder-- one that is non-blocking
like the original version and one that provides a blocking channel
to avoid busy waiting or racing in tests when no other synchronization
is available.
Anthony Romano 10 years ago
parent
commit
384cc76299
6 changed files with 110 additions and 14 deletions
  1. 1 1
      etcdserver/server_test.go
  2. 1 1
      etcdserver/storage.go
  3. 97 4
      pkg/testutil/recorder.go
  4. 3 3
      pkg/wait/wait.go
  5. 4 1
      storage/kvstore_test.go
  6. 4 4
      store/store.go

+ 1 - 1
etcdserver/server_test.go

@@ -1313,7 +1313,7 @@ func TestGetOtherPeerURLs(t *testing.T) {
 
 
 type nodeRecorder struct{ testutil.Recorder }
 type nodeRecorder struct{ testutil.Recorder }
 
 
-func newNodeRecorder() *nodeRecorder { return &nodeRecorder{} }
+func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} }
 func newNodeNop() raft.Node          { return newNodeRecorder() }
 func newNodeNop() raft.Node          { return newNodeRecorder() }
 
 
 func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
 func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }

+ 1 - 1
etcdserver/storage.go

@@ -149,7 +149,7 @@ func makeMemberDir(dir string) error {
 }
 }
 
 
 type storageRecorder struct {
 type storageRecorder struct {
-	testutil.Recorder
+	testutil.RecorderBuffered
 	dbPath string // must have '/' suffix if set
 	dbPath string // must have '/' suffix if set
 }
 }
 
 

+ 97 - 4
pkg/testutil/recorder.go

@@ -14,27 +14,120 @@
 
 
 package testutil
 package testutil
 
 
-import "sync"
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+)
 
 
 type Action struct {
 type Action struct {
 	Name   string
 	Name   string
 	Params []interface{}
 	Params []interface{}
 }
 }
 
 
-type Recorder struct {
+type Recorder interface {
+	// Record publishes an Action (e.g., function call) which will
+	// be reflected by Wait() or Chan()
+	Record(a Action)
+	// Wait waits until at least n Actions are availble or returns with error
+	Wait(n int) ([]Action, error)
+	// Action returns immediately available Actions
+	Action() []Action
+	// Chan returns the channel for actions published by Record
+	Chan() <-chan Action
+}
+
+// RecorderBuffered appends all Actions to a slice
+type RecorderBuffered struct {
 	sync.Mutex
 	sync.Mutex
 	actions []Action
 	actions []Action
 }
 }
 
 
-func (r *Recorder) Record(a Action) {
+func (r *RecorderBuffered) Record(a Action) {
 	r.Lock()
 	r.Lock()
 	r.actions = append(r.actions, a)
 	r.actions = append(r.actions, a)
 	r.Unlock()
 	r.Unlock()
 }
 }
-func (r *Recorder) Action() []Action {
+func (r *RecorderBuffered) Action() []Action {
 	r.Lock()
 	r.Lock()
 	cpy := make([]Action, len(r.actions))
 	cpy := make([]Action, len(r.actions))
 	copy(cpy, r.actions)
 	copy(cpy, r.actions)
 	r.Unlock()
 	r.Unlock()
 	return cpy
 	return cpy
 }
 }
+func (r *RecorderBuffered) Wait(n int) (acts []Action, err error) {
+	// legacy racey behavior
+	WaitSchedule()
+	acts = r.Action()
+	if len(acts) < n {
+		err = newLenErr(n, len(r.actions))
+	}
+	return acts, err
+}
+
+func (r *RecorderBuffered) Chan() <-chan Action {
+	ch := make(chan Action)
+	go func() {
+		acts := r.Action()
+		for i := range acts {
+			ch <- acts[i]
+		}
+		close(ch)
+	}()
+	return ch
+}
+
+// RecorderStream writes all Actions to an unbuffered channel
+type recorderStream struct {
+	ch chan Action
+}
+
+func NewRecorderStream() Recorder {
+	return &recorderStream{ch: make(chan Action)}
+}
+
+func (r *recorderStream) Record(a Action) {
+	r.ch <- a
+}
+
+func (r *recorderStream) Action() (acts []Action) {
+	for {
+		select {
+		case act := <-r.ch:
+			acts = append(acts, act)
+		default:
+			return acts
+		}
+	}
+	return acts
+}
+
+func (r *recorderStream) Chan() <-chan Action {
+	return r.ch
+}
+
+func (r *recorderStream) Wait(n int) ([]Action, error) {
+	acts := make([]Action, n)
+	timeoutC := time.After(5 * time.Second)
+	for i := 0; i < n; i++ {
+		select {
+		case acts[i] = <-r.ch:
+		case <-timeoutC:
+			acts = acts[:i]
+			return acts, newLenErr(n, i)
+		}
+	}
+	// extra wait to catch any Action spew
+	select {
+	case act := <-r.ch:
+		acts = append(acts, act)
+	case <-time.After(10 * time.Millisecond):
+	}
+	return acts, nil
+}
+
+func newLenErr(expected int, actual int) error {
+	s := fmt.Sprintf("len(actions) = %d, expected >= %d", actual, expected)
+	return errors.New(s)
+}

+ 3 - 3
pkg/wait/wait.go

@@ -60,16 +60,16 @@ func (w *List) Trigger(id uint64, x interface{}) {
 
 
 type WaitRecorder struct {
 type WaitRecorder struct {
 	Wait
 	Wait
-	*testutil.Recorder
+	testutil.Recorder
 }
 }
 
 
 type waitRecorder struct {
 type waitRecorder struct {
-	testutil.Recorder
+	testutil.RecorderBuffered
 }
 }
 
 
 func NewRecorder() *WaitRecorder {
 func NewRecorder() *WaitRecorder {
 	wr := &waitRecorder{}
 	wr := &waitRecorder{}
-	return &WaitRecorder{Wait: wr, Recorder: &wr.Recorder}
+	return &WaitRecorder{Wait: wr, Recorder: wr}
 }
 }
 func NewNop() Wait { return NewRecorder() }
 func NewNop() Wait { return NewRecorder() }
 
 

+ 4 - 1
storage/kvstore_test.go

@@ -473,8 +473,11 @@ func newTestKeyBytes(rev revision, tombstone bool) []byte {
 }
 }
 
 
 func newFakeStore() *store {
 func newFakeStore() *store {
-	b := &fakeBackend{&fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}}
+	b := &fakeBackend{&fakeBatchTx{
+		Recorder:   &testutil.RecorderBuffered{},
+		rangeRespc: make(chan rangeResp, 5)}}
 	fi := &fakeIndex{
 	fi := &fakeIndex{
+		Recorder:              &testutil.RecorderBuffered{},
 		indexGetRespc:         make(chan indexGetResp, 1),
 		indexGetRespc:         make(chan indexGetResp, 1),
 		indexRangeRespc:       make(chan indexRangeResp, 1),
 		indexRangeRespc:       make(chan indexRangeResp, 1),
 		indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
 		indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),

+ 4 - 4
store/store.go

@@ -748,7 +748,7 @@ func (s *store) JsonStats() []byte {
 // StoreRecorder provides a Store interface with a testutil.Recorder
 // StoreRecorder provides a Store interface with a testutil.Recorder
 type StoreRecorder struct {
 type StoreRecorder struct {
 	Store
 	Store
-	*testutil.Recorder
+	testutil.Recorder
 }
 }
 
 
 // storeRecorder records all the methods it receives.
 // storeRecorder records all the methods it receives.
@@ -756,13 +756,13 @@ type StoreRecorder struct {
 // It always returns invalid empty response and no error.
 // It always returns invalid empty response and no error.
 type storeRecorder struct {
 type storeRecorder struct {
 	Store
 	Store
-	testutil.Recorder
+	testutil.RecorderBuffered
 }
 }
 
 
 func NewNop() Store { return &storeRecorder{} }
 func NewNop() Store { return &storeRecorder{} }
 func NewRecorder() *StoreRecorder {
 func NewRecorder() *StoreRecorder {
 	sr := &storeRecorder{}
 	sr := &storeRecorder{}
-	return &StoreRecorder{Store: sr, Recorder: &sr.Recorder}
+	return &StoreRecorder{Store: sr, Recorder: sr}
 }
 }
 
 
 func (s *storeRecorder) Version() int  { return 0 }
 func (s *storeRecorder) Version() int  { return 0 }
@@ -856,7 +856,7 @@ type errStoreRecorder struct {
 
 
 func NewErrRecorder(err error) *StoreRecorder {
 func NewErrRecorder(err error) *StoreRecorder {
 	sr := &errStoreRecorder{err: err}
 	sr := &errStoreRecorder{err: err}
-	return &StoreRecorder{Store: sr, Recorder: &sr.Recorder}
+	return &StoreRecorder{Store: sr, Recorder: sr}
 }
 }
 
 
 func (s *errStoreRecorder) Get(path string, recursive, sorted bool) (*Event, error) {
 func (s *errStoreRecorder) Get(path string, recursive, sorted bool) (*Event, error) {