Browse Source

rafthttp: fix wrong return in pipeline.handle

pipeline.handle is a long-living one, and should continue to receive
next message to send out when current message fails to send. So it
should `continue` instead of `return` here.
Yicheng Qin 10 years ago
parent
commit
f648d52afe
2 changed files with 23 additions and 1 deletions
  1. 1 1
      rafthttp/pipeline.go
  2. 22 0
      rafthttp/pipeline_test.go

+ 1 - 1
rafthttp/pipeline.go

@@ -106,7 +106,7 @@ func (p *pipeline) handle() {
 			if isMsgSnap(m) {
 				p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
 			}
-			return
+			continue
 		}
 
 		p.status.activate()

+ 22 - 0
rafthttp/pipeline_test.go

@@ -52,6 +52,28 @@ func TestPipelineSend(t *testing.T) {
 	}
 }
 
+// TestPipelineKeepSending tests that pipeline could keep sending messages
+// if there are messages in msgc channel.
+func TestPipelineKeepSending(t *testing.T) {
+	tr := &roundTripperRecorder{}
+	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
+	fs := &stats.FollowerStats{}
+	p := newPipeline(tr, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
+
+	for i := 0; i < 50; i++ {
+		p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
+	}
+	testutil.WaitSchedule()
+	p.stop()
+
+	// check it sends all messages out successfully
+	fs.Lock()
+	defer fs.Unlock()
+	if fs.Counts.Success != 50 {
+		t.Errorf("success = %d, want 50", fs.Counts.Success)
+	}
+}
+
 func TestPipelineExceedMaximumServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})