|
@@ -1227,7 +1227,7 @@ func TestPublishStopped(t *testing.T) {
|
|
|
|
|
|
|
|
// TestPublishRetry tests that publish will keep retry until success.
|
|
// TestPublishRetry tests that publish will keep retry until success.
|
|
|
func TestPublishRetry(t *testing.T) {
|
|
func TestPublishRetry(t *testing.T) {
|
|
|
- n := newNodeRecorder()
|
|
|
|
|
|
|
+ n := newNodeRecorderStream()
|
|
|
srv := &EtcdServer{
|
|
srv := &EtcdServer{
|
|
|
Cfg: &ServerConfig{TickMs: 1},
|
|
Cfg: &ServerConfig{TickMs: 1},
|
|
|
r: raftNode{Node: n},
|
|
r: raftNode{Node: n},
|
|
@@ -1235,15 +1235,27 @@ func TestPublishRetry(t *testing.T) {
|
|
|
stopping: make(chan struct{}),
|
|
stopping: make(chan struct{}),
|
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
reqIDGen: idutil.NewGenerator(0, time.Time{}),
|
|
|
}
|
|
}
|
|
|
- // TODO: use fakeClockwork
|
|
|
|
|
- time.AfterFunc(10*time.Millisecond, func() { close(srv.stopping) })
|
|
|
|
|
|
|
+ // expect multiple proposals from retrying
|
|
|
|
|
+ ch := make(chan struct{})
|
|
|
|
|
+ go func() {
|
|
|
|
|
+ defer close(ch)
|
|
|
|
|
+ if action, err := n.Wait(2); err != nil {
|
|
|
|
|
+ t.Errorf("len(action) = %d, want >= 2 (%v)", len(action), err)
|
|
|
|
|
+ }
|
|
|
|
|
+ close(srv.stopping)
|
|
|
|
|
+ // drain remaing actions, if any, so publish can terminate
|
|
|
|
|
+ for {
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-ch:
|
|
|
|
|
+ return
|
|
|
|
|
+ default:
|
|
|
|
|
+ n.Action()
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ }()
|
|
|
srv.publish(10 * time.Nanosecond)
|
|
srv.publish(10 * time.Nanosecond)
|
|
|
-
|
|
|
|
|
- action := n.Action()
|
|
|
|
|
- // multiple Proposes
|
|
|
|
|
- if cnt := len(action); cnt < 2 {
|
|
|
|
|
- t.Errorf("len(action) = %d, want >= 2", cnt)
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ ch <- struct{}{}
|
|
|
|
|
+ <-ch
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestUpdateVersion(t *testing.T) {
|
|
func TestUpdateVersion(t *testing.T) {
|
|
@@ -1350,8 +1362,9 @@ func TestGetOtherPeerURLs(t *testing.T) {
|
|
|
|
|
|
|
|
type nodeRecorder struct{ testutil.Recorder }
|
|
type nodeRecorder struct{ testutil.Recorder }
|
|
|
|
|
|
|
|
-func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} }
|
|
|
|
|
-func newNodeNop() raft.Node { return newNodeRecorder() }
|
|
|
|
|
|
|
+func newNodeRecorder() *nodeRecorder { return &nodeRecorder{&testutil.RecorderBuffered{}} }
|
|
|
|
|
+func newNodeRecorderStream() *nodeRecorder { return &nodeRecorder{testutil.NewRecorderStream()} }
|
|
|
|
|
+func newNodeNop() raft.Node { return newNodeRecorder() }
|
|
|
|
|
|
|
|
func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
|
|
func (n *nodeRecorder) Tick() { n.Record(testutil.Action{Name: "Tick"}) }
|
|
|
func (n *nodeRecorder) Campaign(ctx context.Context) error {
|
|
func (n *nodeRecorder) Campaign(ctx context.Context) error {
|