Browse Source

Merge pull request #3750 from yichengq/rafthttp-continue

rafthttp: fix wrong return in pipeline.handle
Yicheng Qin 10 years ago
parent
commit
4766227b76
2 changed files with 30 additions and 1 deletions
  1. 1 1
      rafthttp/pipeline.go
  2. 29 0
      rafthttp/pipeline_test.go

+ 1 - 1
rafthttp/pipeline.go

@@ -106,7 +106,7 @@ func (p *pipeline) handle() {
 			if isMsgSnap(m) {
 			if isMsgSnap(m) {
 				p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 				p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 			}
 			}
-			return
+			continue
 		}
 		}
 
 
 		p.status.activate()
 		p.status.activate()

+ 29 - 0
rafthttp/pipeline_test.go

@@ -16,6 +16,7 @@ package rafthttp
 
 
 import (
 import (
 	"errors"
 	"errors"
+	"fmt"
 	"io"
 	"io"
 	"io/ioutil"
 	"io/ioutil"
 	"net/http"
 	"net/http"
@@ -52,6 +53,28 @@ func TestPipelineSend(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestPipelineKeepSendingWhenPostError tests that pipeline can keep
+// sending messages if previous messages meet post error.
+func TestPipelineKeepSendingWhenPostError(t *testing.T) {
+	tr := &respRoundTripper{err: fmt.Errorf("roundtrip error")}
+	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
+	fs := &stats.FollowerStats{}
+	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
+
+	for i := 0; i < 50; i++ {
+		p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
+	}
+	testutil.WaitSchedule()
+	p.stop()
+
+	// check it send out 50 requests
+	tr.mu.Lock()
+	defer tr.mu.Unlock()
+	if tr.reqCount != 50 {
+		t.Errorf("request count = %d, want 50", tr.reqCount)
+	}
+}
+
 func TestPipelineExceedMaximumServing(t *testing.T) {
 func TestPipelineExceedMaximumServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	tr := newRoundTripperBlocker()
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
@@ -236,6 +259,9 @@ func (t *roundTripperBlocker) CancelRequest(req *http.Request) {
 }
 }
 
 
 type respRoundTripper struct {
 type respRoundTripper struct {
+	mu       sync.Mutex
+	reqCount int
+
 	code   int
 	code   int
 	header http.Header
 	header http.Header
 	err    error
 	err    error
@@ -245,6 +271,9 @@ func newRespRoundTripper(code int, err error) *respRoundTripper {
 	return &respRoundTripper{code: code, err: err}
 	return &respRoundTripper{code: code, err: err}
 }
 }
 func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
 func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.reqCount++
 	return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
 	return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
 }
 }