Browse Source

etcdserver: fix racey WaitSchedule() tests to wait for recorder actions

Fixes #4119
Anthony Romano 10 years ago
parent
commit
838328b057
2 changed files with 39 additions and 32 deletions
  1. 1 1
      etcdserver/raft_test.go
  2. 38 31
      etcdserver/server_test.go

+ 1 - 1
etcdserver/raft_test.go

@@ -150,7 +150,7 @@ func TestCreateConfigChangeEnts(t *testing.T) {
 }
 }
 
 
 func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
 func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
-	n := newReadyNode()
+	n := newNopReadyNode()
 	r := raftNode{
 	r := raftNode{
 		Node:        n,
 		Node:        n,
 		storage:     &storageRecorder{},
 		storage:     &storageRecorder{},

+ 38 - 31
etcdserver/server_test.go

@@ -468,7 +468,7 @@ func TestApplyConfChangeError(t *testing.T) {
 				Params: []interface{}{cc},
 				Params: []interface{}{cc},
 			},
 			},
 		}
 		}
-		if g := n.Action(); !reflect.DeepEqual(g, w) {
+		if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
 			t.Errorf("#%d: action = %+v, want %+v", i, g, w)
 			t.Errorf("#%d: action = %+v, want %+v", i, g, w)
 		}
 		}
 	}
 	}
@@ -657,9 +657,7 @@ func TestSync(t *testing.T) {
 		t.Fatal("sync should be non-blocking but did not return after 1s!")
 		t.Fatal("sync should be non-blocking but did not return after 1s!")
 	}
 	}
 
 
-	testutil.WaitSchedule()
-
-	action := n.Action()
+	action, _ := n.Wait(1)
 	if len(action) != 1 {
 	if len(action) != 1 {
 		t.Fatalf("len(action) = %d, want 1", len(action))
 		t.Fatalf("len(action) = %d, want 1", len(action))
 	}
 	}
@@ -697,10 +695,8 @@ func TestSyncTimeout(t *testing.T) {
 		t.Fatal("sync should be non-blocking but did not return after 1s!")
 		t.Fatal("sync should be non-blocking but did not return after 1s!")
 	}
 	}
 
 
-	// give time for goroutine in sync to cancel
-	testutil.WaitSchedule()
 	w := []testutil.Action{{Name: "Propose blocked"}}
 	w := []testutil.Action{{Name: "Propose blocked"}}
-	if g := n.Action(); !reflect.DeepEqual(g, w) {
+	if g, _ := n.Wait(1); !reflect.DeepEqual(g, w) {
 		t.Errorf("action = %v, want %v", g, w)
 		t.Errorf("action = %v, want %v", g, w)
 	}
 	}
 }
 }
@@ -723,19 +719,22 @@ func TestSyncTrigger(t *testing.T) {
 		SyncTicker: st,
 		SyncTicker: st,
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 		reqIDGen:   idutil.NewGenerator(0, time.Time{}),
 	}
 	}
-	srv.start()
-	defer srv.Stop()
+
 	// trigger the server to become a leader and accept sync requests
 	// trigger the server to become a leader and accept sync requests
-	n.readyc <- raft.Ready{
-		SoftState: &raft.SoftState{
-			RaftState: raft.StateLeader,
-		},
-	}
-	// trigger a sync request
-	st <- time.Time{}
-	testutil.WaitSchedule()
+	go func() {
+		srv.start()
+		n.readyc <- raft.Ready{
+			SoftState: &raft.SoftState{
+				RaftState: raft.StateLeader,
+			},
+		}
+		// trigger a sync request
+		st <- time.Time{}
+	}()
+
+	action, _ := n.Wait(1)
+	go srv.Stop()
 
 
-	action := n.Action()
 	if len(action) != 1 {
 	if len(action) != 1 {
 		t.Fatalf("len(action) = %d, want 1", len(action))
 		t.Fatalf("len(action) = %d, want 1", len(action))
 	}
 	}
@@ -750,6 +749,9 @@ func TestSyncTrigger(t *testing.T) {
 	if req.Method != "SYNC" {
 	if req.Method != "SYNC" {
 		t.Fatalf("unexpected proposed request: %#v", req.Method)
 		t.Fatalf("unexpected proposed request: %#v", req.Method)
 	}
 	}
+
+	// wait on stop message
+	<-n.Chan()
 }
 }
 
 
 // snapshot should snapshot the store and cut the persistent
 // snapshot should snapshot the store and cut the persistent
@@ -768,8 +770,7 @@ func TestSnapshot(t *testing.T) {
 		store: st,
 		store: st,
 	}
 	}
 	srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
 	srv.snapshot(1, raftpb.ConfState{Nodes: []uint64{1}})
-	testutil.WaitSchedule()
-	gaction := st.Action()
+	gaction, _ := st.Wait(2)
 	if len(gaction) != 2 {
 	if len(gaction) != 2 {
 		t.Fatalf("len(action) = %d, want 1", len(gaction))
 		t.Fatalf("len(action) = %d, want 1", len(gaction))
 	}
 	}
