|
|
@@ -57,23 +57,20 @@ func TestPipelineSend(t *testing.T) {
|
|
|
// TestPipelineKeepSendingWhenPostError tests that pipeline can keep
|
|
|
// sending messages if previous messages meet post error.
|
|
|
func TestPipelineKeepSendingWhenPostError(t *testing.T) {
|
|
|
- tr := &respRoundTripper{err: fmt.Errorf("roundtrip error")}
|
|
|
+ tr := &respRoundTripper{rec: testutil.NewRecorderStream(), err: fmt.Errorf("roundtrip error")}
|
|
|
picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
|
|
|
fs := &stats.FollowerStats{}
|
|
|
tp := &Transport{pipelineRt: tr}
|
|
|
p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
|
|
|
+ defer p.stop()
|
|
|
|
|
|
for i := 0; i < 50; i++ {
|
|
|
p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
|
|
|
}
|
|
|
- testutil.WaitSchedule()
|
|
|
- p.stop()
|
|
|
|
|
|
- // check it send out 50 requests
|
|
|
- tr.mu.Lock()
|
|
|
- defer tr.mu.Unlock()
|
|
|
- if tr.reqCount != 50 {
|
|
|
- t.Errorf("request count = %d, want 50", tr.reqCount)
|
|
|
+ _, err := tr.rec.Wait(50)
|
|
|
+ if err != nil {
|
|
|
+ t.Errorf("unexpected wait error %v", err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -269,8 +266,8 @@ func (t *roundTripperBlocker) CancelRequest(req *http.Request) {
|
|
|
}
|
|
|
|
|
|
type respRoundTripper struct {
|
|
|
- mu sync.Mutex
|
|
|
- reqCount int
|
|
|
+ mu sync.Mutex
|
|
|
+ rec testutil.Recorder
|
|
|
|
|
|
code int
|
|
|
header http.Header
|
|
|
@@ -283,7 +280,9 @@ func newRespRoundTripper(code int, err error) *respRoundTripper {
|
|
|
func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
|
t.mu.Lock()
|
|
|
defer t.mu.Unlock()
|
|
|
- t.reqCount++
|
|
|
+ if t.rec != nil {
|
|
|
+ t.rec.Record(testutil.Action{Name: "req", Params: []interface{}{req}})
|
|
|
+ }
|
|
|
return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
|
|
|
}
|
|
|
|