Ver código fonte

Merge pull request #5494 from xiang90/refactor_rafthttp

rafthttp: remove the newPipeline func
Xiang Li 9 anos atrás
pai
commit
e39f436728
5 arquivos alterados com 70 adições e 55 exclusões
  1. 12 1
      rafthttp/peer.go
  2. 15 27
      rafthttp/pipeline.go
  3. 30 24
      rafthttp/pipeline_test.go
  4. 12 2
      rafthttp/remote.go
  5. 1 1
      rafthttp/transport.go

+ 12 - 1
rafthttp/peer.go

@@ -120,6 +120,17 @@ type peer struct {
 func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer {
 	status := newPeerStatus(to)
 	picker := newURLPicker(urls)
+	pipeline := &pipeline{
+		to:            to,
+		tr:            transport,
+		picker:        picker,
+		status:        status,
+		followerStats: fs,
+		raft:          r,
+		errorc:        errorc,
+	}
+	pipeline.start()
+
 	p := &peer{
 		id:             to,
 		r:              r,
@@ -127,7 +138,7 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r
 		picker:         picker,
 		msgAppV2Writer: startStreamWriter(to, status, fs, r),
 		writer:         startStreamWriter(to, status, fs, r),
-		pipeline:       newPipeline(transport, picker, local, to, cid, status, fs, r, errorc),
+		pipeline:       pipeline,
 		snapSender:     newSnapshotSender(transport, picker, local, to, cid, status, r, errorc),
 		sendc:          make(chan raftpb.Message),
 		recvc:          make(chan raftpb.Message, recvBufSize),

+ 15 - 27
rafthttp/pipeline.go

@@ -41,15 +41,15 @@ const (
 var errStopped = errors.New("stopped")
 
 type pipeline struct {
-	from, to types.ID
-	cid      types.ID
+	to types.ID
 
 	tr     *Transport
 	picker *urlPicker
 	status *peerStatus
-	fs     *stats.FollowerStats
-	r      Raft
+	raft   Raft
 	errorc chan error
+	// deprecate when we depercate v2 API
+	followerStats *stats.FollowerStats
 
 	msgc chan raftpb.Message
 	// wait for the handling routines
@@ -57,25 +57,13 @@ type pipeline struct {
 	stopc chan struct{}
 }
 
-func newPipeline(tr *Transport, picker *urlPicker, from, to, cid types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft, errorc chan error) *pipeline {
-	p := &pipeline{
-		from:   from,
-		to:     to,
-		cid:    cid,
-		tr:     tr,
-		picker: picker,
-		status: status,
-		fs:     fs,
-		r:      r,
-		errorc: errorc,
-		stopc:  make(chan struct{}),
-		msgc:   make(chan raftpb.Message, pipelineBufSize),
-	}
+func (p *pipeline) start() {
+	p.stopc = make(chan struct{})
+	p.msgc = make(chan raftpb.Message, pipelineBufSize)
 	p.wg.Add(connPerPipeline)
 	for i := 0; i < connPerPipeline; i++ {
 		go p.handle()
 	}
-	return p
 }
 
 func (p *pipeline) stop() {
@@ -96,22 +84,22 @@ func (p *pipeline) handle() {
 			if err != nil {
 				p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
 
-				if m.Type == raftpb.MsgApp && p.fs != nil {
-					p.fs.Fail()
+				if m.Type == raftpb.MsgApp && p.followerStats != nil {
+					p.followerStats.Fail()
 				}
-				p.r.ReportUnreachable(m.To)
+				p.raft.ReportUnreachable(m.To)
 				if isMsgSnap(m) {
-					p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
+					p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
 				}
 				continue
 			}
 
 			p.status.activate()
-			if m.Type == raftpb.MsgApp && p.fs != nil {
-				p.fs.Succ(end.Sub(start))
+			if m.Type == raftpb.MsgApp && p.followerStats != nil {
+				p.followerStats.Succ(end.Sub(start))
 			}
 			if isMsgSnap(m) {
-				p.r.ReportSnapshot(m.To, raft.SnapshotFinish)
+				p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
 			}
 			sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
 		case <-p.stopc:
@@ -124,7 +112,7 @@ func (p *pipeline) handle() {
 // error on any failure.
 func (p *pipeline) post(data []byte) (err error) {
 	u := p.picker.pick()
-	req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.from, p.cid)
+	req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
 
 	done := make(chan struct{}, 1)
 	cancel := httputil.RequestCanceler(p.tr.pipelineRt, req)

+ 30 - 24
rafthttp/pipeline_test.go

@@ -36,9 +36,8 @@ import (
 func TestPipelineSend(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
-	fs := &stats.FollowerStats{}
 	tp := &Transport{pipelineRt: tr}
-	p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
+	p := startTestPipeline(tp, picker)
 
 	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
 	testutil.WaitSchedule()
@@ -47,10 +46,8 @@ func TestPipelineSend(t *testing.T) {
 	if tr.Request() == nil {
 		t.Errorf("sender fails to post the data")
 	}
-	fs.Lock()
-	defer fs.Unlock()
-	if fs.Counts.Success != 1 {
-		t.Errorf("success = %d, want 1", fs.Counts.Success)
+	if p.followerStats.Counts.Success != 1 {
+		t.Errorf("success = %d, want 1", p.followerStats.Counts.Success)
 	}
 }
 
@@ -59,9 +56,8 @@ func TestPipelineSend(t *testing.T) {
 func TestPipelineKeepSendingWhenPostError(t *testing.T) {
 	tr := &respRoundTripper{rec: testutil.NewRecorderStream(), err: fmt.Errorf("roundtrip error")}
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
-	fs := &stats.FollowerStats{}
 	tp := &Transport{pipelineRt: tr}
-	p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
+	p := startTestPipeline(tp, picker)
 	defer p.stop()
 
 	for i := 0; i < 50; i++ {
@@ -77,9 +73,9 @@ func TestPipelineKeepSendingWhenPostError(t *testing.T) {
 func TestPipelineExceedMaximumServing(t *testing.T) {
 	tr := newRoundTripperBlocker()
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
-	fs := &stats.FollowerStats{}
 	tp := &Transport{pipelineRt: tr}
-	p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
+	p := startTestPipeline(tp, picker)
+	defer p.stop()
 
 	// keep the sender busy and make the buffer full
 	// nothing can go out as we block the sender
@@ -111,33 +107,29 @@ func TestPipelineExceedMaximumServing(t *testing.T) {
 	default:
 		t.Errorf("failed to send out message")
 	}
-	p.stop()
 }
 
 // TestPipelineSendFailed tests that when send func meets the post error,
 // it increases fail count in stats.
 func TestPipelineSendFailed(t *testing.T) {
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
-	fs := &stats.FollowerStats{}
 	tp := &Transport{pipelineRt: newRespRoundTripper(0, errors.New("blah"))}
-	p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), fs, &fakeRaft{}, nil)
+	p := startTestPipeline(tp, picker)
 
 	p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
 	testutil.WaitSchedule()
 	p.stop()
 
-	fs.Lock()
-	defer fs.Unlock()
-	if fs.Counts.Fail != 1 {
-		t.Errorf("fail = %d, want 1", fs.Counts.Fail)
+	if p.followerStats.Counts.Fail != 1 {
+		t.Errorf("fail = %d, want 1", p.followerStats.Counts.Fail)
 	}
 }
 
 func TestPipelinePost(t *testing.T) {
 	tr := &roundTripperRecorder{}
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
-	tp := &Transport{pipelineRt: tr}
-	p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil)
+	tp := &Transport{ClusterID: types.ID(1), pipelineRt: tr}
+	p := startTestPipeline(tp, picker)
 	if err := p.post([]byte("some data")); err != nil {
 		t.Fatalf("unexpected post error: %v", err)
 	}
@@ -185,7 +177,7 @@ func TestPipelinePostBad(t *testing.T) {
 	for i, tt := range tests {
 		picker := mustNewURLPicker(t, []string{tt.u})
 		tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)}
-		p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, make(chan error))
+		p := startTestPipeline(tp, picker)
 		err := p.post([]byte("some data"))
 		p.stop()
 
@@ -205,13 +197,12 @@ func TestPipelinePostErrorc(t *testing.T) {
 	}
 	for i, tt := range tests {
 		picker := mustNewURLPicker(t, []string{tt.u})
-		errorc := make(chan error, 1)
 		tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)}
-		p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, errorc)
+		p := startTestPipeline(tp, picker)
 		p.post([]byte("some data"))
 		p.stop()
 		select {
-		case <-errorc:
+		case <-p.errorc:
 		default:
 			t.Fatalf("#%d: cannot receive from errorc", i)
 		}
@@ -221,7 +212,7 @@ func TestPipelinePostErrorc(t *testing.T) {
 func TestStopBlockedPipeline(t *testing.T) {
 	picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
 	tp := &Transport{pipelineRt: newRoundTripperBlocker()}
-	p := newPipeline(tp, picker, types.ID(2), types.ID(1), types.ID(1), newPeerStatus(types.ID(1)), nil, &fakeRaft{}, nil)
+	p := startTestPipeline(tp, picker)
 	// send many messages that most of them will be blocked in buffer
 	for i := 0; i < connPerPipeline*10; i++ {
 		p.msgc <- raftpb.Message{}
@@ -307,3 +298,18 @@ type nopReadCloser struct{}
 
 func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, io.EOF }
 func (n *nopReadCloser) Close() error               { return nil }
+
+func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline {
+	p := &pipeline{
+		tr:     tr,
+		picker: picker,
+
+		to:            types.ID(1),
+		status:        newPeerStatus(types.ID(1)),
+		raft:          &fakeRaft{},
+		followerStats: &stats.FollowerStats{},
+		errorc:        make(chan error, 1),
+	}
+	p.start()
+	return p
+}

+ 12 - 2
rafthttp/remote.go

@@ -25,13 +25,23 @@ type remote struct {
 	pipeline *pipeline
 }
 
-func startRemote(tr *Transport, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote {
+func startRemote(tr *Transport, urls types.URLs, to types.ID, r Raft, errorc chan error) *remote {
 	picker := newURLPicker(urls)
 	status := newPeerStatus(to)
+	pipeline := &pipeline{
+		to:     to,
+		tr:     tr,
+		picker: picker,
+		status: status,
+		raft:   r,
+		errorc: errorc,
+	}
+	pipeline.start()
+
 	return &remote{
 		id:       to,
 		status:   status,
-		pipeline: newPipeline(tr, picker, local, to, cid, status, nil, r, errorc),
+		pipeline: pipeline,
 	}
 }
 

+ 1 - 1
rafthttp/transport.go

@@ -225,7 +225,7 @@ func (t *Transport) AddRemote(id types.ID, us []string) {
 	if err != nil {
 		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
 	}
-	t.remotes[id] = startRemote(t, urls, t.ID, id, t.ClusterID, t.Raft, t.ErrorC)
+	t.remotes[id] = startRemote(t, urls, id, t.Raft, t.ErrorC)
 }
 
 func (t *Transport) AddPeer(id types.ID, us []string) {