@@ -809,14 +810,14 @@ func TestTriggerSnap(t *testing.T) {
 	for i := 0; i < snapc+1; i++ {
 	for i := 0; i < snapc+1; i++ {
 		srv.Do(context.Background(), pb.Request{Method: "PUT"})
 		srv.Do(context.Background(), pb.Request{Method: "PUT"})
 	}
 	}
+
+	wcnt := 2 + snapc
+	gaction, _ := p.Wait(wcnt)
+
 	srv.Stop()
 	srv.Stop()
-	// wait for snapshot goroutine to finish
-	testutil.WaitSchedule()
 
 
-	gaction := p.Action()
 	// each operation is recorded as a Save
 	// each operation is recorded as a Save
 	// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
 	// (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap
-	wcnt := 2 + snapc
 	if len(gaction) != wcnt {
 	if len(gaction) != wcnt {
 		t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
 		t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt)
 	}
 	}
@@ -832,7 +833,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 		// snapshots that may queue up at once without dropping
 		// snapshots that may queue up at once without dropping
 		maxInFlightMsgSnap = 16
 		maxInFlightMsgSnap = 16
 	)
 	)
-	n := newReadyNode()
+	n := newNopReadyNode()
 	cl := newCluster("abc")
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
 	cl.SetStore(store.New())
 
 
@@ -922,7 +923,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 // TestRecvSnapshot tests when it receives a snapshot from raft leader,
 // TestRecvSnapshot tests when it receives a snapshot from raft leader,
 // it should trigger storage.SaveSnap and also store.Recover.
 // it should trigger storage.SaveSnap and also store.Recover.
 func TestRecvSnapshot(t *testing.T) {
 func TestRecvSnapshot(t *testing.T) {
-	n := newReadyNode()
+	n := newNopReadyNode()
 	st := store.NewRecorder()
 	st := store.NewRecorder()
 	p := &storageRecorder{}
 	p := &storageRecorder{}
 	cl := newCluster("abc")
 	cl := newCluster("abc")
@@ -962,7 +963,7 @@ func TestRecvSnapshot(t *testing.T) {
 // TestApplySnapshotAndCommittedEntries tests that server applies snapshot
 // TestApplySnapshotAndCommittedEntries tests that server applies snapshot
 // first and then committed entries.
 // first and then committed entries.
 func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 func TestApplySnapshotAndCommittedEntries(t *testing.T) {
-	n := newReadyNode()
+	n := newNopReadyNode()
 	st := store.NewRecorder()
 	st := store.NewRecorder()
 	cl := newCluster("abc")
 	cl := newCluster("abc")
 	cl.SetStore(store.New())
 	cl.SetStore(store.New())
@@ -988,10 +989,9 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) {
 		},
 		},
 	}
 	}
 	// make goroutines move forward to receive snapshot
 	// make goroutines move forward to receive snapshot
-	testutil.WaitSchedule()
+	actions, _ := st.Wait(2)
 	s.Stop()
 	s.Stop()
 
 
-	actions := st.Action()
 	if len(actions) != 2 {
 	if len(actions) != 2 {
 		t.Fatalf("len(action) = %d, want 2", len(actions))
 		t.Fatalf("len(action) = %d, want 2", len(actions))
 	}
 	}
@@ -1374,8 +1374,14 @@ type readyNode struct {
 }
 }
 
 
 func newReadyNode() *readyNode {
 func newReadyNode() *readyNode {
+	return &readyNode{
+		nodeRecorder{testutil.NewRecorderStream()},
+		make(chan raft.Ready, 1)}
+}
+func newNopReadyNode() *readyNode {
 	return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
 	return &readyNode{*newNodeRecorder(), make(chan raft.Ready, 1)}
 }
 }
+
 func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
 func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc }
 
 
 type nodeConfChangeCommitterRecorder struct {
 type nodeConfChangeCommitterRecorder struct {
@@ -1384,8 +1390,9 @@ type nodeConfChangeCommitterRecorder struct {
 }
 }
 
 
 func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
 func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
-	return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
+	return &nodeConfChangeCommitterRecorder{*newNopReadyNode(), 0}
 }
 }
+
 func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
 func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
 	data, err := conf.Marshal()
 	data, err := conf.Marshal()
 	if err != nil {
 	if err != nil {
@@ -1411,7 +1418,7 @@ type nodeCommitter struct {
 }
 }
 
 
 func newNodeCommitter() raft.Node {
 func newNodeCommitter() raft.Node {
-	return &nodeCommitter{*newReadyNode(), 0}
+	return &nodeCommitter{*newNopReadyNode(), 0}
 }
 }
 func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
 func (n *nodeCommitter) Propose(ctx context.Context, data []byte) error {
 	n.index++
 	n.index++