|
|
@@ -156,7 +156,7 @@ func TestDoBadLocalAction(t *testing.T) {
|
|
|
|
|
|
// TestApplyRepeat tests that server handles repeat raft messages gracefully
|
|
|
func TestApplyRepeat(t *testing.T) {
|
|
|
- n := newNodeConfChangeCommitterRecorder()
|
|
|
+ n := newNodeConfChangeCommitterStream()
|
|
|
n.readyc <- raft.Ready{
|
|
|
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
|
|
|
}
|
|
|
@@ -191,7 +191,14 @@ func TestApplyRepeat(t *testing.T) {
|
|
|
Data: pbutil.MustMarshal(cc),
|
|
|
}}
|
|
|
n.readyc <- raft.Ready{CommittedEntries: ents}
|
|
|
+ // wait for conf change message
|
|
|
act, err := n.Wait(1)
|
|
|
+ // wait for stop message (async to avoid deadlock)
|
|
|
+ stopc := make(chan error)
|
|
|
+ go func() {
|
|
|
+ _, werr := n.Wait(1)
|
|
|
+ stopc <- werr
|
|
|
+ }()
|
|
|
s.Stop()
|
|
|
|
|
|
// only want to confirm etcdserver won't panic; no data to check
|
|
|
@@ -202,6 +209,10 @@ func TestApplyRepeat(t *testing.T) {
|
|
|
if len(act) == 0 {
|
|
|
t.Fatalf("expected len(act)=0, got %d", len(act))
|
|
|
}
|
|
|
+
|
|
|
+ if err = <-stopc; err != nil {
|
|
|
+ t.Fatalf("error on stop (%v)", err)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func TestApplyRequest(t *testing.T) {
|
|
|
@@ -1448,6 +1459,10 @@ func newNodeConfChangeCommitterRecorder() *nodeConfChangeCommitterRecorder {
|
|
|
return &nodeConfChangeCommitterRecorder{*newNopReadyNode(), 0}
|
|
|
}
|
|
|
|
|
|
+func newNodeConfChangeCommitterStream() *nodeConfChangeCommitterRecorder {
|
|
|
+ return &nodeConfChangeCommitterRecorder{*newReadyNode(), 0}
|
|
|
+}
|
|
|
+
|
|
|
func (n *nodeConfChangeCommitterRecorder) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
|
|
|
data, err := conf.Marshal()
|
|
|
if err != nil {
|